batch highWatermark always returns -1001 #282


Open
apeloquin-agilysys opened this issue Apr 6, 2025 · 0 comments · May be fixed by #317
Labels
enhancement New feature or request

Comments


apeloquin-agilysys commented Apr 6, 2025

Environment Information

  • OS: Mac M3 Sonoma 14.6.1
  • Node Version: 22.12.0
  • NPM Version: 10.9.0
  • confluent-kafka-javascript version: 1.2.0

Steps to Reproduce

Attempt to use highWatermark on EachBatchPayload. The value returned is always "-1001", which prevents us from tracking partition lag.

Given that highWatermark is part of the type definition and the migration guide makes no mention of it, I would expect it to provide an accurate value. Partition lag is a very useful application metric.
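Until the library populates highWatermark, one possible workaround (a sketch, not confirmed against this library) is to fetch the high watermarks out of band — e.g. via the KafkaJS-style admin client's fetchTopicOffsets — and compute lag from those. The helper below is pure and assumes the KafkaJS response shape `{partition, offset, high, low}` with offsets as strings; the function name and the `consumed` map are mine:

```typescript
// Shape returned by a KafkaJS-style admin.fetchTopicOffsets (offsets are strings).
interface PartitionOffsets {
  partition: number;
  offset: string; // latest offset
  high: string;   // high watermark
  low: string;    // low watermark
}

// Compute per-partition lag, using the same formula as the repro below
// (highWatermark - offset). `consumed` maps partition -> last processed offset.
function partitionLag(
  watermarks: PartitionOffsets[],
  consumed: Map<number, number>
): Map<number, number> {
  const lag = new Map<number, number>();
  for (const w of watermarks) {
    const processed = consumed.get(w.partition);
    if (processed === undefined) continue; // partition not consumed yet
    lag.set(w.partition, parseInt(w.high, 10) - processed);
  }
  return lag;
}
```

In practice this would mean calling something like `admin.fetchTopicOffsets(topic)` on an interval and feeding the result to the helper — more round trips than reading batch.highWatermark, but it avoids the sentinel value entirely.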

import {KafkaJS as Confluent, RdKafka} from "@confluentinc/kafka-javascript";
import {Admin, Consumer, Kafka, Producer} from "kafkajs";

const KAFKA_JS_TOPIC = `test-confluent-topic-${Date.now()}`;
const KAFKA_JS_GROUP_ID = `test-confluent-group-${Date.now()}`;

const CONFLUENT_TOPIC = `test-confluent-topic-${Date.now()}`;
const CONFLUENT_GROUP_ID = `test-confluent-group-${Date.now()}`;

describe("partitionLag", () => {
  let kafkaJSKafka: Kafka;
  let kafkaJSAdmin: Admin;
  let kafkaJSConsumer: Consumer;
  let kafkaJSProducer: Producer;

  let confluentKafka: Confluent.Kafka;
  let confluentAdmin: Confluent.Admin;
  let confluentConsumer: Confluent.Consumer;
  let confluentProducer: Confluent.Producer;

  before(async () => {
    kafkaJSKafka = new Kafka({brokers: ["localhost:9092"]});
    kafkaJSAdmin = kafkaJSKafka.admin();
    await kafkaJSAdmin.connect();

    confluentKafka = new Confluent.Kafka({kafkaJS: {brokers: ["localhost:9092"]}});
    confluentAdmin = confluentKafka.admin();
    await confluentAdmin.connect();
  });

  beforeEach(async () => {
    await kafkaJSAdmin.createTopics({topics: [{topic: KAFKA_JS_TOPIC}]});
    kafkaJSProducer = kafkaJSKafka.producer();
    await kafkaJSProducer.connect();

    await confluentAdmin.createTopics({topics: [{topic: CONFLUENT_TOPIC}]});
    confluentProducer = confluentKafka.producer();
    await confluentProducer.connect();
  });

  afterEach(async () => {
    await kafkaJSProducer.disconnect();
    await kafkaJSConsumer?.disconnect();

    await confluentProducer.disconnect();
    await confluentConsumer?.disconnect();
  });

  after(async () => {
    await confluentAdmin.disconnect();
    await confluentProducer.disconnect();
    await confluentConsumer?.disconnect();

    await kafkaJSAdmin.disconnect();
    await kafkaJSProducer.disconnect();
    await kafkaJSConsumer?.disconnect();
  });

  it("reports lag with KafkaJS", async () => {
    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({
        topic: KAFKA_JS_TOPIC,
        messages: [{value: "one"}]
      });
    }

    let ready = false;
    let receivedMessages: number = 0;
    kafkaJSConsumer = kafkaJSKafka.consumer({groupId: KAFKA_JS_GROUP_ID});
    kafkaJSConsumer.on(kafkaJSConsumer.events.GROUP_JOIN, (event: any) => {
      ready = true;
    });
    await kafkaJSConsumer.connect();
    await kafkaJSConsumer.subscribe({topic: KAFKA_JS_TOPIC, fromBeginning: true});
    await kafkaJSConsumer.run({
      eachBatch: async ({batch}) => {
        console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
        const highWatermark = parseInt(batch.highWatermark);
        for (const message of batch.messages) {
          const offset = parseInt(message.offset);
          console.log(`  Processing offset ${message.offset} which has a lag of ${highWatermark - offset}`);
          receivedMessages++;
        }
      }
    });

    await until(() => ready);

    for (let i = 0; i < 10; i++) {
      await kafkaJSProducer.send({
        topic: KAFKA_JS_TOPIC,
        messages: [{value: "one"}]
      });
    }

    await until(() => receivedMessages === 20);
  });

  it("reports lag with Confluent", async () => {
    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({
        topic: CONFLUENT_TOPIC,
        messages: [{value: "one"}]
      });
    }

    let ready = false;
    let receivedMessages: number = 0;
    confluentConsumer = confluentKafka.consumer({
      kafkaJS: {groupId: CONFLUENT_GROUP_ID, fromBeginning: true},
      rebalance_cb: (err: any, assignment: any, consumer: any) => {
        if (err.code !== RdKafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) return;
        ready = true;
      }
    });
    await confluentConsumer.connect();
    await confluentConsumer.subscribe({topic: CONFLUENT_TOPIC});
    await confluentConsumer.run({
      eachBatch: async ({batch}) => {
        console.log(`Received batch with ${batch.messages.length} messages with highWatermark ${batch.highWatermark} on partition ${batch.partition}`);
        const highWatermark = parseInt(batch.highWatermark);
        for (const message of batch.messages) {
          const offset = parseInt(message.offset);
          console.log(`  Processing offset ${message.offset} which has a lag of ${highWatermark - offset}`);
          receivedMessages++;
        }
      }
    });

    await until(() => ready);

    for (let i = 0; i < 10; i++) {
      await confluentProducer.send({
        topic: CONFLUENT_TOPIC,
        messages: [{value: "one"}]
      });
    }

    await until(() => receivedMessages === 20);
  });

  async function until(condition: () => boolean) {
    const timeout = 10000;
    const finish = Date.now() + timeout;
    while (Date.now() <= finish) {
      const result = condition();
      if (result) return;
      await new Promise(resolve => setTimeout(resolve, 50));
    }
    throw new Error(`Failed within ${timeout}ms`);
  }
});

KafkaJS output is:

Received batch with 13 messages with highWatermark 13 on partition 0
  Processing offset 0 which has a lag of 13
  Processing offset 1 which has a lag of 12
  Processing offset 2 which has a lag of 11
  Processing offset 3 which has a lag of 10
  Processing offset 4 which has a lag of 9
  Processing offset 5 which has a lag of 8
  Processing offset 6 which has a lag of 7
  Processing offset 7 which has a lag of 6
  Processing offset 8 which has a lag of 5
  Processing offset 9 which has a lag of 4
  Processing offset 10 which has a lag of 3
  Processing offset 11 which has a lag of 2
  Processing offset 12 which has a lag of 1
Received batch with 4 messages with highWatermark 17 on partition 0
  Processing offset 13 which has a lag of 4
  Processing offset 14 which has a lag of 3
  Processing offset 15 which has a lag of 2
  Processing offset 16 which has a lag of 1
Received batch with 3 messages with highWatermark 20 on partition 0
  Processing offset 17 which has a lag of 3
  Processing offset 18 which has a lag of 2
  Processing offset 19 which has a lag of 1

Confluent output is:

Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 20 which has a lag of -1021
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 21 which has a lag of -1022
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 22 which has a lag of -1023
  Processing offset 23 which has a lag of -1024
Received batch with 2 messages with highWatermark -1001 on partition 0
  Processing offset 24 which has a lag of -1025
  Processing offset 25 which has a lag of -1026
Received batch with 4 messages with highWatermark -1001 on partition 0
  Processing offset 26 which has a lag of -1027
  Processing offset 27 which has a lag of -1028
  Processing offset 28 which has a lag of -1029
  Processing offset 29 which has a lag of -1030
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 30 which has a lag of -1031
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 31 which has a lag of -1032
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 32 which has a lag of -1033
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 33 which has a lag of -1034
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 34 which has a lag of -1035
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 35 which has a lag of -1036
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 36 which has a lag of -1037
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 37 which has a lag of -1038
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 38 which has a lag of -1039
Received batch with 1 messages with highWatermark -1001 on partition 0
  Processing offset 39 which has a lag of -1040
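The -1001 in the output above is librdkafka's RD_KAFKA_OFFSET_INVALID sentinel. Until the value is populated, code reading batch.highWatermark could guard against it rather than emit nonsense lag numbers. A minimal sketch (the constant value is librdkafka's; the helper name is mine):

```typescript
// librdkafka uses -1001 (RD_KAFKA_OFFSET_INVALID) as an "unknown offset" sentinel.
const RD_KAFKA_OFFSET_INVALID = -1001;

// Returns the lag for a message offset, or undefined when the high
// watermark is the invalid sentinel (as seen in the Confluent output above).
function lagOrUndefined(highWatermark: string, offset: string): number | undefined {
  const high = parseInt(highWatermark, 10);
  if (high === RD_KAFKA_OFFSET_INVALID || Number.isNaN(high)) return undefined;
  return high - parseInt(offset, 10);
}
```

With a guard like this, a metrics pipeline can skip (rather than record) lag samples while the watermark is unavailable.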
@milindl milindl added the enhancement New feature or request label Apr 7, 2025
@milindl milindl linked a pull request May 16, 2025 that will close this issue