Skip to content

Commit 4c98ba5

Browse files
poorbarcode authored and nodece committed
[fix] [broker] Prevent long deduplication cursor backlog so that topic loading wouldn't timeout (apache#22479)
(cherry picked from commit 837f8bc)
1 parent 3cb268c commit 4c98ba5

File tree

4 files changed

+181
-8
lines changed

4 files changed

+181
-8
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,10 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd
547547
}
548548

549549
protected void startDeduplicationSnapshotMonitor() {
550+
// We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
551+
// scheduled task running.
550552
int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds();
551-
if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) {
553+
if (interval > 0) {
552554
this.deduplicationSnapshotMonitor =
553555
Executors.newSingleThreadScheduledExecutor(new ExecutorProvider.ExtendedThreadFactory(
554556
"deduplication-snapshot-monitor"));

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -153,9 +153,14 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {
153153

154154
// Replay all the entries and apply all the sequence ids updates
155155
log.info("[{}] Replaying {} entries for deduplication", topic.getName(), managedCursor.getNumberOfEntries());
156-
CompletableFuture<Void> future = new CompletableFuture<>();
156+
CompletableFuture<Position> future = new CompletableFuture<>();
157157
replayCursor(future);
158-
return future;
158+
return future.thenAccept(lastPosition -> {
159+
if (lastPosition != null && snapshotCounter >= snapshotInterval) {
160+
snapshotCounter = 0;
161+
takeSnapshot(lastPosition);
162+
}
163+
});
159164
}
160165

161166
/**
@@ -164,11 +169,11 @@ private CompletableFuture<Void> recoverSequenceIdsMap() {
164169
*
165170
* @param future future to trigger when the replay is complete
166171
*/
167-
private void replayCursor(CompletableFuture<Void> future) {
172+
private void replayCursor(CompletableFuture<Position> future) {
168173
managedCursor.asyncReadEntries(100, new ReadEntriesCallback() {
169174
@Override
170175
public void readEntriesComplete(List<Entry> entries, Object ctx) {
171-
176+
Position lastPosition = null;
172177
for (Entry entry : entries) {
173178
ByteBuf messageMetadataAndPayload = entry.getDataBuffer();
174179
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);
@@ -178,7 +183,8 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
178183
highestSequencedPushed.put(producerName, sequenceId);
179184
highestSequencedPersisted.put(producerName, sequenceId);
180185
producerRemoved(producerName);
181-
186+
snapshotCounter++;
187+
lastPosition = entry.getPosition();
182188
entry.release();
183189
}
184190

@@ -187,7 +193,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
187193
pulsar.getExecutor().execute(() -> replayCursor(future));
188194
} else {
189195
// Done replaying
190-
future.complete(null);
196+
future.complete(lastPosition);
191197
}
192198
}
193199

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
178178

179179
private final ConcurrentOpenHashMap<String, Replicator> replicators;
180180

181-
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
181+
public static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
182182
private static final String TOPIC_EPOCH_PROPERTY_NAME = "pulsar.topic.epoch";
183183

184184
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertTrue;
24+
import java.time.Duration;
25+
import java.util.concurrent.CompletableFuture;
26+
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
27+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
28+
import org.apache.bookkeeper.mledger.impl.PositionImpl;
29+
import org.apache.pulsar.broker.BrokerTestUtil;
30+
import org.apache.pulsar.broker.service.persistent.MessageDeduplication;
31+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
32+
import org.apache.pulsar.client.api.Producer;
33+
import org.apache.pulsar.client.api.ProducerConsumerBase;
34+
import org.apache.pulsar.client.api.Schema;
35+
import org.awaitility.Awaitility;
36+
import org.awaitility.reflect.WhiteboxImpl;
37+
import org.testng.annotations.AfterClass;
38+
import org.testng.annotations.BeforeClass;
39+
import org.testng.annotations.Test;
40+
41+
public class DeduplicationDisabledBrokerLevelTest extends ProducerConsumerBase {
42+
43+
private int deduplicationSnapshotFrequency = 5;
44+
private int brokerDeduplicationEntriesInterval = 1000;
45+
46+
@BeforeClass
47+
@Override
48+
protected void setup() throws Exception {
49+
super.internalSetup();
50+
producerBaseSetup();
51+
}
52+
53+
@AfterClass(alwaysRun = true)
54+
@Override
55+
protected void cleanup() throws Exception {
56+
super.internalCleanup();
57+
}
58+
59+
protected void doInitConf() throws Exception {
60+
this.conf.setSystemTopicEnabled(true);
61+
this.conf.setTopicLevelPoliciesEnabled(true);
62+
this.conf.setBrokerDeduplicationEnabled(false);
63+
this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(deduplicationSnapshotFrequency);
64+
this.conf.setBrokerDeduplicationEntriesInterval(brokerDeduplicationEntriesInterval);
65+
}
66+
67+
@Test
68+
public void testNoBacklogOnDeduplication() throws Exception {
69+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
70+
admin.topics().createNonPartitionedTopic(topic);
71+
final PersistentTopic persistentTopic =
72+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
73+
final ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
74+
// deduplication enabled:
75+
// broker level: "false"
76+
// topic level: "true".
77+
// So it is enabled.
78+
admin.topicPolicies().setDeduplicationStatus(topic, true);
79+
Awaitility.await().untilAsserted(() -> {
80+
ManagedCursorImpl cursor =
81+
(ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
82+
assertNotNull(cursor);
83+
});
84+
85+
// Verify: regarding deduplication cursor, messages will be acknowledged automatically.
86+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
87+
producer.send("1");
88+
producer.send("2");
89+
producer.send("3");
90+
producer.close();
91+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
92+
Awaitility.await().atMost(Duration.ofSeconds(deduplicationSnapshotFrequency * 3)).untilAsserted(() -> {
93+
PositionImpl LAC = (PositionImpl) ml.getLastConfirmedEntry();
94+
PositionImpl cursorMD = (PositionImpl) cursor.getMarkDeletedPosition();
95+
assertTrue(LAC.compareTo(cursorMD) <= 0);
96+
});
97+
98+
// cleanup.
99+
admin.topics().delete(topic);
100+
}
101+
102+
@Test
103+
public void testSnapshotCounterAfterUnload() throws Exception {
104+
final int originalDeduplicationSnapshotFrequency = deduplicationSnapshotFrequency;
105+
deduplicationSnapshotFrequency = 3600;
106+
cleanup();
107+
setup();
108+
109+
// Create a topic and wait until deduplication is started.
110+
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
111+
admin.topics().createNonPartitionedTopic(topic);
112+
final PersistentTopic persistentTopic1 =
113+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
114+
final ManagedLedgerImpl ml1 = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
115+
admin.topicPolicies().setDeduplicationStatus(topic, true);
116+
Awaitility.await().untilAsserted(() -> {
117+
ManagedCursorImpl cursor1 =
118+
(ManagedCursorImpl) ml1.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
119+
assertNotNull(cursor1);
120+
});
121+
final MessageDeduplication deduplication1 = persistentTopic1.getMessageDeduplication();
122+
123+
// 1. Send 999 messages, which is less than "brokerDeduplicationEntriesInterval".
124+
// 2. Unload topic.
125+
// 3. Send 1 message; there are now 1000 messages that have not been snapshotted.
126+
// 4. Verify the snapshot has been taken.
127+
// step 1.
128+
final Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
129+
for (int i = 0; i < brokerDeduplicationEntriesInterval - 1; i++) {
130+
producer.send(i + "");
131+
}
132+
int snapshotCounter1 = WhiteboxImpl.getInternalState(deduplication1, "snapshotCounter");
133+
assertEquals(snapshotCounter1, brokerDeduplicationEntriesInterval - 1);
134+
admin.topics().unload(topic);
135+
PersistentTopic persistentTopic2 =
136+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
137+
ManagedLedgerImpl ml2 = (ManagedLedgerImpl) persistentTopic2.getManagedLedger();
138+
MessageDeduplication deduplication2 = persistentTopic2.getMessageDeduplication();
139+
admin.topicPolicies().setDeduplicationStatus(topic, true);
140+
Awaitility.await().untilAsserted(() -> {
141+
ManagedCursorImpl cursor =
142+
(ManagedCursorImpl) ml2.getCursors().get(PersistentTopic.DEDUPLICATION_CURSOR_NAME);
143+
assertNotNull(cursor);
144+
});
145+
// step 3.
146+
producer.send("last message");
147+
ml2.trimConsumedLedgersInBackground(new CompletableFuture<>());
148+
// step 4.
149+
Awaitility.await().untilAsserted(() -> {
150+
int snapshotCounter3 = WhiteboxImpl.getInternalState(deduplication2, "snapshotCounter");
151+
assertTrue(snapshotCounter3 < brokerDeduplicationEntriesInterval);
152+
// Since https://github.com/apache/pulsar/pull/22034 has not been cherry-picked into branch-3.0, there
153+
// should be 2 ledgers.
154+
// Verify: the previous ledger will be removed because all messages have been acked.
155+
assertEquals(ml2.getLedgersInfo().size(), 2);
156+
});
157+
158+
// cleanup.
159+
producer.close();
160+
admin.topics().delete(topic);
161+
deduplicationSnapshotFrequency = originalDeduplicationSnapshotFrequency;
162+
cleanup();
163+
setup();
164+
}
165+
}

0 commit comments

Comments
 (0)