Skip to content

Commit 4595679

Browse files
RobertIndieCopilot
andcommitted
Fix reader hanging when startMessageId is latest (#1364)
* Fix reader hanging when startMessageId is latest * Fix lint * Apply suggestions from code review Co-authored-by: Copilot <[email protected]> * Add comment to seekMessageId --------- Co-authored-by: Copilot <[email protected]> (cherry picked from commit 2516598)
1 parent e966d11 commit 4595679

File tree

2 files changed

+76
-2
lines changed

2 files changed

+76
-2
lines changed

pulsar/consumer_partition.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,10 @@ type partitionConsumer struct {
161161
startMessageID atomicMessageID
162162
lastDequeuedMsg *trackingMessageID
163163

164+
// This is used to track the seeking message id during the seek operation.
165+
// It will be set to nil after seek completes and reconnected.
166+
seekMessageID atomicMessageID
167+
164168
currentQueueSize uAtomic.Int32
165169
scaleReceiverQueueHint uAtomic.Bool
166170
incomingMessages uAtomic.Int32
@@ -365,6 +369,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
365369
maxQueueSize: int32(options.receiverQueueSize),
366370
queueCh: make(chan []*message, options.receiverQueueSize),
367371
startMessageID: atomicMessageID{msgID: options.startMessageID},
372+
seekMessageID: atomicMessageID{msgID: nil},
368373
connectedCh: make(chan struct{}),
369374
messageCh: messageCh,
370375
connectClosedCh: make(chan *connectionClosed, 1),
@@ -1023,7 +1028,7 @@ func (pc *partitionConsumer) requestSeek(msgID *messageID) error {
10231028
// 2. The startMessageID is reset to ensure accurate judgment when calling hasNext next time.
10241029
// Since the messages in the queue are cleared here reconnection won't reset startMessageId.
10251030
pc.lastDequeuedMsg = nil
1026-
pc.startMessageID.set(toTrackingMessageID(msgID))
1031+
pc.seekMessageID.set(toTrackingMessageID(msgID))
10271032
pc.clearQueueAndGetNextMessage()
10281033
return nil
10291034
}
@@ -1481,6 +1486,11 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID *trackingMessageID)
14811486
return false
14821487
}
14831488

1489+
// if we start at latest message, we should never discard
1490+
if pc.startMessageID.get().equal(latestMessageID) {
1491+
return false
1492+
}
1493+
14841494
if pc.options.startMessageIDInclusive {
14851495
return pc.startMessageID.get().greater(msgID.messageID)
14861496
}
@@ -1977,7 +1987,12 @@ func (pc *partitionConsumer) grabConn(assignedBrokerURL string) error {
19771987
KeySharedMeta: keySharedMeta,
19781988
}
19791989

1980-
pc.startMessageID.set(pc.clearReceiverQueue())
1990+
if seekMsgID := pc.seekMessageID.get(); seekMsgID != nil {
1991+
pc.startMessageID.set(seekMsgID)
1992+
pc.seekMessageID.set(nil) // Reset seekMessageID to nil to avoid persisting state across reconnects
1993+
} else {
1994+
pc.startMessageID.set(pc.clearReceiverQueue())
1995+
}
19811996
if pc.options.subscriptionMode != Durable {
19821997
// For regular subscriptions the broker will determine the restarting point
19831998
cmdSubscribe.StartMessageId = convertToMessageIDData(pc.startMessageID.get())

pulsar/reader_test.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/stretchr/testify/require"
27+
2628
"github.com/apache/pulsar-client-go/pulsar/backoff"
2729

2830
"github.com/apache/pulsar-client-go/pulsar/crypto"
@@ -1237,3 +1239,60 @@ func TestReaderWithSeekByTime(t *testing.T) {
12371239
})
12381240
}
12391241
}
1242+
1243+
func TestReaderReadFromLatest(t *testing.T) {
1244+
topic := newTopicName()
1245+
client, err := NewClient(ClientOptions{
1246+
URL: lookupURL,
1247+
})
1248+
1249+
require.NoError(t, err)
1250+
defer client.Close()
1251+
1252+
r, err := client.CreateReader(ReaderOptions{
1253+
Topic: topic,
1254+
StartMessageID: LatestMessageID(),
1255+
})
1256+
require.NoError(t, err)
1257+
defer r.Close()
1258+
1259+
p, err := client.CreateProducer(ProducerOptions{
1260+
Topic: topic,
1261+
})
1262+
require.NoError(t, err)
1263+
defer p.Close()
1264+
1265+
// Send messages
1266+
for i := 0; i < 10; i++ {
1267+
msg := &ProducerMessage{
1268+
Key: "key",
1269+
Payload: []byte(fmt.Sprintf("message-%d", i)),
1270+
}
1271+
id, err := p.Send(context.Background(), msg)
1272+
require.NoError(t, err)
1273+
require.NotNil(t, id)
1274+
}
1275+
1276+
// Read and verify messages
1277+
for i := 0; i < 10; i++ {
1278+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
1279+
msg, err := r.Next(ctx)
1280+
cancel()
1281+
require.NoError(t, err)
1282+
require.NotNil(t, msg)
1283+
1284+
// Verify message key
1285+
require.Equal(t, "key", msg.Key())
1286+
1287+
// Verify message payload
1288+
expectedPayload := fmt.Sprintf("message-%d", i)
1289+
require.Equal(t, []byte(expectedPayload), msg.Payload())
1290+
}
1291+
1292+
// Verify no more messages
1293+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
1294+
defer cancel()
1295+
msg, err := r.Next(ctx)
1296+
require.Error(t, err)
1297+
require.Nil(t, msg)
1298+
}

0 commit comments

Comments
 (0)