Skip to content

Commit df2c619

Browse files
authored
[fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (#24282)
Fixes #24262 Main Issue: #24262 ### Motivation The issue is a regression of #24178. The flow of the issue occurring is as follows - Publish msg 1, which has an incompatible schema - Publish msg 2, which has an incompatible schema - The first message's schema failed to register because it is incompatible - The message was discarded. - Issue: it did not trigger a schema registration of the following messages ### Modifications Fix the issue
1 parent 63d35d1 commit df2c619

File tree

2 files changed

+26
-9
lines changed

2 files changed

+26
-9
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,8 +1528,7 @@ public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content
15281528
producer.close();
15291529
}
15301530

1531-
// This test fails consistently, disabling until it is fixed. Issue https://github.com/apache/pulsar/issues/24262
1532-
@Test(enabled = false)
1531+
@Test
15331532
public void testPendingQueueSizeIfIncompatible() throws Exception {
15341533
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
15351534
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
@@ -1538,17 +1537,28 @@ public void testPendingQueueSizeIfIncompatible() throws Exception {
15381537
admin.topics().createNonPartitionedTopic(topic);
15391538

15401539
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1541-
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1542-
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1540+
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
1541+
producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
15431542
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
15441543
for (int i = 0; i < 100; i++) {
1545-
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1544+
final String msg = "msg-with-broken-schema-" + i;
1545+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> {
1546+
log.info("send complete {}", msg);
1547+
return null;
1548+
}).exceptionally(ex -> {
1549+
log.error("failed to send {}", msg, ex);
1550+
return null;
1551+
}));
15461552
}
1553+
// Verify: msgs with broken schema will be discarded.
15471554
Awaitility.await().untilAsserted(() -> {
15481555
assertTrue(latestSend.get().isDone());
15491556
assertEquals(producer.getPendingQueueSize(), 0);
15501557
});
15511558

1559+
// Verify: msgs with compatible schema can be sent successfully.
1560+
producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
1561+
15521562
// cleanup.
15531563
producer.close();
15541564
admin.topics().delete(topic, false);

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,6 +2464,7 @@ protected void updateLastSeqPushed(OpSendMsg op) {
24642464
* 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following
24652465
* publishing to avoid out-of-order issue.
24662466
* 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages.
2467+
* Additionally, the following messages may need schema registration also.
24672468
* 3-2. The new schema registration failed due to other error, retry registering.
24682469
* Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on
24692470
* {@link ProducerImpl} before calling method.
@@ -2482,6 +2483,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
24822483
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
24832484
MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
24842485
OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
2486+
boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false;
24852487
while (msgIterator.hasNext()) {
24862488
OpSendMsg op = msgIterator.next();
24872489
if (loopStartAt != null) {
@@ -2526,6 +2528,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
25262528
+ " 2) Unload topic on target cluster. Schema details: {}",
25272529
topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
25282530
loopEndDueToSchemaRegisterNeeded = op;
2531+
pausedSendingToPreservePublishOrderOnSchemaRegFailure = true;
25292532
break;
25302533
}
25312534
// Event 3-1-2.
@@ -2581,23 +2584,27 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
25812584
}
25822585
cnx.ctx().flush();
25832586

2584-
// "Event 1-1" or "Event 3-1-1" or "Event 3-2".
2587+
// "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2".
25852588
if (loopEndDueToSchemaRegisterNeeded != null) {
25862589
if (compareAndSetState(State.Connecting, State.Ready)) {
25872590
// "Event 1-1" happens after "Event 3-1-1".
25882591
// After a topic unload, ask the producer retry to register schema, which avoids restart client
25892592
// after users changed the compatibility strategy to make the schema is compatible.
25902593
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
25912594
expectedEpoch);
2592-
} else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) {
2593-
// "Event 2-1" or "Event 3-2".
2595+
} else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
2596+
// Nothing to do if the event is "Event 3-1-1", just keep stuck.
2597+
return;
2598+
} else if (compareAndSetState(State.RegisteringSchema, State.Ready)) {
2599+
// "Event 2-1" or "Event 3-1-2" or "Event 3-2".
25942600
// "pendingMessages" has more messages to register new schema.
25952601
// This operation will not be conflict with another schema registration because both operations are
25962602
// attempt to acquire the same lock "ProducerImpl.this".
25972603
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
25982604
expectedEpoch);
25992605
}
2600-
// Nothing to do if the event is "Event 3-1-1", just keep stuck.
2606+
// Schema registration will trigger a new "recoverProcessOpSendMsgFrom", so return here. If failed to switch
2607+
// state, it means another task will trigger a new "recoverProcessOpSendMsgFrom".
26012608
return;
26022609
} else if (latestMsgAttemptedRegisteredSchema != null) {
26032610
// Event 2-2 or "Event 3-1-2".

0 commit comments

Comments
 (0)