Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

PD-257019 Producer send and logging changes #838

Merged
merged 4 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
String queueType = determineQueueType();
for (Map.Entry<String, Collection<String>> topicEntry : eventsByChannel.asMap().entrySet()) {
String queueName= topicEntry.getKey();
String topic = "dsq-" + (("dedup".equals(queueType)) ? "dedup-" + queueName : queueName);
String topic = "dsq-" + (("dedupq".equals(queueType)) ? "dedup-" + queueName : queueName);
// Check if the topic exists, if not create it and execute Step Function
if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) {
Map<String, String> parameters = fetchStepFunctionParameters();
Expand Down Expand Up @@ -416,8 +416,8 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q
// Create the timestamp
String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds

// Check if queueType is "dedup" and prepend "D" to execution name if true
String executionName = (queueType.equalsIgnoreCase("dedup") ? "D_" : "") + queueName + "_" + timestamp;
// Check if queueType is "dedupq" and prepend "D" to execution name if true
String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;

// Start the Step Function execution
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
Expand All @@ -434,7 +434,7 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q
*/
private String determineQueueType() {
if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) {
return "dedup";
return "dedupq";
}
return "queue";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public Boolean createTopicIfNotExists(String topic, int numPartitions, short rep
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
_log.info("Created topic: {} with numPartitions: {} and replication factor {} ", topic, numPartitions, replicationFactor);
} catch (Exception e) {
_log.error("Error creating topic {}: {}", topic, e.getMessage());
_log.error("Error creating topic {}: ", topic, e);
throw new RuntimeException(e);
}
}
Expand All @@ -46,15 +46,18 @@ public Boolean createTopicIfNotExists(String topic, int numPartitions, short rep
public boolean isTopicExists(String topic) {
try {
return adminClient.listTopics().names().get().contains(topic);
} catch (Exception e) {
throw new RuntimeException(e);
} catch (Exception e) {
_log.error("Failed to check if topic exists: {}", topic, e);
throw new RuntimeException("Error checking if topic exists", e);
}
}

/**
* Closes the AdminClient to release resources.
*/
public void close() {
adminClient.close();
if (adminClient != null) {
adminClient.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KafkaConfig {
private static String bootstrapServersConfig;
Expand All @@ -33,41 +34,36 @@ public class KafkaConfig {
.standard()
.build();

private static final String DEFAULT_BOOTSTRAP_SERVERS =
"b-1.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092," +
"b-2.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092";


static {
try {
final String UNIVERSE = getUniverseFromEnv();
// Load configurations from SSM during static initialization
String basePath = "/" + UNIVERSE + "/emodb/kafka/";
// Load configurations from SSM during static initialization
Map<String, String> parameterValues = getParameterValues(
Arrays.asList(
"/" + UNIVERSE + "/emodb/kafka/batchSize",
"/" + UNIVERSE + "/emodb/kafka/retries",
"/" + UNIVERSE + "/emodb/kafka/lingerMs",
"/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
)
Stream.of("batchSize", "retries", "lingerMs", "bootstrapServers")
.map(param -> basePath + param)
.collect(Collectors.toList())
);

// Set configurations with fallback to defaults if not present
// Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
batchSizeConfig = parameterValues.getOrDefault(basePath+ "batchSize", "16384");

// Sets the number of retry attempts for failed Kafka message sends.
retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
retriesConfig = parameterValues.getOrDefault(basePath+ "retries", "3");

// Sets the number of milliseconds a producer is willing to wait before sending a batch out
lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
lingerMsConfig = parameterValues.getOrDefault(basePath+"lingerMs", "1");

// Configures the Kafka broker addresses for producer connections.
bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS);
bootstrapServersConfig = parameterValues.get(basePath+"bootstrapServers");

logger.info("Kafka configurations loaded successfully from SSM.");
} catch (AmazonServiceException e) {
logger.error("Failed to load configurations from SSM. Using default values.", e);
throw e;
// Log the kafka configurations loaded from SSM
logger.info("Kafka configurations loaded from SSM: batchSize={}, retries={}, lingerMs={}, bootstrapServers={}",
batchSizeConfig, retriesConfig, lingerMsConfig,
bootstrapServersConfig != null ? bootstrapServersConfig : "Not configured (null)");
}
catch (Exception e) {
logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,7 @@ public void sendMessages(String topic, Collection<String> events, String queueTy
public void sendMessage(String topic, String message, String queueType) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, message, message);
try {
Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
if (exception != null) {
_log.error("Failed to send message to topic '{}'. Error: {}", topic, exception.getMessage());
}
});
// Optionally, you can wait for the send to complete
RecordMetadata metadata = future.get(); // Blocking call
RecordMetadata metadata = producer.send(record).get(); // Blocking call
} catch (Exception e) {
_log.error("Failed to send message to topic '{}'. Exception: {}", topic, e.getMessage());
throw new RuntimeException("Error sending message to kafka"+e.getMessage());
Expand All @@ -59,8 +53,12 @@ public void sendMessage(String topic, String message, String queueType) {
*/
public void close() {
_log.info("Closing Kafka producer.");
producer.flush();
producer.close();
_log.info("Kafka producer closed.");
try {
producer.flush();
producer.close();
} catch (Exception e) {
_log.error("Error while closing Kafka producer: ", e);
throw e;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,18 @@ public long getMessageCount(@QueryParam("partitioned") BooleanParam partitioned,
}
}

@GET
@Path("{queue}/uncached_size")
@RequiresPermissions("queue|get_status|{queue}")
@Timed(name = "bv.emodb.dedupq.DedupQueueResource1.getUncachedMessageCount", absolute = true)
@ApiOperation (value = "gets the uncached Message count.",
notes = "Returns a long.",
response = long.class
)
public long getUncachedMessageCount(@PathParam("queue") String queue) {
return _queueService.getUncachedSize(queue);
}


@GET
@Path("{queue}/claimcount")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection<Ob
@Path("{queue}/sendbatch1")
@Consumes(MediaType.APPLICATION_JSON)
@RequiresPermissions("queue|post|{queue}")
@Timed(name = "bv.emodb.queue.QueueResource1.sendBatch", absolute = true)
@Timed(name = "bv.emodb.queue.QueueResource1.sendBatch1", absolute = true)
@ApiOperation (value = "Send a Batch.",
notes = "Returns a SuccessResponse..",
response = SuccessResponse.class
Expand Down