Skip to content

Commit e81a20d

Browse files
authored
[fix][broker] Avoid consumers receiving acknowledged messages from compacted topic after reconnection (#21187)
1 parent 434da8b commit e81a20d

File tree

9 files changed

+229
-15
lines changed

9 files changed

+229
-15
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,10 @@ void markDelete(Position position, Map<String, Long> properties)
517517
*/
518518
void rewind();
519519

520+
/**
 * Rewind the cursor, optionally on behalf of a consumer that reads from the compacted topic.
 *
 * <p>This default implementation ignores {@code readCompacted} and simply delegates to
 * {@link #rewind()}; implementations that need to distinguish the two cases must override it.
 *
 * @param readCompacted flag supplied by the caller (the dispatcher passes the consumer's
 *                      {@code readCompacted()} value); unused by this default implementation
 */
default void rewind(boolean readCompacted) {
521+
rewind();
522+
}
523+
520524
/**
521525
* Move the cursor to a different read position.
522526
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,7 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
682682
LedgerHandle recoveredFromCursorLedger) {
683683
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
684684
// we need to move to the next existing ledger
685-
if (!ledger.ledgerExists(position.getLedgerId())) {
685+
if (position.getEntryId() == -1L && !ledger.ledgerExists(position.getLedgerId())) {
686686
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
687687
if (nextExistingLedger == null) {
688688
log.info("[{}] [{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
@@ -2522,9 +2522,15 @@ public Position getPersistentMarkDeletedPosition() {
25222522

25232523
// No-argument rewind: kept for existing callers, it delegates to rewind(boolean)
// with readCompacted = false.
@Override
25242524
public void rewind() {
2525+
rewind(false);
2526+
}
2527+
2528+
@Override
2529+
public void rewind(boolean readCompacted) {
25252530
lock.writeLock().lock();
25262531
try {
2527-
PositionImpl newReadPosition = ledger.getNextValidPosition(markDeletePosition);
2532+
PositionImpl newReadPosition =
2533+
readCompacted ? markDeletePosition.getNext() : ledger.getNextValidPosition(markDeletePosition);
25282534
PositionImpl oldReadPosition = readPosition;
25292535

25302536
log.info("[{}-{}] Rewind from {} to {}", ledger.getName(), name, oldReadPosition, newReadPosition);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
5555
import org.apache.pulsar.common.util.Codec;
5656
import org.apache.pulsar.compaction.CompactedTopicUtils;
57+
import org.apache.pulsar.compaction.Compactor;
5758
import org.apache.pulsar.compaction.TopicCompactionService;
5859
import org.slf4j.Logger;
5960
import org.slf4j.LoggerFactory;
@@ -107,9 +108,9 @@ protected void scheduleReadOnActiveConsumer() {
107108
if (log.isDebugEnabled()) {
108109
log.debug("[{}] Rewind cursor and read more entries without delay", name);
109110
}
110-
cursor.rewind();
111-
112111
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
112+
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
113+
113114
notifyActiveConsumerChanged(activeConsumer);
114115
readMoreEntries(activeConsumer);
115116
return;
@@ -127,9 +128,9 @@ protected void scheduleReadOnActiveConsumer() {
127128
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
128129
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
129130
}
130-
cursor.rewind();
131-
132131
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
132+
cursor.rewind(activeConsumer != null && activeConsumer.readCompacted());
133+
133134
notifyActiveConsumerChanged(activeConsumer);
134135
readMoreEntries(activeConsumer);
135136
readOnActiveConsumerTask = null;
@@ -206,7 +207,7 @@ private synchronized void internalReadEntriesComplete(final List<Entry> entries,
206207
}
207208
}
208209
entries.forEach(Entry::release);
209-
cursor.rewind();
210+
cursor.rewind(currentConsumer != null ? currentConsumer.readCompacted() : readConsumer.readCompacted());
210211
if (currentConsumer != null) {
211212
notifyActiveConsumerChanged(currentConsumer);
212213
readMoreEntries(currentConsumer);
@@ -301,7 +302,7 @@ private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consu
301302
}
302303
cursor.cancelPendingReadRequest();
303304
havePendingRead = false;
304-
cursor.rewind();
305+
cursor.rewind(consumer.readCompacted());
305306
if (log.isDebugEnabled()) {
306307
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
307308
}
@@ -362,7 +363,9 @@ private void readMoreEntries(Consumer consumer) {
362363
}
363364
havePendingRead = true;
364365
if (consumer.readCompacted()) {
365-
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
366+
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
367+
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
368+
|| hasValidMarkDeletePosition(cursor));
366369
TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
367370
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
368371
bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
@@ -380,6 +383,13 @@ private void readMoreEntries(Consumer consumer) {
380383
}
381384
}
382385

386+
// Returns true when the cursor has a non-null mark-delete (md) position whose entry id is -1.
// NOTE(review): despite the method name, a `true` result corresponds to what the comment below
// calls an "invalid" md-position (presumably a subscription that has not acknowledged anything
// yet) - confirm, and consider a clearer name.
// NOTE(review): a null md-position yields false here, whereas the parallel check in
// CompactedTopicImpl#asyncReadEntriesOrWait treats a null md-position as "read from earliest" -
// confirm which behavior is intended.
private boolean hasValidMarkDeletePosition(ManagedCursor cursor) {
387+
// An md-position with `entryId == -1L` is treated as an invalid (not yet advanced) position,
388+
// because the initial md-position of the consumer is set to such a value.
389+
// See ManagedLedgerImpl#asyncOpenCursor and ManagedLedgerImpl#getFirstPosition
390+
return cursor.getMarkDeletedPosition() != null && cursor.getMarkDeletedPosition().getEntryId() == -1L;
391+
}
392+
383393
@Override
384394
protected void reScheduleRead() {
385395
if (isRescheduleReadInProgress.compareAndSet(false, true)) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,7 +1035,9 @@ public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subs
10351035
}
10361036

10371037
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
1038-
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated,
1038+
InitialPosition initialPosition,
1039+
long startMessageRollbackDurationSec,
1040+
boolean replicated,
10391041
Map<String, String> subscriptionProperties) {
10401042
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
10411043
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
@@ -1045,7 +1047,6 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
10451047
}
10461048

10471049
Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
1048-
10491050
ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, subscriptionProperties,
10501051
new OpenCursorCallback() {
10511052
@Override

pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
101101
boolean isFirstRead,
102102
ReadEntriesCallback callback, Consumer consumer) {
103103
PositionImpl cursorPosition;
104-
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
104+
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())
105+
&& (!cursor.isDurable() || cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)
106+
|| cursor.getMarkDeletedPosition() == null
107+
|| cursor.getMarkDeletedPosition().getEntryId() == -1L);
108+
if (readFromEarliest){
105109
cursorPosition = PositionImpl.EARLIEST;
106110
} else {
107111
cursorPosition = (PositionImpl) cursor.getReadPosition();

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.pulsar.client.api.PulsarClient;
5353
import org.apache.pulsar.client.api.PulsarClientException;
5454
import org.apache.pulsar.client.api.Schema;
55+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
5556
import org.apache.pulsar.client.api.SubscriptionType;
5657
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
5758
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -868,6 +869,7 @@ public void testReplicatedSubscriptionWithCompaction() throws Exception {
868869
.topic(topicName)
869870
.subscriptionName("sub2")
870871
.subscriptionType(SubscriptionType.Exclusive)
872+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
871873
.readCompacted(true)
872874
.subscribe();
873875
List<String> result = new ArrayList<>();

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1893,6 +1893,7 @@ public void testReadCommittedWithCompaction() throws Exception{
18931893
.topic(topic)
18941894
.subscriptionName("sub")
18951895
.subscriptionType(SubscriptionType.Exclusive)
1896+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
18961897
.readCompacted(true)
18971898
.subscribe();
18981899
List<String> result = new ArrayList<>();

pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ReaderTest.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,4 +785,32 @@ public void testReaderListenerAcknowledgement()
785785
admin.topics().deletePartitionedTopic(partitionedTopic);
786786
}
787787

788+
// Verifies that a reader created with MessageId.earliest continues from the next unread message
// (and does not restart from the first message) after the topic is unloaded and the reader
// reconnects: "1" is read before the unload, "2" and "3" are expected after it.
// NOTE(review): the results of readNext(2, SECONDS) are dereferenced without a null check;
// presumably a timeout returns null and would surface as an NPE instead of an assertion
// failure - confirm.
@Test
789+
public void testReaderReconnectedFromNextEntry() throws Exception {
790+
final String topic = "persistent://my-property/my-ns/testReaderReconnectedFromNextEntry";
791+
Reader<String> reader = pulsarClient.newReader(Schema.STRING).topic(topic).receiverQueueSize(1)
792+
.startMessageId(MessageId.earliest).create();
793+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
794+
795+
// Send 3 and consume 1.
796+
producer.send("1");
797+
producer.send("2");
798+
producer.send("3");
799+
Message<String> msg1 = reader.readNext(2, TimeUnit.SECONDS);
800+
assertEquals(msg1.getValue(), "1");
801+
802+
// Trigger reader reconnect.
803+
admin.topics().unload(topic);
804+
805+
// For non-durable we are going to restart from the next entry.
806+
Message<String> msg2 = reader.readNext(2, TimeUnit.SECONDS);
807+
assertEquals(msg2.getValue(), "2");
808+
Message<String> msg3 = reader.readNext(2, TimeUnit.SECONDS);
809+
assertEquals(msg3.getValue(), "3");
810+
811+
// cleanup.
812+
reader.close();
813+
producer.close();
814+
admin.topics().delete(topic, false);
815+
}
788816
}

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java

Lines changed: 161 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,7 @@ public void testReceiverQueueSize() throws Exception {
18751875

18761876
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
18771877
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
1878+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
18781879
.subscribe();
18791880

18801881
//Give some time to consume
@@ -1918,6 +1919,7 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {
19181919

19191920
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
19201921
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
1922+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
19211923
.subscribe();
19221924

19231925
Awaitility.await().untilAsserted(() -> {
@@ -2190,9 +2192,11 @@ public void testCompactionWithTTL() throws Exception {
21902192
});
21912193

21922194
@Cleanup
2193-
Consumer<String> consumer =
2194-
pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true)
2195-
.subscribe();
2195+
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
2196+
.subscriptionName("sub-2")
2197+
.readCompacted(true)
2198+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
2199+
.subscribe();
21962200

21972201
List<String> result = new ArrayList<>();
21982202
while (true) {
@@ -2206,4 +2210,158 @@ public void testCompactionWithTTL() throws Exception {
22062210

22072211
Assert.assertEquals(result, List.of("V3", "V4", "V5"));
22082212
}
2213+
2214+
// Verifies that a read-compacted consumer does not receive already-acknowledged messages again
// after reconnecting. Flow: publish 10 keyed messages, compact, unload until the topic reports
// 0 entries, consume and acknowledge 5, force a reconnect, consume the remaining 5, and check
// that the concatenated results equal the 10 published values with nothing left over. A final
// unload + publish checks that the new message is delivered and the mark-delete position
// catches up with the last confirmed entry.
// NOTE(review): the producer is created from `pulsarClient` while the consumer uses the local
// `client`; consumer and producer are closed manually at the end, so they are not closed when
// an assertion fails earlier - confirm whether @Cleanup was intended.
@Test
2215+
public void testAcknowledgeWithReconnection() throws Exception {
2216+
final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
2217+
final String subName = "my-sub";
2218+
@Cleanup
2219+
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
2220+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
2221+
.enableBatching(false).topic(topicName).create();
2222+
2223+
List<String> expected = new ArrayList<>();
2224+
// Each message uses its own key, so all 10 values are expected to be read back.
for (int i = 0; i < 10; i++) {
2225+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
2226+
expected.add(String.valueOf(i));
2227+
}
2228+
producer.flush();
2229+
2230+
admin.topics().triggerCompaction(topicName);
2231+
2232+
Awaitility.await().untilAsserted(() -> {
2233+
assertEquals(admin.topics().compactionStatus(topicName).status,
2234+
LongRunningProcessStatus.Status.SUCCESS);
2235+
});
2236+
2237+
// trim the topic
2238+
admin.topics().unload(topicName);
2239+
2240+
// Wait until the topic's internal stats report no remaining entries.
Awaitility.await().untilAsserted(() -> {
2241+
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
2242+
assertEquals(internalStats.numberOfEntries, 0);
2243+
});
2244+
2245+
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
2246+
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
2247+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
2248+
.isAckReceiptEnabled(true)
2249+
.subscribe();
2250+
2251+
List<String> results = new ArrayList<>();
2252+
// First half: receive and acknowledge up to 5 messages.
for (int i = 0; i < 5; i++) {
2253+
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
2254+
if (message == null) {
2255+
break;
2256+
}
2257+
results.add(message.getValue());
2258+
consumer.acknowledge(message);
2259+
}
2260+
2261+
Awaitility.await().untilAsserted(() ->
2262+
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2263+
5));
2264+
2265+
// Make consumer reconnect to broker
2266+
admin.topics().unload(topicName);
2267+
2268+
// Wait for consumer to reconnect and clear incomingMessages
2269+
consumer.pause();
2270+
Awaitility.await().untilAsserted(() -> {
2271+
Assert.assertEquals(consumer.numMessagesInQueue(), 0);
2272+
});
2273+
consumer.resume();
2274+
2275+
// Second half: the remaining messages must follow on from the acknowledged ones.
for (int i = 0; i < 5; i++) {
2276+
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
2277+
if (message == null) {
2278+
break;
2279+
}
2280+
results.add(message.getValue());
2281+
consumer.acknowledge(message);
2282+
}
2283+
2284+
Awaitility.await().untilAsserted(() ->
2285+
assertEquals(admin.topics().getStats(topicName, true).getSubscriptions().get(subName).getMsgBacklog(),
2286+
0));
2287+
2288+
Assert.assertEquals(results, expected);
2289+
2290+
// Nothing else should be delivered once all 10 messages were acknowledged.
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
2291+
Assert.assertNull(message);
2292+
2293+
// Make consumer reconnect to broker
2294+
admin.topics().unload(topicName);
2295+
2296+
producer.newMessage().key("K").value("V").send();
2297+
Message<String> message2 = consumer.receive(3, TimeUnit.SECONDS);
2298+
Assert.assertEquals(message2.getValue(), "V");
2299+
consumer.acknowledge(message2);
2300+
2301+
// The cursor's mark-delete position must reach the last confirmed entry.
Awaitility.await().untilAsserted(() -> {
2302+
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName);
2303+
Assert.assertEquals(internalStats.lastConfirmedEntry,
2304+
internalStats.cursors.get(subName).markDeletePosition);
2305+
});
2306+
2307+
consumer.close();
2308+
producer.close();
2309+
}
2310+
2311+
// Verifies that a new read-compacted subscription with initial position Earliest receives
// both the compacted data and a message published afterwards. Flow: publish 10 keyed messages,
// compact, unload until the topic reports 0 entries, publish one more message ("K" -> "V"),
// then subscribe and drain; the received values must equal the 10 originals followed by "V".
@Test
2312+
public void testEarliestSubsAfterRollover() throws Exception {
2313+
final String topicName = "persistent://my-property/use/my-ns/testEarliestSubsAfterRollover" + UUID.randomUUID();
2314+
final String subName = "my-sub";
2315+
@Cleanup
2316+
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
2317+
@Cleanup
2318+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
2319+
.enableBatching(false).topic(topicName).create();
2320+
2321+
List<String> expected = new ArrayList<>();
2322+
// Each message uses its own key, so all 10 values are expected to be read back.
for (int i = 0; i < 10; i++) {
2323+
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
2324+
expected.add(String.valueOf(i));
2325+
}
2326+
producer.flush();
2327+
2328+
admin.topics().triggerCompaction(topicName);
2329+
2330+
Awaitility.await().untilAsserted(() -> {
2331+
assertEquals(admin.topics().compactionStatus(topicName).status,
2332+
LongRunningProcessStatus.Status.SUCCESS);
2333+
});
2334+
2335+
// trim the topic
2336+
admin.topics().unload(topicName);
2337+
2338+
// Wait until the topic's internal stats report no remaining entries.
Awaitility.await().untilAsserted(() -> {
2339+
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
2340+
assertEquals(internalStats.numberOfEntries, 0);
2341+
});
2342+
2343+
// Make ml.getFirstPosition() return new ledger first position
2344+
producer.newMessage().key("K").value("V").send();
2345+
expected.add("V");
2346+
2347+
@Cleanup
2348+
ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
2349+
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
2350+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
2351+
.isAckReceiptEnabled(true)
2352+
.subscribe();
2353+
2354+
List<String> results = new ArrayList<>();
2355+
// Drain until a receive call returns null (no message within 3 seconds).
while (true) {
2356+
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
2357+
if (message == null) {
2358+
break;
2359+
}
2360+
2361+
results.add(message.getValue());
2362+
consumer.acknowledge(message);
2363+
}
2364+
2365+
Assert.assertEquals(results, expected);
2366+
}
22092367
}

0 commit comments

Comments
 (0)