Skip to content

Commit 27e27d3

Browse files
BewareMyPower authored and lhotari committed
[fix][txn] Fix deadlock when loading transaction buffer snapshot (apache#24401)
(cherry picked from commit 1b7e4a7)
1 parent 88c4482 commit 27e27d3

File tree

7 files changed: 57 additions (+57) and 10 deletions (-10).

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -275,6 +275,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
275275

276276
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
277277
private final ExecutorProvider transactionExecutorProvider;
278+
private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
278279
private String brokerId;
279280
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
280281
private final List<Runnable> pendingTasksBeforeReadyForIncomingRequests = new ArrayList<>();
@@ -337,8 +338,11 @@ public PulsarService(ServiceConfiguration config,
337338
if (config.isTransactionCoordinatorEnabled()) {
338339
this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration()
339340
.getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-executor");
341+
this.transactionSnapshotRecoverExecutorProvider = new ExecutorProvider(this.getConfiguration()
342+
.getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-snapshot-recover");
340343
} else {
341344
this.transactionExecutorProvider = null;
345+
this.transactionSnapshotRecoverExecutorProvider = null;
342346
}
343347

344348
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
@@ -600,6 +604,9 @@ public CompletableFuture<Void> closeAsync() {
600604
if (transactionExecutorProvider != null) {
601605
transactionExecutorProvider.shutdownNow();
602606
}
607+
if (transactionSnapshotRecoverExecutorProvider != null) {
608+
transactionSnapshotRecoverExecutorProvider.shutdownNow();
609+
}
603610
if (transactionTimer != null) {
604611
transactionTimer.stop();
605612
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public boolean checkAbortedTransaction(TxnID txnID) {
8686
public CompletableFuture<PositionImpl> recoverFromSnapshot() {
8787
final var future = new CompletableFuture<PositionImpl>();
8888
final var pulsar = topic.getBrokerService().getPulsar();
89-
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
89+
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
9090
try {
9191
final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
9292
.getTableView().readLatest(topic.getName());

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosit
223223
public CompletableFuture<PositionImpl> recoverFromSnapshot() {
224224
final var pulsar = topic.getBrokerService().getPulsar();
225225
final var future = new CompletableFuture<PositionImpl>();
226-
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
226+
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
227227
try {
228228
final var indexes = pulsar.getTransactionBufferSnapshotServiceFactory()
229229
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ public TableView(Function<TopicName, CompletableFuture<Reader<T>>> readerCreator
5858
}
5959

6060
public T readLatest(String topic) throws Exception {
61+
try {
62+
return internalReadLatest(topic);
63+
} catch (Exception e) {
64+
final var namespace = TopicName.get(topic).getNamespaceObject();
65+
readers.remove(namespace);
66+
throw e;
67+
}
68+
}
69+
70+
private T internalReadLatest(String topic) throws Exception {
6171
final var reader = getReader(topic);
6272
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
6373
final var msg = wait(reader.readNextAsync(), "read message");

pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private class ExpirableValue<V> {
4141

4242
boolean tryExpire() {
4343
if (System.currentTimeMillis() >= deadlineMs) {
44-
expireCallback.accept(value);
44+
cancel();
4545
return true;
4646
} else {
4747
return false;
@@ -51,6 +51,10 @@ boolean tryExpire() {
5151
void updateDeadline() {
5252
deadlineMs = System.currentTimeMillis() + timeoutMs;
5353
}
54+
55+
void cancel() {
56+
expireCallback.accept(value);
57+
}
5458
}
5559

5660
public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) {
@@ -80,4 +84,14 @@ public synchronized V get(final K key, final Supplier<V> valueSupplier, final Co
8084
cache.put(key, newValue);
8185
return newValue.value;
8286
}
87+
88+
public void remove(final K key) {
89+
final ExpirableValue<V> value;
90+
synchronized (this) {
91+
value = cache.remove(key);
92+
}
93+
if (value != null) {
94+
value.cancel();
95+
}
96+
}
8397
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,10 @@ public class TransactionTest extends TransactionTestBase {
169169

170170
@BeforeClass
171171
protected void setup() throws Exception {
172-
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
172+
// Use a single transaction thread to reproduce possible deadlock easily
173+
conf.setNumTransactionReplayThreadPoolSize(1);
174+
conf.setManagedLedgerNumSchedulerThreads(1);
175+
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
173176
}
174177

175178
@AfterClass(alwaysRun = true)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
222222
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
223223
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
224224
private volatile boolean hasSoughtByTimestamp = false;
225+
// This field will be set after the state becomes Failed, then the following operations will fail immediately
226+
private volatile Throwable failReason = null;
225227

226228
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
227229
String topic,
@@ -921,11 +923,13 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
921923
} else if (!subscribeFuture.isDone()) {
922924
// unable to create new consumer, fail operation
923925
setState(State.Failed);
926+
final Throwable throwable = PulsarClientException.wrap(e, String.format("Failed to subscribe the "
927+
+ "topic %s with subscription name %s when connecting to the broker", topicName.toString(),
928+
subscription));
929+
fail(throwable);
930+
924931
closeConsumerTasks();
925-
subscribeFuture.completeExceptionally(
926-
PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s "
927-
+ "with subscription name %s when connecting to the broker",
928-
topicName.toString(), subscription)));
932+
subscribeFuture.completeExceptionally(throwable);
929933
client.cleanupConsumer(this);
930934
} else if (isUnrecoverableError(e.getCause())) {
931935
closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
@@ -966,7 +970,7 @@ protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) {
966970
topic, subscription, cnxStr, t.getClass().getName(), t.getMessage());
967971
closeAsync().whenComplete((__, ex) -> {
968972
if (ex == null) {
969-
setState(State.Failed);
973+
fail(t);
970974
return;
971975
}
972976
log.error("[{}][{}] {} Failed to close consumer after got an error that does not support to retry: {} {}",
@@ -1060,7 +1064,7 @@ public boolean connectionFailed(PulsarClientException exception) {
10601064
if (nonRetriableError || timeout) {
10611065
exception.setPreviousExceptions(previousExceptions);
10621066
if (subscribeFuture.completeExceptionally(exception)) {
1063-
setState(State.Failed);
1067+
fail(exception);
10641068
if (nonRetriableError) {
10651069
log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}",
10661070
topic, consumerId, exception.getMessage());
@@ -2720,6 +2724,10 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
27202724
return null;
27212725
});
27222726
} else {
2727+
if (failReason != null) {
2728+
future.completeExceptionally(failReason);
2729+
return;
2730+
}
27232731
long nextDelay = Math.min(backoff.next(), remainingTime.get());
27242732
if (nextDelay <= 0) {
27252733
future.completeExceptionally(
@@ -3135,4 +3143,9 @@ enum SeekStatus {
31353143
IN_PROGRESS,
31363144
COMPLETED
31373145
}
3146+
3147+
private void fail(Throwable throwable) {
3148+
setState(State.Failed);
3149+
failReason = throwable;
3150+
}
31383151
}

0 commit comments

Comments
 (0)