PD-257019 Stabilization emodb queue service changes #831
Conversation
…g the metadata from s3.
- Create kafka producer and admin service
- Overload the sendAll method to reroute messages to kafka
- Make the required changes in Clients like jersey2
- fetch aws configs from ssm parameter store
- introduce dependency injection for stepfunction service
- include the proper naming for each stepfn execution
- include all required contracts for stepfn
- include the feature flag logic, fetch params from parameter store and based on the allowed queues migrate
- add changed logic for feature flag implementation
- wrap the payload for stepfn around an executionInput parameter
 */
public StepFunctionService() {
    this.stepFunctionsClient = AWSStepFunctionsClientBuilder.standard()
            .withRegion("us-east-1")
Read from environment variables / config file.
+1, surely we will have step functions in each region right?
Yes Igor, he will update this in the same PR.
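To make the suggestion concrete, here is a minimal sketch of pulling the region from the environment instead of hard-coding it; `RegionConfig` and the `AWS_REGION` fallback are illustrative, not the PR's actual implementation:

```java
import java.util.Map;

public class RegionConfig {
    static final String DEFAULT_REGION = "us-east-1";

    // Pick the region from the environment, falling back to a default
    // only when nothing is configured.
    static String resolveRegion(Map<String, String> env) {
        String region = env.get("AWS_REGION");
        return (region == null || region.isEmpty()) ? DEFAULT_REGION : region;
    }

    // Usage with the AWS SDK v1 builder (requires aws-java-sdk-stepfunctions):
    // AWSStepFunctionsClientBuilder.standard()
    //         .withRegion(RegionConfig.resolveRegion(System.getenv()))
    //         .build();
}
```

This keeps the client construction region-agnostic, so each deployment picks up its own region.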
I made a few comments. One in particular requires a change.
public void createTopic(String topic, int numPartitions, short replicationFactor, String queueType) {
    NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
    try {
        adminClient.createTopics(Collections.singleton(newTopic)).all().get();
Why not use the isTopicsExists function prior to attempting creation here, rather than rely on the consumers of this function to have to deal with checking for existence themselves?
Included both functionalities in a single function, createTopicIfNotExists(); now the consumer doesn't need to deal with checking for the existence of the topic.
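A combined check-and-create along these lines might look like the following sketch (class name and error handling are illustrative, not the PR's code; the guard for TopicExistsException covers the race where another producer creates the topic between the check and the create):

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class KafkaAdminService {
    private final AdminClient adminClient;

    public KafkaAdminService(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    /**
     * Creates the topic only if it does not already exist, so callers
     * never have to check for existence themselves.
     */
    public void createTopicIfNotExists(String topic, int numPartitions, short replicationFactor)
            throws ExecutionException, InterruptedException {
        if (adminClient.listTopics().names().get().contains(topic)) {
            return; // already present, nothing to do
        }
        try {
            adminClient.createTopics(
                            Collections.singleton(new NewTopic(topic, numPartitions, replicationFactor)))
                    .all().get();
        } catch (ExecutionException e) {
            // Another producer may have created it between the check and the create.
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }
}
```

Requires the kafka-clients dependency; the check-then-create is not atomic, hence the second guard.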
Thanks
    "b-1.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092," +
    "b-2.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092";

private static String bootstrapServersConfig;
suggestion: Can you add a short description of what each of these properties is used for, please? I am unsure of what lingerMsConfig means, for example. Thanks
Added descriptions for all the configs being set for Kafka.
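For reference, a commented producer configuration of the kind requested might look like this sketch; the concrete values are illustrative defaults, not the PR's settings:

```java
import java.util.Properties;

public class ProducerConfigFactory {
    // Builds Kafka producer settings, with a comment explaining each knob.
    public static Properties producerProperties(String bootstrapServers) {
        Properties props = new Properties();
        // Brokers the producer first connects to.
        props.put("bootstrap.servers", bootstrapServers);
        // How long to wait for more records before sending a batch;
        // a small delay lets the producer fill larger batches.
        props.put("linger.ms", "5");
        // Maximum bytes per batch, per partition.
        props.put("batch.size", "16384");
        // Total memory available for buffering unsent records.
        props.put("buffer.memory", "33554432");
        // "all" waits for the full in-sync replica set to acknowledge each record.
        props.put("acks", "all");
        // Automatic retries on transient broker errors.
        props.put("retries", "3");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The lingerMsConfig field in the PR maps to the standard linger.ms producer property shown here.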
Thanks
public void sendMessages(String topic, Collection<String> events, String queueType) {
    _log.info("Sending {} messages to topic '{}'", events.size(), topic);
    for (String event : events) {
        _log.debug("Sending message: {}", event);
That debug log will generate a huge amount of logs! Please make sure to remove it before going LIVE, especially with 3 already distinct pieces of logged information per message in the sendMessage function.
Yes, we added these debug logs to check the end-to-end flow while deploying in cert. Removed the excessive logs and kept only the error logs.
_log.info("Message sent successfully to topic '{}' partition {} at offset {}",
        metadata.topic(), metadata.partition(), metadata.offset());
} catch (Exception e) {
    _log.error("Failed to send message to topic '{}'. Exception: {}", topic, e.getMessage());
issue(blocking): So we are swallowing the exception here? Meaning that if we fail after a few messages, we keep going with the next ones and may end up having sent only a partial number of messages, with no way to recover... this will lead to lost messages, right? Since the outer sendMessages doesn't catch anything either, what is the plan here in terms of truly rethrowing the exception and ensuring that messages are retried up the stack? Is this an early, happy-path-only version of the code?
Now rethrowing the exception after logging it; this matches the behaviour of the current system, where end consumers of the send method retry until the write to EmoDB is successful.
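The log-and-rethrow pattern agreed on here can be reduced to a small sketch; sendOrThrow is an illustrative helper, not code from the PR:

```java
import java.util.function.Consumer;

public class Rethrow {
    // Log-and-rethrow: record the failure locally, but propagate it so the
    // caller's retry loop (the existing EmoDB behaviour) still fires.
    static void sendOrThrow(Runnable send, Consumer<Exception> log) {
        try {
            send.run();
        } catch (RuntimeException e) {
            log.accept(e); // keep local diagnostics
            throw e;       // do NOT swallow: callers must see the failure
        }
    }
}
```

The key property is that the exception reaches the caller unchanged, so upstream retry logic keeps working.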
have you committed these changes...I don't see them?
See them now, thanks (you can resolve this)
 */
public void sendMessage(String topic, String message, String queueType) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
    _log.debug("Preparing to send message to topic '{}' with value: {}", topic, message);
suggestion: Considering the millions of messages planned to go through this piece of code, that's going to be a hefty amount of logs. My suggestion would be to log the error only, and log at info level up in the calling sendMessages function for each batch. If this is just temporary during early e2e testing, then add a TODO to show that intent, please.
Yes, these were temporary logs for e2e testing; removed now to avoid the bombardment of logs.
Thanks!
Force-pushed from 7ed6920 to ca1ad1d.
import java.util.Collections;
import java.util.concurrent.ExecutionException;
Add Javadocs wherever necessary in public classes and methods.
- throw back exceptions after logging
- include create topic and checktopic in one function
- add comments for better description
A bit concerned with the sendMessage function - not sure we are taking advantage of the internal batching capabilities by doing a blocking get() call on the Future. Also think we are still swallowing the exception within the callback by just logging it and not re-throwing.
producer.flush();
producer.close();
You will want to wrap this with a try ... catch ...
Wrapped this in a try-catch; addressed in a new PR #838.
    }
});
// Optionally, you can wait for the send to complete
RecordMetadata metadata = future.get(); // Blocking call
This is a blocking call and will impact performance if the sendMessages method is called with many messages. It is preferable to let the internals of the Kafka client batch the calls based on your linger.ms and batch.size parameters, which won't be leveraged in this case if you call the blocking get() method on the future object.

I would keep sendMessage as is (for the singular message send scenario) and create a private async version (sendMessageAsync) that returns a future, which sendMessages can use, checking Future.isDone() across all the futures for completion. This way you can truly benefit from the Kafka producer's batching capability.

I would be curious to see the current impact on performance of the sendBatch function before and after...
- At a high level, we need the call to be synchronous because we need producers to retry in case of failure.
- TL;DR: send().get() will block until the specific message is acknowledged, but Kafka will still batch messages in the background based on batch.size and linger.ms.
- Async: producer.send(record) is non-blocking and returns immediately after placing the message in an internal buffer. The message may not be sent to Kafka immediately, depending on the batching configuration (batch.size and linger.ms).
- Sync: When you call send(record).get(), it forces the main thread to wait until the producer receives an ack from the Kafka broker that the message has been successfully processed. It doesn't stop Kafka from batching other messages in the background.

Even though the get() call waits for an acknowledgment of a specific message, Kafka still batches messages in the background. If we have several messages being sent, Kafka will batch those messages up to the configured batch.size or wait up to linger.ms before sending a request to the broker. The send() method places messages into an internal buffer, and if enough messages accumulate to fill the batch, they will be sent as a single batch to Kafka, even though the main thread is waiting for the result of a specific message.
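For comparison, the async variant the reviewer describes (queue all sends first, then wait on the collected futures) could be sketched like this; the class name is illustrative and kafka-clients is required:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchSender {
    private final KafkaProducer<String, String> producer;

    public BatchSender(KafkaProducer<String, String> producer) {
        this.producer = producer;
    }

    // Queue every record first (non-blocking), then wait once at the end.
    // The producer can now fill batches up to batch.size / linger.ms because
    // the main thread never blocks between individual sends.
    public void sendMessages(String topic, Collection<String> events)
            throws ExecutionException, InterruptedException {
        List<Future<RecordMetadata>> futures = new ArrayList<>(events.size());
        for (String event : events) {
            futures.add(producer.send(new ProducerRecord<>(topic, event)));
        }
        for (Future<RecordMetadata> future : futures) {
            future.get(); // surfaces the first failure; callers can retry the batch
        }
    }
}
```

This keeps the synchronous guarantee at the batch boundary while still letting the producer batch records internally.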
try {
    Future<RecordMetadata> future = producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            _log.error("Failed to send message to topic '{}'. Error: {}", topic, exception.getMessage());
issue(blocking): This will also swallow the exception and mean we won't detect errors. Probably worth a dedicated mocked unit test to prove this (look at this as an example: https://stackoverflow.com/a/69303666).
@rishuyadavbv please raise a PR including a test case and not catching the exception and share the details here.
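A sketch of the suggested mocked unit test, using kafka-clients' MockProducer with autoComplete disabled so the test controls each send's outcome (class and topic names are illustrative):

```java
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

public class SwallowedExceptionTest {
    public static void main(String[] args) throws Exception {
        // autoComplete=false lets the test decide each pending send's outcome.
        MockProducer<String, String> producer =
                new MockProducer<>(false, new StringSerializer(), new StringSerializer());

        final boolean[] callbackSawError = {false};
        Future<RecordMetadata> future = producer.send(
                new ProducerRecord<>("test-topic", "payload"),
                (metadata, exception) -> {
                    if (exception != null) {
                        callbackSawError[0] = true; // only logging here = swallowing
                    }
                });

        // Simulate a broker failure for the pending send.
        producer.errorNext(new TimeoutException("broker unavailable"));

        // The callback fired, but unless future.get() is called (or the
        // callback rethrows), the caller never learns about the failure.
        assert callbackSawError[0];
        boolean getThrew = false;
        try {
            future.get();
        } catch (Exception e) {
            getThrew = true;
        }
        assert getThrew;
    }
}
```

Run with assertions enabled (java -ea); the point it demonstrates is that a log-only callback hides the failure from the caller.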
@@ -67,6 +67,23 @@ public void sendAll(String apiKey, String queue, Collection<?> messages) {
        .post(messages));
}

public void sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) {
What is the use of the 'isFlush' variable here? Why not call the 'sendAll(apiKey, queue, messages)' function itself, rather than replicating the code?
The logic is different, Veda: the overload with isFlush will go to Cassandra; without it, messages go to Kafka.
The isFlush variable has two use cases:
- Method overloading: sendAll(String apiKey, String queue, Collection<?> messages) is used to send messages to Kafka, while sendAll(String apiKey, String queue, Collection<?> messages, boolean isFlush) sends the messages directly to Cassandra.
- If isFlush is true, the request is coming from sendBatch1 (the endpoint used by the Lambda function to send messages to Cassandra). Don't do any checks, since they were already done before pushing to Kafka; just push the data.
- If isFlush is false, requests are coming directly from the client and it's not an allowed queue, so send the message directly to Cassandra and do all the checks to validate the queue name and messages.
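The routing rules above can be condensed into a small, testable sketch; the names and the allowed-queue flag are illustrative stand-ins for the PR's feature-flag logic:

```java
public class QueueRouter {
    enum Destination { KAFKA, CASSANDRA }

    static final class Route {
        final Destination destination;
        final boolean validate; // whether queue name / message checks run
        Route(Destination destination, boolean validate) {
            this.destination = destination;
            this.validate = validate;
        }
    }

    // sendAll without isFlush: allowed queues are rerouted to Kafka,
    // everything else goes to Cassandra; both paths are validated.
    static Route sendAll(boolean queueAllowed) {
        return queueAllowed
                ? new Route(Destination.KAFKA, true)
                : new Route(Destination.CASSANDRA, true);
    }

    // sendAll with isFlush: always Cassandra. isFlush=true means the request
    // came from sendBatch1 (the Lambda flush path) and was validated before it
    // ever reached Kafka, so the checks are skipped.
    static Route sendAll(boolean queueAllowed, boolean isFlush) {
        return new Route(Destination.CASSANDRA, !isFlush);
    }
}
```

Separating "where does it go" from "does it get validated" makes the two overloads' behaviour explicit.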
try {
    return adminClient.listTopics().names().get().contains(topic);
} catch (Exception e) {
    throw new RuntimeException(e);
question: Why not log the exception?
Added a log for it.
NewTopic newTopic = new NewTopic(topic, numPartitions, replicationFactor);
try {
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();
    _log.info("Created topic: {} with numPartitions: {} ", topic, numPartitions, replicationFactor);
todo: The replicationFactor variable is not logged.
The replication factor is also logged now; made the change already.
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();
    _log.info("Created topic: {} with numPartitions: {} ", topic, numPartitions, replicationFactor);
} catch (Exception e) {
    _log.error("Error creating topic {}: {}", topic, e.getMessage());
todo: Do log the stacktrace of the exception for better debugging
Logged the stacktrace in #838.
 * Closes the AdminClient to release resources.
 */
public void close() {
    adminClient.close();
todo: null check on adminClient object would be good
Added a null check here.
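A null-safe, exception-safe close of the kind suggested in this thread (and in the earlier producer.flush()/close() thread) can be sketched generically; closeQuietly is an illustrative helper, not the PR's code:

```java
import java.util.function.Consumer;

public class SafeClose {
    // Null-safe, exception-safe close: tolerates an uninitialized client and
    // failures during shutdown, logging instead of aborting the shutdown path.
    static void closeQuietly(AutoCloseable resource, Consumer<Exception> log) {
        if (resource == null) {
            return; // nothing to release
        }
        try {
            resource.close();
        } catch (Exception e) {
            log.accept(e); // record the failure, but let shutdown continue
        }
    }
}
```

Both adminClient.close() and the producer flush/close sequence fit this shape.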
        .build();

private static final String DEFAULT_BOOTSTRAP_SERVERS =
        "b-1.qaemodbpocmsk.q4panq.c10.kafka.us-east-1.amazonaws.com:9092," +
question: Can this be picked from config properties or system variables?
Yes, we don't require any default bootstrap servers, so removed the default; fetching it from SSM parameters. #838
// Configures the Kafka broker addresses for producer connections.
bootstrapServersConfig = parameterValues.getOrDefault("/emodb/kafka/bootstrapServers", DEFAULT_BOOTSTRAP_SERVERS);

logger.info("Kafka configurations loaded successfully from SSM.");
todo: It would be good to log the configurations as well, for better visibility and debugging.
Logged all the configuration values that are being fetched from SSM. Created a PR for the suggested changes: #838
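Fetching and logging a single parameter from SSM Parameter Store (AWS SDK v1) might look like this sketch; the class name is illustrative, and the parameter path follows the /emodb/kafka/* convention seen in the diff:

```java
import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagement;
import com.amazonaws.services.simplesystemsmanagement.AWSSimpleSystemsManagementClientBuilder;
import com.amazonaws.services.simplesystemsmanagement.model.GetParameterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SsmKafkaConfig {
    private static final Logger logger = LoggerFactory.getLogger(SsmKafkaConfig.class);

    private final AWSSimpleSystemsManagement ssm =
            AWSSimpleSystemsManagementClientBuilder.defaultClient();

    public String getParameter(String name) {
        String value = ssm.getParameter(new GetParameterRequest().withName(name))
                .getParameter().getValue();
        // Log the resolved value for visibility, as suggested in the review.
        logger.info("Loaded SSM parameter {} = {}", name, value);
        return value;
    }
}
// Usage: new SsmKafkaConfig().getParameter("/emodb/kafka/bootstrapServers");
```

Requires aws-java-sdk-ssm and valid AWS credentials/region in the environment; secret-bearing parameters should not be logged at their raw values.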
    throw e;
}
catch (Exception e) {
    logger.error("Unexpected error occurred while loading configurations from SSM. Using default values.", e);
If the exception is rethrown, why log it here? It's unnecessary logging and a discomfort in debugging.
todo: Make the last catch block in the call chain log the exception before acting on it. This is a logging best practice.
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
Don't use this (wildcard import).
…function firing (#844)

- PD-257019 Stabilization emodb queue service changes (#831)
- PD-249429: integrated emodb with datastorage-media-service for getting the metadata from s3.
- PD-249429: refactored code.
- PD-249428: DELETE blob with blob id.
- PD-249428: Implemented upload blob from byte array.
- PD-249428: Implemented upload blob from byte array.
- PD-249428: Fixed failing ITs.
- PD-249428: Fixed failing ITs.
- PD-249428: Changed the snapshot version.
- PD-249428: Refactored code.
- PD-249428: changed the version to 172.
- PD-249428: commented tests.
- branch admin -prepare release emodb-6.5.172
- PD-249428: changed snapshot versions.
- PD-249428: changed snapshot versions.
- branch admin -prepare release emodb-6.5.173
- branch admin -prepare for next development iteration
- changed the BASE_URL
- PD-256742: fixed all bugs.
- PD-256742: changed snapshot version.
- PD-256742: changed snapshot version.
- branch admin -prepare release emodb-6.5.176
- branch admin -prepare for next development iteration
- for testing
- feat: integrate kafka to emodb
- Create kafka producer and admin service
- Overload the sendAll method to reroute messages to kafka
- Make the required changes in Clients like jersey2
- feat: include logic for separation of queue and dedupq
- feat: include guava dependency injection for emodb
- fix: seperation of producer and admin service and proper injection
- fix: fix DI wiring issue
- feat: integrate triggering step function execution on creation of new topic
- feat: merge changes for stepfunction
- branch admin -prepare release emodb-6.5.175
- branch admin -prepare for next development iteration
- feat: add parameter store
- fix: add aws core dependency
- fix: remove credentialProvider
- fix: changes for ci deployment
- fix: pom changes for blob
- chore: update tags to 177 for deployment
- branch admin -prepare release emodb-6.5.177
- branch admin -prepare for next development iteration
- chore: update msk servers
- fix: include msk server url
- chore: snapshot update to 181
- branch admin -prepare release emodb-6.5.181
- branch admin -prepare for next development iteration
- feat: include working parameter store and stepfn changes
- add proper exception handling in parameter store and stepfn
- make abstractqueue service code modular and organized
- branch admin -prepare release emodb-6.5.182
- branch admin -prepare for next development iteration
- feat: fetch kafka configs from ssm and dependency injection for stepfn
- fetch aws configs from ssm parameter store
- introduce dependency injection for stepfunction service
- branch admin -prepare release emodb-6.5.183
- branch admin -prepare for next development iteration
- feat: feature flag implementation for queue migrations
- include the proper naming for each stepfn execution
- include all required contracts for stepfn
- include the feature flag logic, fetch params from parameter store and based on the allowed queues migrate
- fix: fix param for allowedQueues
- branch admin -prepare release emodb-6.5.184
- branch admin -prepare for next development iteration
- fix: remove blob changes
- chore: add comments and fix start.sh
- fix: changed logic for feature flag
- add changed logic for feature flag implementation
- wrap the payload for stepfn around a executionInput parameter
- fix: fixed exception handling and code refactoring
- throw back exceptions after logging
- include create topic and checktopic in one function
- add comments for better description

Co-authored-by: vikram-vikram_bveng <[email protected]>
Co-authored-by: jenkins <[email protected]>
Co-authored-by: nabajyotiDash-hub <[email protected]>

- update pom version
- branch admin -prepare release emodb-6.5.192
- branch admin -prepare for next development iteration
- modify producer properties (#834)
- PD-257019 Add universe fetching and bug fixes (#835)
- feat: add universe fetching and bug fixes
- fix: remove debug logs
- branch admin -prepare release emodb-6.5.193
- branch admin -prepare for next development iteration
- uncached_size for dedupq (#837)
- PD-257019 Producer send and logging changes (#838)
- fix: fix send logic and exception handling changes
- added exception handling
- changed producer send logic to be synchronous
- removed redundant logs
- uncached_size for dedupq (#837)
- chore: rename queuetype as per convention
- chore: fix naming issue

Co-authored-by: nabajyotiDash-hub <[email protected]>

- branch admin -prepare release emodb-6.5.194
- branch admin -prepare for next development iteration
- branch admin -prepare release emodb-6.5.195
- branch admin -prepare for next development iteration
- branch admin -prepare release emodb-6.5.196
- branch admin -prepare for next development iteration
- chore: snapshot updates
- branch admin -prepare release emodb-6.5.194
- branch admin -prepare for next development iteration
- [PD-258160] added dd metrics for queue name and queue size (#836)
- branch admin -prepare release emodb-6.5.195
- alter pom version for bvengineering deployment
- branch admin -prepare release emodb-6.5.196
- update pom to 6.5.197-SNAPSHOT
- branch admin -prepare release emodb-6.5.197
- branch admin -prepare for next development iteration
- branch admin -prepare release emodb-6.5.198
- branch admin -prepare for next development iteration
- add cache
- add allowedQueues cache
- feat: producer future logic and caching changes
- chore: clear out unused code
- PD-257019 Producer send and logging changes (#838)
- fix: fix send logic and exception handling changes
- added exception handling
- changed producer send logic to be synchronous
- removed redundant logs
- uncached_size for dedupq (#837)
- chore: rename queuetype as per convention
- chore: fix naming issue

Co-authored-by: nabajyotiDash-hub <[email protected]>

- fix: fix the config changes
- Revert "Merge branch 'master' into PD-257019-producer-async-fixes" (this reverts commit 93f5a76, reversing changes made to 15ce863)
- chore: snapshot updates for deployement
- chore: clear up testing logs
- fix: remove default servers
- branch admin -prepare release emodb-6.5.202
- branch admin -prepare for next development iteration
- chore: caching logs refactoring
- branch admin -prepare release emodb-6.5.203
- branch admin -prepare for next development iteration

Co-authored-by: vikram-vikram_bveng <[email protected]>
Co-authored-by: jenkins <[email protected]>
Co-authored-by: nabajyotiDash-hub <[email protected]>
Co-authored-by: nabajyotiDash-hub <[email protected]>
Co-authored-by: Anurag Dubey <[email protected]>
Pull Request: Integrate changes for emodb stabilization, kafka integration and stepfunction firing

Description:
As part of Emo Stabilization Phase 1, this PR integrates the Kafka Producer and AdminClient into the Queue Module to ensure robust messaging and topic management. The goals are to send messages to Kafka, handle automatic Kafka topic creation, fire the Step Function execution, and store the incoming throttled messages from Kafka into Cassandra.

Key Changes:
- Kafka Producer & AdminClient Integration: overloaded the sendAll function, which handles the topic creation and stepfn firing as well
- Automatic Topic Creation:
- Step Function Execution:
- AWS Parameter Store Integration:
- New Endpoint for Cassandra Queue Service:

Additional Notes:
Checklist: