Skip to content

Fix SendAsync won't be timeout during producer reconnection #1356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

RobertIndie
Copy link
Member

@RobertIndie RobertIndie commented Apr 18, 2025

Fixes #1332

Motivation

The root cause is that the producer event loops become busy during reconnection, preventing messages in dataChan from timing out. And the ctx of the SendAsync won't be respected in this case.

SendAsync can wait until the runEventLoop processes it and pushes it into the pendingQueue or a batch, just like the Java client. Before entering the pendingQueue, SendAsync itself can check for timeouts and handle the callback. After entering the pendingQueue, failTimeoutMessages can manage the timeout.

Modifications

  • Introduced a new channel enqueued to make SendAsync wait until the sendRequest is added to the pending queue.
  • Use ctx to check for timeouts and invoke the callback if a timeout occurs.

Verifying this change

The test TestSendAsyncCouldTimeoutWhileReconnecting is based on test from this PR: #1345

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API: (yes / no)
  • The schema: (yes / no / don't know)
  • The default values of configurations: (yes / no)
  • The wire protocol: (yes / no)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / GoDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

@RobertIndie RobertIndie requested a review from Copilot April 18, 2025 09:39
@RobertIndie RobertIndie self-assigned this Apr 18, 2025
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes an issue where SendAsync may not time out during producer reconnection by introducing a new channel (enqueued) to signal when send requests are added to the pending queue. It also adds tests to validate timeout behavior in reconnection scenarios and when the pending queue is full.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

File Description
pulsar/producer_test.go Added tests to verify SendAsync timeout behavior.
pulsar/producer_partition.go Updated producer logic to include and utilize the new enqueued channel.
Comments suppressed due to low confidence (1)

pulsar/producer_test.go:2705

  • Avoid using fixed sleeps for synchronization in tests as they can lead to flaky results; consider using a more robust waiting mechanism (e.g., require.Eventually) to ensure the pending queue is properly filled before proceeding.
time.Sleep(3 * time.Second)

@RobertIndie RobertIndie marked this pull request as draft April 18, 2025 09:57
@RobertIndie RobertIndie marked this pull request as ready for review April 18, 2025 12:03
select {
case <-sr.enqueued:
case <-ctx.Done():
err := ctx.Err()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, timeout is not set to the ctx provided by application/user, may be we should update the ctx with context.WithTimeout(ctx, config.timeout)?

@@ -1353,6 +1367,17 @@ func (p *partitionProducer) internalSendAsync(
}

p.dataChan <- sr
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the select here will block SendAsync() when producer is in reconnecting while the pengding queue is not full, it is not good for those latency-sensitive applications such as game.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug][Producer] The callback was not invoked during reconnecting.
2 participants