Skip to content
This repository was archived by the owner on Feb 10, 2025. It is now read-only.

Commit 63deae2

Browse files
PD-257019 Producer send and logging changes (#838)
* fix: fix send logic and exception handling changes * added exception handling * changed producer send logic to be synchronous * removed redundant logs * uncached_size for dedupq (#837) * chore: rename queuetype as per convention * chore: fix naming issue --------- Co-authored-by: nabajyotiDash-hub <[email protected]>
1 parent a90e902 commit 63deae2

File tree

3 files changed

+19
-23
lines changed

3 files changed

+19
-23
lines changed

queue/src/main/java/com/bazaarvoice/emodb/queue/core/AbstractQueueService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ public void sendAll(Map<String, ? extends Collection<?>> messagesByQueue) {
240240
String queueType = determineQueueType();
241241
for (Map.Entry<String, Collection<String>> topicEntry : eventsByChannel.asMap().entrySet()) {
242242
String queueName= topicEntry.getKey();
243-
String topic = "dsq-" + (("dedup".equals(queueType)) ? "dedup-" + queueName : queueName);
243+
String topic = "dsq-" + (("dedupq".equals(queueType)) ? "dedup-" + queueName : queueName);
244244
// Check if the topic exists, if not create it and execute Step Function
245245
if (!adminService.createTopicIfNotExists(topic, TOPIC_PARTITION_COUNT, TOPIC_REPLICATION_FACTOR, queueType)) {
246246
Map<String, String> parameters = fetchStepFunctionParameters();
@@ -456,8 +456,8 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q
456456
// Create the timestamp
457457
String timestamp = String.valueOf(System.currentTimeMillis()); // Current time in milliseconds
458458

459-
// Check if queueType is "dedup" and prepend "D" to execution name if true
460-
String executionName = (queueType.equalsIgnoreCase("dedup") ? "D_" : "") + queueName + "_" + timestamp;
459+
// Check if queueType is "dedupq" and prepend "D" to execution name if true
460+
String executionName = (queueType.equalsIgnoreCase("dedupq") ? "D_" : "") + queueName + "_" + timestamp;
461461

462462
// Start the Step Function execution
463463
stepFunctionService.startExecution(stateMachineArn, inputPayload, executionName);
@@ -474,7 +474,7 @@ private void startStepFunctionExecution(Map<String, String> parameters, String q
474474
*/
475475
private String determineQueueType() {
476476
if (_eventStore.getClass().getName().equals("com.bazaarvoice.emodb.event.dedup.DefaultDedupEventStore")) {
477-
return "dedup";
477+
return "dedupq";
478478
}
479479
return "queue";
480480
}

queue/src/main/java/com/bazaarvoice/emodb/queue/core/kafka/KafkaConfig.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Map;
2222
import java.util.Properties;
2323
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
2425

2526
public class KafkaConfig {
2627
private static String bootstrapServersConfig;
@@ -33,41 +34,36 @@ public class KafkaConfig {
3334
.standard()
3435
.build();
3536

36-
private static final String DEFAULT_BOOTSTRAP_SERVERS =
37-
"b-1.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092," +
38-
"b-2.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092";
39-
4037

4138
static {
4239
try {
4340
final String UNIVERSE = getUniverseFromEnv();
4441
// Load configurations from SSM during static initialization
42+
String basePath = "/" + UNIVERSE + "/emodb/kafka/";
43+
// Load configurations from SSM during static initialization
4544
Map<String, String> parameterValues = getParameterValues(
46-
Arrays.asList(
47-
"/" + UNIVERSE + "/emodb/kafka/batchSize",
48-
"/" + UNIVERSE + "/emodb/kafka/retries",
49-
"/" + UNIVERSE + "/emodb/kafka/lingerMs",
50-
"/" + UNIVERSE + "/emodb/kafka/bootstrapServers"
51-
)
45+
Stream.of("batchSize", "retries", "lingerMs", "bootstrapServers")
46+
.map(param -> basePath + param)
47+
.collect(Collectors.toList())
5248
);
5349

5450
// Set configurations with fallback to defaults if not present
5551
// Sets the batch size for Kafka producer, which controls the amount of data to batch before sending.
56-
batchSizeConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/batchSize", "16384");
52+
batchSizeConfig = parameterValues.getOrDefault(basePath+ "batchSize", "16384");
5753

5854
// Sets the number of retry attempts for failed Kafka message sends.
59-
retriesConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/retries", "3");
55+
retriesConfig = parameterValues.getOrDefault(basePath+ "retries", "3");
6056

6157
// Sets the number of milliseconds a producer is willing to wait before sending a batch out
62-
lingerMsConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/lingerMs", "1");
58+
lingerMsConfig = parameterValues.getOrDefault(basePath+"lingerMs", "1");
6359

6460
// Configures the Kafka broker addresses for producer connections.
65-
bootstrapServersConfig = parameterValues.getOrDefault("/" + UNIVERSE + "/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS);
61+
bootstrapServersConfig = parameterValues.get(basePath+"bootstrapServers");
6662

67-
logger.info("Kafka configurations loaded successfully from SSM.");
68-
} catch (AmazonServiceException e) {
69-
logger.error("Failed to load configurations from SSM. Using default values.", e);
70-
throw e;
63+
// Log the kafka configurations loaded from SSM
64+
logger.info("Kafka configurations loaded from SSM: batchSize={}, retries={}, lingerMs={}, bootstrapServers={}",
65+
batchSizeConfig, retriesConfig, lingerMsConfig,
66+
bootstrapServersConfig != null ? bootstrapServersConfig : "Not configured (null)");
7167
}
7268
catch (Exception e) {
7369
logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);

web/src/main/java/com/bazaarvoice/emodb/web/resources/queue/QueueResource1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ public SuccessResponse sendBatch(@PathParam("queue") String queue, Collection<Ob
127127
@Path("{queue}/sendbatch1")
128128
@Consumes(MediaType.APPLICATION_JSON)
129129
@RequiresPermissions("queue|post|{queue}")
130-
@Timed(name = "bv.emodb.queue.QueueResource1.sendBatch", absolute = true)
130+
@Timed(name = "bv.emodb.queue.QueueResource1.sendBatch1", absolute = true)
131131
@ApiOperation (value = "Send a Batch.",
132132
notes = "Returns a SuccessResponse..",
133133
response = SuccessResponse.class

0 commit comments

Comments
 (0)