Fix SendAsync won't be timeout during producer reconnection #1356
base: master
Pull Request Overview
This PR fixes an issue where SendAsync may not time out during producer reconnection by introducing a new channel (enqueued) to signal when send requests are added to the pending queue. It also adds tests to validate timeout behavior in reconnection scenarios and when the pending queue is full.
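The mechanism described above can be sketched in isolation as follows. This is a simplified illustration, not the client's actual code: the `sendRequest` struct, `sendAndWait`, and `timesOut` are hypothetical names, the event loop is simulated with a sleeping goroutine, and the sketch returns an error where the real producer would invoke the send callback.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendRequest is a hypothetical, simplified stand-in for the client's request type.
type sendRequest struct {
	payload  string
	enqueued chan struct{} // closed by the event loop once the request is queued
}

// sendAndWait hands sr to the event loop, then waits until the loop signals
// that the request was enqueued or until ctx is done, whichever comes first.
func sendAndWait(ctx context.Context, dataChan chan<- *sendRequest, sr *sendRequest) error {
	dataChan <- sr
	select {
	case <-sr.enqueued:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timesOut simulates an event loop that stays busy for busyMs milliseconds
// (for example while reconnecting) and reports whether a send with a
// timeoutMs deadline fails with context.DeadlineExceeded.
func timesOut(busyMs, timeoutMs int) bool {
	dataChan := make(chan *sendRequest, 1) // buffered so the handoff itself does not block
	go func() {
		time.Sleep(time.Duration(busyMs) * time.Millisecond) // simulated busy period
		sr := <-dataChan
		close(sr.enqueued) // signal that the request reached the pending queue
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()

	sr := &sendRequest{payload: "hello", enqueued: make(chan struct{})}
	err := sendAndWait(ctx, dataChan, sr)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println("busy loop, short timeout, timed out:", timesOut(300, 30))
	fmt.Println("idle loop, long timeout, timed out:", timesOut(0, 500))
}
```

In this sketch the caller only gets a timeout because its ctx carries a deadline; a ctx without one would wait for the simulated loop indefinitely.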
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pulsar/producer_test.go | Added tests to verify SendAsync timeout behavior. |
| pulsar/producer_partition.go | Updated producer logic to include and utilize the new enqueued channel. |
Comments suppressed due to low confidence (1)
pulsar/producer_test.go:2705
- Avoid using fixed sleeps for synchronization in tests as they can lead to flaky results; consider using a more robust waiting mechanism (e.g., require.Eventually) to ensure the pending queue is properly filled before proceeding.
time.Sleep(3 * time.Second)
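As a rough illustration of the suggestion above, a condition can be polled until it holds instead of sleeping for a fixed time. The `eventually` helper below is a hypothetical, standard-library-only stand-in that mirrors the shape of testify's `require.Eventually`; `queueFillsWithin` and its counter are likewise assumptions that only simulate a pending queue being filled.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// eventually polls cond every tick until it returns true or waitFor elapses.
// Hypothetical helper: it imitates require.Eventually without needing testify.
func eventually(cond func() bool, waitFor, tick time.Duration) bool {
	deadline := time.Now().Add(waitFor)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}

// queueFillsWithin simulates a background goroutine adding size items to a
// pending queue (about one per millisecond) and reports whether the queue
// was observed full within waitMs milliseconds.
func queueFillsWithin(size, waitMs int) bool {
	var pending int64
	go func() {
		for i := 0; i < size; i++ {
			time.Sleep(time.Millisecond)
			atomic.AddInt64(&pending, 1)
		}
	}()
	full := func() bool { return atomic.LoadInt64(&pending) == int64(size) }
	return eventually(full, time.Duration(waitMs)*time.Millisecond, 5*time.Millisecond)
}

func main() {
	fmt.Println("queue of 10 full within 2s:", queueFillsWithin(10, 2000))
}
```

The wait ends as soon as the condition holds, so the test neither sleeps longer than needed nor proceeds before the queue is actually full.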
select {
case <-sr.enqueued:
case <-ctx.Done():
	err := ctx.Err()
Generally, the ctx provided by the application/user does not have a timeout set. Maybe we should wrap the ctx with context.WithTimeout(ctx, config.timeout)?
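A minimal sketch of what this suggestion could look like, using only the standard `context` package. The `withSendTimeout` and `expiresWithin` helpers and the `sendTimeout` parameter are hypothetical; `sendTimeout` stands in for whatever send timeout the producer is configured with.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// withSendTimeout derives a context that is cancelled after sendTimeout.
// If the parent ctx already has an earlier deadline, context.WithTimeout
// keeps that earlier deadline. sendTimeout is a hypothetical parameter.
func withSendTimeout(ctx context.Context, sendTimeout time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(ctx, sendTimeout)
}

// expiresWithin reports whether a plain background context wrapped with a
// timeoutMs deadline is done before waitMs milliseconds have passed.
func expiresWithin(timeoutMs, waitMs int) bool {
	ctx, cancel := withSendTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel()
	select {
	case <-ctx.Done():
		return true
	case <-time.After(time.Duration(waitMs) * time.Millisecond):
		return false
	}
}

func main() {
	// A context created by the application without a deadline never expires on its own.
	_, hasDeadline := context.Background().Deadline()
	fmt.Println("plain ctx has deadline:", hasDeadline)
	fmt.Println("wrapped ctx expires within 1s:", expiresWithin(20, 1000))
}
```

The returned cancel function should always be called once the send completes so the timer behind the derived context is released.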
@@ -1353,6 +1367,17 @@ func (p *partitionProducer) internalSendAsync(
	}

	p.dataChan <- sr
	select {
The select here will block SendAsync() when the producer is reconnecting and the pending queue is not full, which is not good for latency-sensitive applications such as games.
Fixes #1332
Motivation
The root cause is that the producer's event loop is busy during reconnection, so messages waiting in dataChan cannot time out, and the ctx passed to SendAsync is not respected.
SendAsync can wait until runEventLoop processes the request and pushes it into the pendingQueue or a batch, as the Java client does. Before the request enters the pendingQueue, SendAsync itself can check for timeouts and invoke the callback. After it enters the pendingQueue, failTimeoutMessages can manage the timeout.
Modifications
- Add enqueued to make SendAsync wait until the sendRequest is added to the pending queue.
- Use ctx to check for timeouts and invoke the callback if a timeout occurs.
Verifying this change
The test TestSendAsyncCouldTimeoutWhileReconnecting is based on the test from PR #1345.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation