Attempting to use highWatermark on EachBatchPayload: the value returned is always "-1001", which prevents us from tracking partition lag.
Given that highWatermark is part of the type definition and the migration guide makes no mention of it, I would expect it to carry an accurate value. Partition lag is a very useful application metric.
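For context, the metric we want boils down to a per-partition lag gauge fed from eachBatch, along the lines of the sketch below (recordPartitionLag is a placeholder for a metrics client, not part of either library); the full repro follows.

```typescript
import { EachBatchPayload } from "kafkajs";

// Placeholder for our metrics client.
declare function recordPartitionLag(topic: string, partition: number, lag: number): void;

// Simplified sketch of the intended usage: update a per-partition lag gauge
// from the batch's high watermark and the last offset in the batch.
async function eachBatch({ batch }: EachBatchPayload): Promise<void> {
  if (batch.isEmpty()) return;
  const highWatermark = parseInt(batch.highWatermark, 10);
  const lastOffset = parseInt(batch.lastOffset(), 10);
  // Messages still ahead of this batch in the partition.
  recordPartitionLag(batch.topic, batch.partition, highWatermark - lastOffset - 1);
}
```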
```typescript
import { KafkaJS as Confluent, RdKafka } from "@confluentinc/kafka-javascript";
import { Admin, Consumer, Kafka, Producer } from "kafkajs";

const KAFKA_JS_TOPIC = `test-confluent-topic-${Date.now()}`;
const KAFKA_JS_GROUP_ID = `test-confluent-group-${Date.now()}`;
const CONFLUENT_TOPIC = `test-confluent-topic-${Date.now()}`;
const CONFLUENT_GROUP_ID = `test-confluent-group-${Date.now()}`;

describe("partitionLag", () => {
  let kafkaJSKafka: Kafka;
  let kafkaJSAdmin: Admin;
  let kafkaJSConsumer: Consumer;
  let kafkaJSProducer: Producer;
  let confluentKafka: Confluent.Kafka;
  let confluentAdmin: Confluent.Admin;
  let confluentConsumer: Confluent.Consumer;
  let confluentProducer: Confluent.Producer;

  before(async () => {
    kafkaJSKafka = new Kafka({ brokers: ["localhost:9092"] });
    kafkaJSAdmin = kafkaJSKafka.admin();
    await kafkaJSAdmin.connect();

    confluentKafka = new Confluent.Kafka({ kafkaJS: { brokers: ["localhost:9092"] } });
    confluentAdmin = confluentKafka.admin();
    await confluentAdmin.connect();
  });

  beforeEach(async () => {
    await kafkaJSAdmin.createTopics({ topics: [{ topic: KAFKA_JS_TOPIC }] });
    kafkaJSProducer = kafkaJSKafka.producer();
    await kafkaJSProducer.connect();

    await confluentAdmin.createTopics({ topics: [{ topic: CONFLUENT_TOPIC }] });
    confluentProducer = confluentKafka.producer();
    await confluentProducer.connect();
  });

  afterEach(async () => {
    await kafkaJSProducer.disconnect();
    await kafkaJSConsumer?.disconnect();
    await confluentProducer.disconnect();
    await confluentConsumer?.disconnect();
  });

  after(async () => {
    await confluentAdmin.disconnect();
    await confluentProducer.disconnect();
    await confluentConsumer?.disconnect();
    await kafkaJSAdmin.disconnect();
    await kafkaJSProducer.disconnect();
    await kafkaJSConsumer?.disconnect();
  });

  it("reports lag with KafkaJS", async () => {
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({ topic: CONFLUENT_TOPIC, messages: [{ value: "one" }] });
    }

    let ready = false;
    let receivedMessages: number = 0;

    kafkaJSConsumer = kafkaJSKafka.consumer({ groupId: KAFKA_JS_GROUP_ID });
    kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: any) => {
      ready = true;
    });
    await kafkaJSConsumer.connect();
    await kafkaJSConsumer.subscribe({ topic: KAFKA_JS_TOPIC, fromBeginning: true });
    await kafkaJSConsumer.run({
      eachBatch: async ({ batch }) => {
        console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
        const highWatermark = parseInt(batch.highWatermark);
        for (const message of batch.messages) {
          const offset = parseInt(message.offset);
          console.log(`  Processing offset ${message.offset} which has a lag of ${highWatermark - offset}`);
          receivedMessages++;
        }
      }
    });

    await until(() => ready);
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({ topic: CONFLUENT_TOPIC, messages: [{ value: "one" }] });
    }
    await until(() => receivedMessages == 20);
  });

  it("reports lag with Confluent", async () => {
    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({ topic: CONFLUENT_TOPIC, messages: [{ value: "one" }] });
    }

    let ready = false;
    let receivedMessages: number = 0;

    confluentConsumer = confluentKafka.consumer({
      kafkaJS: { groupId: CONFLUENT_GROUP_ID, fromBeginning: true },
      rebalance_cb: (err: any, assignment: any, consumer: any) => {
        if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
        if (!ready) {
          ready = true;
        }
      }
    });
    await confluentConsumer.connect();
    await confluentConsumer.subscribe({ topic: CONFLUENT_TOPIC });
    await confluentConsumer.run({
      eachBatch: async ({ batch }) => {
        console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
        const highWatermark = parseInt(batch.highWatermark);
        for (const message of batch.messages) {
          const offset = parseInt(message.offset);
          console.log(`  Processing offset ${message.offset} which has a lag of ${highWatermark - offset}`);
          receivedMessages++;
        }
      }
    });

    await until(() => ready);
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({ topic: CONFLUENT_TOPIC, messages: [{ value: "one" }] });
    }
    await until(() => receivedMessages == 20);
  });

  async function until(condition: () => boolean) {
    const timeout = 10000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      const result = condition();
      if (result) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeout}ms`);
  }
});
```
KafkaJS output is:
Received batch with 13 messages with highWatermark 13 on partition 0
Processing offset 0 which has a lag of 13
Processing offset 1 which has a lag of 12
Processing offset 2 which has a lag of 11
Processing offset 3 which has a lag of 10
Processing offset 4 which has a lag of 9
Processing offset 5 which has a lag of 8
Processing offset 6 which has a lag of 7
Processing offset 7 which has a lag of 6
Processing offset 8 which has a lag of 5
Processing offset 9 which has a lag of 4
Processing offset 10 which has a lag of 3
Processing offset 11 which has a lag of 2
Processing offset 12 which has a lag of 1
Received batch with 4 messages with highWatermark 17 on partition 0
Processing offset 13 which has a lag of 4
Processing offset 14 which has a lag of 3
Processing offset 15 which has a lag of 2
Processing offset 16 which has a lag of 1
Received batch with 3 messages with highWatermark 20 on partition 0
Processing offset 17 which has a lag of 3
Processing offset 18 which has a lag of 2
Processing offset 19 which has a lag of 1
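This is what we'd expect from KafkaJS: the high watermark is the next offset to be written, so the last message in each batch shows a lag of 1. For reference, plain KafkaJS also exposes the same figure through the batch's offsetLag() helper; a minimal sketch (kafkajs API, not verified against this library's EachBatchPayload):

```typescript
import { EachBatchPayload } from "kafkajs";

// Equivalent lag reading using KafkaJS's built-in helper instead of the
// manual highWatermark arithmetic in the repro above.
async function eachBatch({ batch }: EachBatchPayload): Promise<void> {
  // offsetLag() = highWatermark - lastOffset - 1, i.e. messages still ahead
  // of this batch in the partition.
  console.log(`Partition ${batch.partition} lag after this batch: ${batch.offsetLag()}`);
}
```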
Confluent output is:
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 20 which has a lag of -1021
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 21 which has a lag of -1022
Received batch with 2 messages with highWatermark -1001 on partition 0
Processing offset 22 which has a lag of -1023
Processing offset 23 which has a lag of -1024
Received batch with 2 messages with highWatermark -1001 on partition 0
Processing offset 24 which has a lag of -1025
Processing offset 25 which has a lag of -1026
Received batch with 4 messages with highWatermark -1001 on partition 0
Processing offset 26 which has a lag of -1027
Processing offset 27 which has a lag of -1028
Processing offset 28 which has a lag of -1029
Processing offset 29 which has a lag of -1030
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 30 which has a lag of -1031
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 31 which has a lag of -1032
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 32 which has a lag of -1033
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 33 which has a lag of -1034
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 34 which has a lag of -1035
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 35 which has a lag of -1036
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 36 which has a lag of -1037
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 37 which has a lag of -1038
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 38 which has a lag of -1039
Received batch with 1 messages with highWatermark -1001 on partition 0
Processing offset 39 which has a lag of -1040
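-1001 appears to be librdkafka's RD_KAFKA_OFFSET_INVALID sentinel leaking through rather than a real high watermark. As a stopgap, lag can be computed out of band from the broker's end offsets; a rough workaround sketch using the plain kafkajs admin client (not this library's API; lastProcessedOffset is whatever offset the application has tracked for that partition):

```typescript
import { Admin } from "kafkajs";

// Workaround sketch: derive partition lag from the admin client's
// fetchTopicOffsets instead of batch.highWatermark.
async function partitionLag(
  admin: Admin,
  topic: string,
  partition: number,
  lastProcessedOffset: number
): Promise<number> {
  // fetchTopicOffsets returns the latest ("high") offset per partition.
  const offsets = await admin.fetchTopicOffsets(topic);
  const partitionOffsets = offsets.find((o) => o.partition === partition);
  if (!partitionOffsets) {
    throw new Error(`No offsets returned for ${topic}[${partition}]`);
  }
  return parseInt(partitionOffsets.high, 10) - lastProcessedOffset - 1;
}
```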