diff --git a/Makefile b/Makefile
index f6602db9e2..02554d976b 100644
--- a/Makefile
+++ b/Makefile
@@ -42,7 +42,7 @@ bin/golangci-lint:
 # use golangCi-lint docker to avoid local golang env issues
 # https://golangci-lint.run/welcome/install/
 lint-docker:
-	docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.51.2 golangci-lint run -v
+	docker run --rm -v $(shell pwd):/app -w /app golangci/golangci-lint:v1.61.0 golangci-lint run -v
 
 container:
 	docker build -t ${IMAGE_NAME} \
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index a4c2e3f8c4..ae17d17dcb 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -632,6 +632,11 @@ func (p *partitionProducer) internalSend(sr *sendRequest) {
 		if sr.flushImmediately {
 			p.internalFlushCurrentBatch()
 		}
+		if sr.enqueued != nil {
+			sr.enqueuedOnce.Do(func() {
+				close(sr.enqueued)
+			})
+		}
 
 		return
 	}
@@ -907,6 +912,13 @@ func (p *partitionProducer) writeData(buffer internal.Buffer, sequenceID uint64,
 			sequenceID:   sequenceID,
 			sendRequests: callbacks,
 		})
+		for _, cb := range callbacks {
+			if sr, ok := cb.(*sendRequest); ok && sr.enqueued != nil {
+				sr.enqueuedOnce.Do(func() {
+					close(sr.enqueued)
+				})
+			}
+		}
 		p._getConn().WriteData(ctx, buffer)
 	}
 }
@@ -1312,6 +1324,8 @@ func (p *partitionProducer) internalSendAsync(
 		flushImmediately: flushImmediately,
 		publishTime:      time.Now(),
 		chunkID:          -1,
+		enqueued:         make(chan struct{}),
+		enqueuedOnce:     &sync.Once{},
 	}
 
 	if err := p.prepareTransaction(sr); err != nil {
@@ -1353,6 +1367,17 @@ func (p *partitionProducer) internalSendAsync(
 	}
 
 	p.dataChan <- sr
+	select {
+	case <-sr.enqueued:
+	case <-ctx.Done():
+		err := ctx.Err()
+		if errors.Is(err, context.DeadlineExceeded) { // Convert DeadlineExceeded error to ErrSendTimeout
+			err = ErrSendTimeout
+		}
+		sr.callbackOnce.Do(func() {
+			runCallback(callback, nil, msg, err)
+		})
+	}
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
@@ -1583,6 +1608,8 @@ type sendRequest struct {
 	chunkID          int
 	uuid             string
 	chunkRecorder    *chunkRecorder
+	enqueued         chan struct{}
+	enqueuedOnce     *sync.Once
 
 	// resource management
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index b827929964..6d7faf632b 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -2605,3 +2605,114 @@ func TestSelectConnectionForSameProducer(t *testing.T) {
 
 	client.Close()
 }
+
+func TestSendAsyncCouldTimeoutWhileReconnecting(t *testing.T) {
+	testSendAsyncCouldTimeoutWhileReconnecting(t, false)
+	testSendAsyncCouldTimeoutWhileReconnecting(t, true)
+}
+
+func testSendAsyncCouldTimeoutWhileReconnecting(t *testing.T, isDisableBatching bool) {
+	t.Helper()
+
+	req := testcontainers.ContainerRequest{
+		Image:        getPulsarTestImage(),
+		ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+		WaitingFor:   wait.ForExposedPort(),
+		Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+	}
+	c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+		ContainerRequest: req,
+		Started:          true,
+	})
+	require.NoError(t, err, "Failed to start the pulsar container")
+	defer func() {
+		err := c.Terminate(context.Background())
+		if err != nil {
+			t.Fatal("Failed to terminate the pulsar container", err)
+		}
+	}()
+
+	endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+	require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+	client, err := NewClient(ClientOptions{
+		URL:               endpoint,
+		ConnectionTimeout: 5 * time.Second,
+		OperationTimeout:  5 * time.Second,
+	})
+	require.NoError(t, err)
+	defer client.Close()
+
+	var testProducer Producer
+	require.Eventually(t, func() bool {
+		testProducer, err = client.CreateProducer(ProducerOptions{
+			Topic:               newTopicName(),
+			Schema:              NewBytesSchema(nil),
+			SendTimeout:         3 * time.Second,
+			DisableBatching:     isDisableBatching,
+			BatchingMaxMessages: 5,
+			MaxPendingMessages:  10,
+		})
+		return err == nil
+	}, 30*time.Second, 1*time.Second)
+
+	numMessages := 10
+	// Send 10 messages synchronously
+	for i := 0; i < numMessages; i++ {
+		send, err := testProducer.Send(context.Background(), &ProducerMessage{Payload: []byte("test")})
+		require.NoError(t, err)
+		require.NotNil(t, send)
+	}
+
+	// stop pulsar server
+	timeout := 10 * time.Second
+	err = c.Stop(context.Background(), &timeout)
+	require.NoError(t, err)
+
+	// Test the SendAsync could be timeout if the producer is reconnecting
+
+	finalErr := make(chan error, 1)
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	testProducer.SendAsync(ctx, &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		finalErr <- err
+	})
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timeout")
+	case err = <-finalErr:
+		// should get a timeout error
+		require.ErrorIs(t, err, ErrSendTimeout)
+	}
+	close(finalErr)
+
+	// Test that the SendAsync could be timeout if the pending queue is full
+
+	go func() {
+		// Send 10 messages asynchronously to make the pending queue full
+		for i := 0; i < numMessages; i++ {
+			testProducer.SendAsync(context.Background(), &ProducerMessage{
+				Payload: []byte("test"),
+			}, func(_ MessageID, _ *ProducerMessage, _ error) {
+			})
+		}
+	}()
+
+	time.Sleep(3 * time.Second)
+	finalErr = make(chan error, 1)
+	testProducer.SendAsync(ctx, &ProducerMessage{
+		Payload: []byte("test"),
+	}, func(_ MessageID, _ *ProducerMessage, err error) {
+		finalErr <- err
+	})
+	select {
+	case <-time.After(10 * time.Second):
+		t.Fatal("test timeout")
+	case err = <-finalErr:
+		// should get a timeout error
+		require.ErrorIs(t, err, ErrSendTimeout)
+	}
+	close(finalErr)
+}