Implement validation throttler for message validation in Pubsub #710
Signed-off-by: varun-r-mallya <[email protected]>
We also need to tune this once it's merged, or make it self-tuning based on the number of CPUs. I don't think it would be right to do that in this PR, though.
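As a rough illustration of what CPU-based tuning could look like, here is a minimal sketch. The helper name `default_worker_count` and its parameters are hypothetical and are not part of this PR; the floor of 4 simply mirrors the worker-pool default quoted in the PR description.

```python
import os


def default_worker_count(minimum: int = 4, per_cpu: int = 1) -> int:
    """Hypothetical helper: derive a worker count from the CPU count.

    os.cpu_count() can return None, so fall back to 1 in that case.
    """
    cpus = os.cpu_count() or 1
    # Never go below the configured floor, even on small machines.
    return max(minimum, cpus * per_cpu)


workers = default_worker_count()
```

Whether the worker count should scale with CPUs at all is an open question, since the validators are async and may be I/O-bound rather than CPU-bound.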
@varun-r-mallya : Great initiative, Varun. Appreciate your efforts. Please rebase your branch, which will help resolve the merge conflicts. We will review and share feedback soon.
@varun-r-mallya The overall PR looks good, and the structure of validation_throttler.py is well-organized. I just have a few points where I would like to suggest some changes.
libp2p/pubsub/pubsub.py
Outdated
await self.validation_throttler.start(nursery)
# Keep nursery alive until service stops
while self.manager.is_running:
    await trio.sleep(1)
Is it possible to use alternative mechanisms to keep the service alive, rather than polling every second to check if the manager is running?
I tried using wait_finished, but I'm not sure whether it's adequate.
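For reference, the general alternative to a sleep-and-check loop is to block on an event that is set when the service stops. The sketch below uses the standard library's `asyncio.Event` so that it runs without extra dependencies; the PR itself uses trio, where `trio.Event` offers the same `wait()`/`set()` pair. The function names are made up for illustration and do not come from the PR.

```python
import asyncio


async def run_until_stopped(stop_event: asyncio.Event, log: list[str]) -> None:
    """Stay alive until stop_event is set, without periodic wakeups."""
    log.append("started")
    # Suspends this task until set() is called; no once-per-second polling.
    await stop_event.wait()
    log.append("stopped")


async def main() -> list[str]:
    log: list[str] = []
    stop_event = asyncio.Event()
    task = asyncio.create_task(run_until_stopped(stop_event, log))
    await asyncio.sleep(0)  # yield once so the task can start
    stop_event.set()  # stands in for the service shutting down
    await task
    return log


result = asyncio.run(main())
```

Waiting on an event (or on a method such as wait_finished, if it blocks until the manager is done) wakes the task exactly once, at shutdown, instead of once per second.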
…t and related methods Signed-off-by: varun-r-mallya <[email protected]>
e122ef8 to df23e3d
@varun-r-mallya Thank you for addressing all the identified issues.
@varun-r-mallya : Thank you for submitting the PR. Appreciate your efforts. Please add test cases and a newsfragment file. If you need help identifying test scenarios, I'm happy to share key test cases and important scenarios. @sumanjeet0012 : Thank you for reviewing the PR and sharing feedback. Appreciate your support.
Please do share different test cases that I can add!
@varun-r-mallya Here is a list of tests that should be created for this PR:
ValidationThrottler Unit Tests
Test that submitting more messages than queue_size triggers queue-level throttling.
Test that submitting more concurrent async validations than global_throttle_limit triggers global throttling (THROTTLED result).
Test that exceeding a topic’s throttle limit triggers per-topic throttling (THROTTLED result).
Test that an async validator that exceeds the timeout triggers an IGNORE result.
Test that exceptions in sync and async validators are caught and result in REJECT.
Test handling of multiple concurrent validations (simulate with multiple workers).
End-to-End Message Validation
Test validate_msg for:
@varun-r-mallya Kindly tag the relevant individuals when asking a question so that we receive notifications and can respond quickly.
What was wrong?
Async validators in the Pubsub module (libp2p/pubsub/pubsub.py) were executed without any throttling or concurrency control. Under high message volume, this could spawn an excessive number of concurrent validator tasks, potentially exhausting system resources or causing instability.
This was originally mentioned as a TODO in the codebase, and further discussed in Discussion #647.
Issue #709
How was it fixed?
A new throttling system was introduced via a ValidationThrottler component:
Global Throttle: Limits the number of async validations running concurrently across all topics (default: 8192).
Per-Topic Throttle: Configurable semaphore-based limits on validators for individual topics (default: 1024).
Validation Queue: Bounded channel to queue incoming validation requests to prevent spikes (default: 32).
Worker Pool: Dedicated workers (default: 4) process messages and apply throttled validation logic.
Validators are now registered via Pubsub.set_topic_validator(...), and internally wrapped with throttling semantics.
This aligns py-libp2p more closely with go-libp2p-pubsub's validation model.
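To make the global and per-topic limits concrete, here is a minimal, dependency-free sketch of the idea. It is not the PR's ValidationThrottler: the class `ThrottleSketch`, its constructor parameters, and the `Result` enum are hypothetical, it uses asyncio instead of trio, and plain counters stand in for the semaphores. The mapping of timeouts to IGNORE and exceptions to REJECT follows the test list in the review thread.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable

Validator = Callable[[bytes], Awaitable[bool]]


class Result(Enum):
    ACCEPT = "accept"
    REJECT = "reject"
    IGNORE = "ignore"
    THROTTLED = "throttled"


class ThrottleSketch:
    """Hypothetical illustration of global and per-topic throttling."""

    def __init__(self, global_limit: int, topic_limit: int, timeout: float) -> None:
        self._global_limit = global_limit
        self._topic_limit = topic_limit
        self._timeout = timeout
        self._global_active = 0
        self._topic_active: dict[str, int] = {}

    async def validate(self, topic: str, validator: Validator, msg: bytes) -> Result:
        topic_active = self._topic_active.get(topic, 0)
        # Refuse immediately instead of waiting when either limit is reached.
        if self._global_active >= self._global_limit or topic_active >= self._topic_limit:
            return Result.THROTTLED
        self._global_active += 1
        self._topic_active[topic] = topic_active + 1
        try:
            ok = await asyncio.wait_for(validator(msg), self._timeout)
            return Result.ACCEPT if ok else Result.REJECT
        except asyncio.TimeoutError:
            return Result.IGNORE  # validator exceeded the timeout
        except Exception:
            return Result.REJECT  # validator raised
        finally:
            # Always release both slots, whatever the outcome.
            self._global_active -= 1
            self._topic_active[topic] -= 1


async def demo() -> list[Result]:
    throttler = ThrottleSketch(global_limit=2, topic_limit=1, timeout=0.05)

    async def slow_ok(msg: bytes) -> bool:
        await asyncio.sleep(0.01)
        return True

    async def too_slow(msg: bytes) -> bool:
        await asyncio.sleep(1)
        return True

    async def broken(msg: bytes) -> bool:
        raise ValueError("bad message")

    # Two concurrent validations on one topic: the second hits topic_limit=1.
    first, second = await asyncio.gather(
        throttler.validate("a", slow_ok, b"1"),
        throttler.validate("a", slow_ok, b"2"),
    )
    third = await throttler.validate("a", too_slow, b"3")
    fourth = await throttler.validate("a", broken, b"4")
    return [first, second, third, fourth]


results = asyncio.run(demo())
```

The sketch leaves out the bounded validation queue and the worker pool, which sit in front of this step in the design described above.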
To-Do