Skip to content

Commit ad057cd

Browse files
poorbarcodeganesh-ctds
authored andcommitted
[fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (apache#24282)
Fixes apache#24262 Main Issue: apache#24262 The issue is a regression of apache#24178. The flow of the issue occurring is as follows - Publish msg 1, which has an incompatible schema - Publish msg 2, which has an incompatible schema - The first message's schema failed to register because it is incompatible - The message was discarded. - Issue: it did not trigger a schema registration of the following messages Fix the issue (cherry picked from commit df2c619) (cherry picked from commit 5496d92)
1 parent 0147c15 commit ad057cd

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1450,8 +1450,7 @@ public User(String name) {
14501450
}
14511451

14521452

1453-
// This test fails consistently, disabling until it is fixed. Issue https://github.com/apache/pulsar/issues/24262
1454-
@Test(enabled = false)
1453+
@Test
14551454
public void testPendingQueueSizeIfIncompatible() throws Exception {
14561455
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
14571456
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
@@ -1460,17 +1459,28 @@ public void testPendingQueueSizeIfIncompatible() throws Exception {
14601459
admin.topics().createNonPartitionedTopic(topic);
14611460

14621461
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1463-
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1464-
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1462+
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
1463+
producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
14651464
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
14661465
for (int i = 0; i < 100; i++) {
1467-
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1466+
final String msg = "msg-with-broken-schema-" + i;
1467+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> {
1468+
log.info("send complete {}", msg);
1469+
return null;
1470+
}).exceptionally(ex -> {
1471+
log.error("failed to send {}", msg, ex);
1472+
return null;
1473+
}));
14681474
}
1475+
// Verify: msgs with broken schema will be discarded.
14691476
Awaitility.await().untilAsserted(() -> {
14701477
assertTrue(latestSend.get().isDone());
14711478
assertEquals(producer.getPendingQueueSize(), 0);
14721479
});
14731480

1481+
// Verify: msgs with compatible schema can be sent successfully.
1482+
producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
1483+
14741484
// cleanup.
14751485
producer.close();
14761486
admin.topics().delete(topic, false);

0 commit comments

Comments
 (0)