Skip to content

Commit 1b7e4a7

Browse files
[fix][txn] Fix deadlock when loading transaction buffer snapshot (apache#24401)
1 parent bfee7a6 commit 1b7e4a7

File tree

7 files changed

+57
-10
lines changed

7 files changed

+57
-10
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
301301

302302
private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
303303
private final ExecutorProvider transactionExecutorProvider;
304+
private final ExecutorProvider transactionSnapshotRecoverExecutorProvider;
304305
private final MonotonicClock monotonicClock;
305306
private String brokerId;
306307
private final CompletableFuture<Void> readyForIncomingRequestsFuture = new CompletableFuture<>();
@@ -371,8 +372,11 @@ public PulsarService(ServiceConfiguration config,
371372
if (config.isTransactionCoordinatorEnabled()) {
372373
this.transactionExecutorProvider = new ExecutorProvider(this.getConfiguration()
373374
.getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-executor");
375+
this.transactionSnapshotRecoverExecutorProvider = new ExecutorProvider(this.getConfiguration()
376+
.getNumTransactionReplayThreadPoolSize(), "pulsar-transaction-snapshot-recover");
374377
} else {
375378
this.transactionExecutorProvider = null;
379+
this.transactionSnapshotRecoverExecutorProvider = null;
376380
}
377381

378382
this.ioEventLoopGroup = EventLoopUtil.newEventLoopGroup(config.getNumIOThreads(), config.isEnableBusyWait(),
@@ -661,6 +665,9 @@ public CompletableFuture<Void> closeAsync() {
661665
if (transactionExecutorProvider != null) {
662666
transactionExecutorProvider.shutdownNow();
663667
}
668+
if (transactionSnapshotRecoverExecutorProvider != null) {
669+
transactionSnapshotRecoverExecutorProvider.shutdownNow();
670+
}
664671
if (transactionTimer != null) {
665672
transactionTimer.stop();
666673
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public boolean checkAbortedTransaction(TxnID txnID) {
8787
public CompletableFuture<Position> recoverFromSnapshot() {
8888
final var future = new CompletableFuture<Position>();
8989
final var pulsar = topic.getBrokerService().getPulsar();
90-
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
90+
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
9191
try {
9292
final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
9393
.getTableView().readLatest(topic.getName());

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ public CompletableFuture<Void> takeAbortedTxnsSnapshot(Position maxReadPosition)
228228
public CompletableFuture<Position> recoverFromSnapshot() {
229229
final var pulsar = topic.getBrokerService().getPulsar();
230230
final var future = new CompletableFuture<Position>();
231-
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
231+
pulsar.getTransactionSnapshotRecoverExecutorProvider().getExecutor(this).execute(() -> {
232232
try {
233233
final var indexes = pulsar.getTransactionBufferSnapshotServiceFactory()
234234
.getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ public TableView(Function<TopicName, CompletableFuture<Reader<T>>> readerCreator
5858
}
5959

6060
public T readLatest(String topic) throws Exception {
61+
try {
62+
return internalReadLatest(topic);
63+
} catch (Exception e) {
64+
final var namespace = TopicName.get(topic).getNamespaceObject();
65+
readers.remove(namespace);
66+
throw e;
67+
}
68+
}
69+
70+
private T internalReadLatest(String topic) throws Exception {
6171
final var reader = getReader(topic);
6272
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
6373
final var msg = wait(reader.readNextAsync(), "read message");

pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ private class ExpirableValue<V> {
4141

4242
boolean tryExpire() {
4343
if (System.currentTimeMillis() >= deadlineMs) {
44-
expireCallback.accept(value);
44+
cancel();
4545
return true;
4646
} else {
4747
return false;
@@ -51,6 +51,10 @@ boolean tryExpire() {
5151
void updateDeadline() {
5252
deadlineMs = System.currentTimeMillis() + timeoutMs;
5353
}
54+
55+
void cancel() {
56+
expireCallback.accept(value);
57+
}
5458
}
5559

5660
public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) {
@@ -80,4 +84,14 @@ public synchronized V get(final K key, final Supplier<V> valueSupplier, final Co
8084
cache.put(key, newValue);
8185
return newValue.value;
8286
}
87+
88+
public void remove(final K key) {
89+
final ExpirableValue<V> value;
90+
synchronized (this) {
91+
value = cache.remove(key);
92+
}
93+
if (value != null) {
94+
value.cancel();
95+
}
96+
}
8397
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ public class TransactionTest extends TransactionTestBase {
178178

179179
@BeforeClass
180180
protected void setup() throws Exception {
181-
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
181+
// Use a single transaction thread to reproduce possible deadlock easily
182+
conf.setNumTransactionReplayThreadPoolSize(1);
183+
conf.setManagedLedgerNumSchedulerThreads(1);
184+
setUpBase(NUM_BROKERS, NUM_PARTITIONS, NAMESPACE1 + "/test", 0);
182185
}
183186

184187
@AfterClass(alwaysRun = true)

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
241241
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
242242
private final AtomicInteger previousExceptionCount = new AtomicInteger();
243243
private volatile boolean hasSoughtByTimestamp = false;
244+
// This field will be set after the state becomes Failed, then the following operations will fail immediately
245+
private volatile Throwable failReason = null;
244246

245247
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
246248
String topic,
@@ -980,11 +982,13 @@ public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
980982
} else if (!subscribeFuture.isDone()) {
981983
// unable to create new consumer, fail operation
982984
setState(State.Failed);
985+
final Throwable throwable = PulsarClientException.wrap(e, String.format("Failed to subscribe the "
986+
+ "topic %s with subscription name %s when connecting to the broker", topicName.toString(),
987+
subscription));
988+
fail(throwable);
989+
983990
closeConsumerTasks();
984-
subscribeFuture.completeExceptionally(
985-
PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s "
986-
+ "with subscription name %s when connecting to the broker",
987-
topicName.toString(), subscription)));
991+
subscribeFuture.completeExceptionally(throwable);
988992
client.cleanupConsumer(this);
989993
} else if (isUnrecoverableError(e.getCause())) {
990994
closeWhenReceivedUnrecoverableError(e.getCause(), cnx);
@@ -1025,7 +1029,7 @@ protected void closeWhenReceivedUnrecoverableError(Throwable t, ClientCnx cnx) {
10251029
topic, subscription, cnxStr, t.getClass().getName(), t.getMessage());
10261030
closeAsync().whenComplete((__, ex) -> {
10271031
if (ex == null) {
1028-
setState(State.Failed);
1032+
fail(t);
10291033
return;
10301034
}
10311035
log.error("[{}][{}] {} Failed to close consumer after got an error that does not support to retry: {} {}",
@@ -1119,7 +1123,7 @@ public boolean connectionFailed(PulsarClientException exception) {
11191123
if (nonRetriableError || timeout) {
11201124
exception.setPreviousExceptionCount(previousExceptionCount);
11211125
if (subscribeFuture.completeExceptionally(exception)) {
1122-
setState(State.Failed);
1126+
fail(exception);
11231127
if (nonRetriableError) {
11241128
log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}",
11251129
topic, consumerId, exception.getMessage());
@@ -2870,6 +2874,10 @@ private void internalGetLastMessageIdAsync(final Backoff backoff,
28702874
return null;
28712875
});
28722876
} else {
2877+
if (failReason != null) {
2878+
future.completeExceptionally(failReason);
2879+
return;
2880+
}
28732881
long nextDelay = Math.min(backoff.next(), remainingTime.get());
28742882
if (nextDelay <= 0) {
28752883
future.completeExceptionally(
@@ -3301,4 +3309,9 @@ public Producer<byte[]> getRetryLetterProducer() {
33013309
public Producer<byte[]> getDeadLetterProducer() throws ExecutionException, InterruptedException {
33023310
return (deadLetterProducer == null || !deadLetterProducer.isDone()) ? null : deadLetterProducer.get();
33033311
}
3312+
3313+
private void fail(Throwable throwable) {
3314+
setState(State.Failed);
3315+
failReason = throwable;
3316+
}
33043317
}

0 commit comments

Comments
 (0)