Skip to content

Implement validation throttler for message validation in Pubsub #710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

varun-r-mallya
Copy link
Contributor

What was wrong?

Async validators in the Pubsub module (libp2p.pubsub.pubsub.py) were executed without any form of throttling or concurrency control. This could result in excessive concurrent validator tasks being spawned under high message volume, potentially exhausting system resources or causing instability.

This was originally mentioned as a TODO in the codebase, and further discussed in Discussion #647.

Issue #709

How was it fixed?

A new throttling system was introduced via a ValidationThrottler component:

Global Throttle: Limits the number of async validations running concurrently across all topics (default: 8192).

Per-Topic Throttle: Configurable semaphore-based limits on validators for individual topics (default: 1024).

Validation Queue: Bounded channel to queue incoming validation requests to prevent spikes (default: 32).

Worker Pool: Dedicated workers (default: 4) process messages and apply throttled validation logic.

Validators are now registered via Pubsub.set_topic_validator(...), and internally wrapped with throttling semantics.
This aligns py-libp2p more closely with go-libp2p-pubsub's validation model.

To-Do

  • Clean up commit history
  • Add or update documentation related to these changes
  • Add entry to the release notes

@varun-r-mallya
Copy link
Contributor Author

I still need to add tests and stuff, but I'd like to have a review before doing that @pacrob @seetadev

@varun-r-mallya
Copy link
Contributor Author

We also need to tune this once it's merged or make it self tuning according to the number of CPUs. I don't think that it would be right to do it in the same PR though.

@seetadev
Copy link
Contributor

@varun-r-mallya : Great initiative Varun. Appreciate your efforts.

Please re-base your branch, which would help resolve the merge conflicts.

We will review and share feedback soon.

Copy link
Contributor

@sumanjeet0012 sumanjeet0012 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@varun-r-mallya The overall PR looks good, and the structure of validation_throttler.py is well-organized. I just have a few points where I would like to suggest some changes.

await self.validation_throttler.start(nursery)
# Keep nursery alive until service stops
while self.manager.is_running:
await trio.sleep(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to use alternative mechanisms to keep the service alive, rather than polling every second to check if the manager is running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried using wait_finished, but I don't know if it's adequate enough.

@sumanjeet0012
Copy link
Contributor

@varun-r-mallya Thank you for addressing all the identified issues.
The PR looks good to me.
Before we proceed further, could you please add some tests to verify that the validation throttler is functioning as expected? This will also help ensure that the logic remains intact and is not broken by any future contributors.

@seetadev
Copy link
Contributor

seetadev commented Jul 3, 2025

@varun-r-mallya : Thank you for submitting the PR. Appreciate your efforts. Please add test case scenarios + newsfragment file.

If you need help in identifying test case scenarios, happy to share key test cases and important scenarios.

@sumanjeet0012 : Thank you for sharing feedback and for reviewing the PR. Appreciate your support.

@varun-r-mallya
Copy link
Contributor Author

varun-r-mallya commented Jul 3, 2025

@varun-r-mallya : Thank you for submitting the PR. Appreciate your efforts. Please add test case scenarios + newsfragment file.

If you need help in identifying test case scenarios, happy to share key test cases and important scenarios.

@sumanjeet0012 : Thank you for sharing feedback and for reviewing the PR. Appreciate your support.

Please do share different test cases that I can add!

@sumanjeet0012
Copy link
Contributor

@varun-r-mallya Here is a list of tests that should be created for PR:

ValidationThrottler Unit Tests

  • Queue Throttling

Test that submitting more messages than queue_size triggers queue-level throttling.

  • Global Throttling

Test that submitting more concurrent async validations than global_throttle_limit triggers global throttling (THROTTLED result).

  • Per-topic Throttling

Test that exceeding a topic’s throttle limit triggers per-topic throttling (THROTTLED result).

  • Timeout Handling

Test async validator that exceeds the timeout triggers IGNORE result.

  • Validator Exception Handling

Test that exceptions in sync and async validators are caught and result in REJECT.

  • Concurrency

Test handling of multiple concurrent validations (simulate with multiple workers).

End-to-End Message Validation

Test validate_msg for:

  • No validators (should accept).
  • All validators pass (should accept).
  • Any validator rejects (should reject).

@sumanjeet0012
Copy link
Contributor

@varun-r-mallya Kindly tag the relevant individuals when asking a question so that we receive notifications and can respond quickly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants