Skip to content

Commit e891aef

Browse files
poorbarcodesrinath-ctds
authored andcommitted
[fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (apache#24282)
Fixes apache#24262 Main Issue: apache#24262 The issue is a regression of apache#24178. The flow of the issue occurring is as follows - Publish msg 1, which has an incompatible schema - Publish msg 2, which has an incompatible schema - The first message's schema failed to register because it is incompatible - The message was discarded. - Issue: it did not trigger a schema registration of the following messages Fix the issue (cherry picked from commit df2c619) (cherry picked from commit a66d6b7)
1 parent cbf4ea4 commit e891aef

File tree

1 file changed

+15
-5
lines changed

1 file changed

+15
-5
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,8 +1528,7 @@ public SchemaStorageFormat.SchemaLocator deserialize(String path, byte[] content
15281528
producer.close();
15291529
}
15301530

1531-
// This test fails consistently, disabling until it is fixed. Issue https://github.com/apache/pulsar/issues/24262
1532-
@Test(enabled = false)
1531+
@Test
15331532
public void testPendingQueueSizeIfIncompatible() throws Exception {
15341533
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
15351534
admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
@@ -1538,17 +1537,28 @@ public void testPendingQueueSizeIfIncompatible() throws Exception {
15381537
admin.topics().createNonPartitionedTopic(topic);
15391538

15401539
ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
1541-
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
1542-
producer.newMessage(Schema.STRING).value("msg").sendAsync();
1540+
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
1541+
producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
15431542
AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
15441543
for (int i = 0; i < 100; i++) {
1545-
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
1544+
final String msg = "msg-with-broken-schema-" + i;
1545+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> {
1546+
log.info("send complete {}", msg);
1547+
return null;
1548+
}).exceptionally(ex -> {
1549+
log.error("failed to send {}", msg, ex);
1550+
return null;
1551+
}));
15461552
}
1553+
// Verify: msgs with broken schema will be discarded.
15471554
Awaitility.await().untilAsserted(() -> {
15481555
assertTrue(latestSend.get().isDone());
15491556
assertEquals(producer.getPendingQueueSize(), 0);
15501557
});
15511558

1559+
// Verify: msgs with compatible schema can be sent successfully.
1560+
producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
1561+
15521562
// cleanup.
15531563
producer.close();
15541564
admin.topics().delete(topic, false);

0 commit comments

Comments
 (0)