Skip to content

Commit 6856a15

Browse files
poorbarcodelhotari
authored andcommitted
[fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (#24282)
Fixes #24262 Main Issue: #24262 ### Motivation The issue is a regression of #24178. The flow of the issue occurring is as follows - Publish msg 1, which has an incompatible schema - Publish msg 2, which has an incompatible schema - The first message's schema failed to register because it is incompatible - The message was discarded. - Issue: it did not trigger a schema registration of the following messages ### Modifications Fix the issue (cherry picked from commit df2c619)
1 parent bed24c4 commit 6856a15

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1526,8 +1526,7 @@ public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content
15261526
producer.close();
15271527
}
15281528

1529-
// This test fails consistently, disabling until it is fixed. Issue https://github.com/apache/pulsar/issues/24262
1530-
@Test(enabled = false)
1529+
@Test
15311530
public void testPendingQueueSizeIfIncompatible() throws Exception {
15321531
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
15331532
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
@@ -1536,17 +1535,28 @@ public void testPendingQueueSizeIfIncompatible() throws Exception {
15361535
admin.topics().createNonPartitionedTopic(topic);
15371536

15381537
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1539-
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1540-
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1538+
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
1539+
producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
15411540
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
15421541
for (int i = 0; i < 100; i++) {
1543-
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1542+
final String msg = "msg-with-broken-schema-" + i;
1543+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> {
1544+
log.info("send complete {}", msg);
1545+
return null;
1546+
}).exceptionally(ex -> {
1547+
log.error("failed to send {}", msg, ex);
1548+
return null;
1549+
}));
15441550
}
1551+
// Verify: msgs with broken schema will be discarded.
15451552
Awaitility.await().untilAsserted(() -> {
15461553
assertTrue(latestSend.get().isDone());
15471554
assertEquals(producer.getPendingQueueSize(), 0);
15481555
});
15491556

1557+
// Verify: msgs with compatible schema can be sent successfully.
1558+
producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
1559+
15501560
// cleanup.
15511561
producer.close();
15521562
admin.topics().delete(topic, false);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2447,6 +2447,7 @@ protected void updateLastSeqPushed(OpSendMsg op) {
24472447
* 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following
24482448
* publishing to avoid out-of-order issue.
24492449
* 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages.
2450+
* Additionally, the following messages may need schema registration also.
24502451
* 3-2. The new schema registration failed due to other error, retry registering.
24512452
* Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on
24522453
* {@link ProducerImpl} before calling method.
@@ -2465,6 +2466,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
24652466
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
24662467
MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
24672468
OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
2469+
boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false;
24682470
while (msgIterator.hasNext()) {
24692471
OpSendMsg op = msgIterator.next();
24702472
if (loopStartAt != null) {
@@ -2509,6 +2511,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
25092511
+ " 2) Unload topic on target cluster. Schema details: {}",
25102512
topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
25112513
loopEndDueToSchemaRegisterNeeded = op;
2514+
pausedSendingToPreservePublishOrderOnSchemaRegFailure = true;
25122515
break;
25132516
}
25142517
// Event 3-1-2.
@@ -2564,23 +2567,27 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
25642567
}
25652568
cnx.ctx().flush();
25662569

2567-
// "Event 1-1" or "Event 3-1-1" or "Event 3-2".
2570+
// "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2".
25682571
if (loopEndDueToSchemaRegisterNeeded != null) {
25692572
if (compareAndSetState(State.Connecting, State.Ready)) {
25702573
// "Event 1-1" happens after "Event 3-1-1".
25712574
// After a topic unload, ask the producer retry to register schema, which avoids restart client
25722575
// after users changed the compatibility strategy to make the schema is compatible.
25732576
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
25742577
expectedEpoch);
2575-
} else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) {
2576-
// "Event 2-1" or "Event 3-2".
2578+
} else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
2579+
// Nothing to do if the event is "Event 3-1-1", just keep stuck.
2580+
return;
2581+
} else if (compareAndSetState(State.RegisteringSchema, State.Ready)) {
2582+
// "Event 2-1" or "Event 3-1-2" or "Event 3-2".
25772583
// "pendingMessages" has more messages to register new schema.
25782584
// This operation will not be conflict with another schema registration because both operations are
25792585
// attempt to acquire the same lock "ProducerImpl.this".
25802586
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
25812587
expectedEpoch);
25822588
}
2583-
// Nothing to do if the event is "Event 3-1-1", just keep stuck.
2589+
// Schema registration will trigger a new "recoverProcessOpSendMsgFrom", so return here. If failed to switch
2590+
// state, it means another task will trigger a new "recoverProcessOpSendMsgFrom".
25842591
return;
25852592
} else if (latestMsgAttemptedRegisteredSchema != null) {
25862593
// Event 2-2 or "Event 3-1-2".

0 commit comments

Comments
 (0)