Skip to content

Fix SendAsync won't be timeout during producer reconnection #1356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ bin/golangci-lint:
# use golangCi-lint docker to avoid local golang env issues
# https://golangci-lint.run/welcome/install/
lint-docker:
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v
docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v

container:
docker build -t ${IMAGE_NAME} \
Expand Down
27 changes: 27 additions & 0 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,11 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
if sr.flushImmediately {
p.internalFlushCurrentBatch()
}
if sr.enqueued != nil {
sr.enqueuedOnce.Do(func() {
close(sr.enqueued)
})
}
return
}

Expand Down Expand Up @@ -907,6 +912,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
sequenceID: sequenceID,
sendRequests: callbacks,
})
for _, cb := range callbacks {
if sr, ok := cb.(*sendRequest); ok && sr.enqueued != nil {
sr.enqueuedOnce.Do(func() {
close(sr.enqueued)
})
}
}
p._getConn().WriteData(ctx, buffer)
}
}
Expand Down Expand Up @@ -1312,6 +1324,8 @@ func (p *partitionProducer) internalSendAsync(
flushImmediately: flushImmediately,
publishTime: time.Now(),
chunkID: -1,
enqueued: make(chan struct{}),
enqueuedOnce: &sync.Once{},
}

if err := p.prepareTransaction(sr); err != nil {
Expand Down Expand Up @@ -1353,6 +1367,17 @@ func (p *partitionProducer) internalSendAsync(
}

p.dataChan <- sr
select {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The select here will block SendAsync() while the producer is reconnecting, even when the pending queue is not full. That is not good for latency-sensitive applications such as games.

case <-sr.enqueued:
case <-ctx.Done():
err := ctx.Err()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, the ctx provided by the application/user does not carry a timeout. Maybe we should wrap the ctx with context.WithTimeout(ctx, config.timeout)?

if errors.Is(err, context.DeadlineExceeded) { // Convert DeadlineExceeded error to ErrSendTimeout
err = ErrSendTimeout
}
sr.callbackOnce.Do(func() {
runCallback(callback, nil, msg, err)
})
}
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
Expand Down Expand Up @@ -1583,6 +1608,8 @@ type sendRequest struct {
chunkID int
uuid string
chunkRecorder *chunkRecorder
enqueued chan struct{}
enqueuedOnce *sync.Once

// resource management

Expand Down
111 changes: 111 additions & 0 deletions pulsar/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2605,3 +2605,114 @@ func TestSelectConnectionForSameProducer(t *testing.T) {

client.Close()
}

// TestSendAsyncCouldTimeoutWhileReconnecting runs the reconnection-timeout
// scenario twice: first with batching enabled, then with batching disabled.
func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
	for _, disableBatching := range []bool{false, true} {
		testSendAsyncCouldTimeoutWhileReconnecting(t, disableBatching)
	}
}

// testSendAsyncCouldTimeoutWhileReconnecting starts a standalone Pulsar
// container, publishes a few messages synchronously, stops the broker, and
// then asserts that SendAsync reports ErrSendTimeout in two phases:
//
//  1. a single SendAsync issued right after the broker has been stopped;
//  2. a SendAsync issued after numMessages other asynchronous sends have been
//     started in the background (intended to fill the pending queue, whose
//     MaxPendingMessages is 10).
//
// isDisableBatching is forwarded to ProducerOptions.DisableBatching so the
// scenario can be exercised with and without batching.
func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
	t.Helper()

	req := testcontainers.ContainerRequest{
		Image:        getPulsarTestImage(),
		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
		WaitingFor:   wait.ForExposedPort(),
		Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
	}
	c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	})
	require.NoError(t, err, "Failed to start the pulsar container")
	defer func() {
		err := c.Terminate(context.Background())
		if err != nil {
			t.Fatal("Failed to terminate the pulsar container", err)
		}
	}()

	endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
	require.NoError(t, err, "Failed to get the pulsar endpoint")

	client, err := NewClient(ClientOptions{
		URL:               endpoint,
		ConnectionTimeout: 5 * time.Second,
		OperationTimeout:  5 * time.Second,
	})
	require.NoError(t, err)
	defer client.Close()

	// Producer creation is retried for up to 30 seconds until it succeeds.
	var testProducer Producer
	require.Eventually(t, func() bool {
		testProducer, err = client.CreateProducer(ProducerOptions{
			Topic:               newTopicName(),
			Schema:              NewBytesSchema(nil),
			SendTimeout:         3 * time.Second,
			DisableBatching:     isDisableBatching,
			BatchingMaxMessages: 5,
			MaxPendingMessages:  10,
		})
		return err == nil
	}, 30*time.Second, 1*time.Second)

	numMessages := 10
	// Send 10 messages synchronously while the broker is still up.
	for i := 0; i < numMessages; i++ {
		send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
		require.NoError(t, err)
		require.NotNil(t, send)
	}

	// Stop the pulsar server.
	timeout := 10 * time.Second
	err = c.Stop(context.Background(), &timeout)
	require.NoError(t, err)

	// expectSendTimeout issues one SendAsync with ctx and requires that its
	// callback delivers ErrSendTimeout within 10 seconds.
	expectSendTimeout := func(ctx context.Context) {
		finalErr := make(chan error, 1)
		testProducer.SendAsync(ctx, &ProducerMessage{
			Payload: []byte("test"),
		}, func(_ MessageID, _ *ProducerMessage, err error) {
			finalErr <- err
		})
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("test timeout")
		case err := <-finalErr:
			// should get a timeout error
			require.ErrorIs(t, err, ErrSendTimeout)
		}
		close(finalErr)
	}

	// Phase 1: SendAsync must time out after the broker has been stopped.
	reconnectCtx, reconnectCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer reconnectCancel()
	expectSendTimeout(reconnectCtx)

	// Phase 2: SendAsync must time out after a burst of other asynchronous
	// sends has been issued.
	go func() {
		// Send 10 messages asynchronously to make the pending queue full.
		for i := 0; i < numMessages; i++ {
			testProducer.SendAsync(context.Background(), &ProducerMessage{
				Payload: []byte("test"),
			}, func(_ MessageID, _ *ProducerMessage, _ error) {
			})
		}
	}()

	time.Sleep(3 * time.Second)

	// Use a fresh context here. The phase-1 context has a 5s deadline that is
	// already past by now (phase 1 waits for a timeout and is followed by the
	// 3s sleep above), so reusing it would hand SendAsync a pre-expired
	// context.
	queueFullCtx, queueFullCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer queueFullCancel()
	expectSendTimeout(queueFullCtx)
}
Loading