Kafka source not processing pending acknowledgements at shutdown #14761

Closed
@jches

Description

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

When running a kafka source with acknowledgements enabled on the sink, I've noticed that some messages are processed twice when Vector is restarted. On shutdown, the acknowledgement stream isn't drained before the process exits. Any message that the sink has already acknowledged, but whose acknowledgement the source has not yet processed, is consumed again when Vector starts up.
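The following is a toy model of that behaviour, not Vector's implementation. It assumes a single partition and that the consumer resumes from the offset of the last acknowledgement the source processed. The function name `simulate_restart` and the value 8,993 are illustrative: 8,993 is simply the number that makes the model match the totals in the logs further down, not something that was measured.

```python
from collections import deque


def simulate_restart(produced, consumed, acks_processed, drain_on_shutdown):
    """Return total deliveries to the sink across one shutdown and restart."""
    # First run: the sink receives and acknowledges `consumed` messages.
    pending_acks = deque(range(consumed))
    committed = 0  # next offset the consumer group resumes from

    # The source handles only some acknowledgements before shutdown begins.
    for _ in range(min(acks_processed, consumed)):
        committed = pending_acks.popleft() + 1

    # Draining handles the remaining acknowledgements before the process exits.
    if drain_on_shutdown:
        while pending_acks:
            committed = pending_acks.popleft() + 1

    # Second run: consumption resumes at the committed offset.
    delivered_after_restart = produced - committed
    return consumed + delivered_after_restart


# Hypothetical inputs chosen to mirror the report (10,000 produced).
print(simulate_restart(10_000, 9_031, 8_993, drain_on_shutdown=False))
print(simulate_restart(10_000, 9_031, 8_993, drain_on_shutdown=True))
```

In this model, skipping the drain yields more deliveries than messages produced, while draining first yields exactly one delivery per message.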

Running the config below, it's easy to recreate this:

  1. Start vector
  2. Produce a large number of messages
  3. Shortly after producing messages, terminate vector
  4. Restart vector

Comparing the blackhole sink logs shows more messages processed than were produced.

# Produce 10,000 messages then kill vector
❯ kcat -b $broker -P -t testTopic1 -K : -l <(for i in $(seq 1 10000); do echo "${i}:{\"value\": ${i}}"; done);  sleep 0.1; pkill vector

Logs:

2022-10-07T04:34:08.450213Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:09.451049Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:10.450242Z  INFO vector::sinks::blackhole::sink: Total events collected events=4329 raw_bytes_collected=3604950
2022-10-07T04:34:10.504935Z  INFO vector: Vector has stopped.
2022-10-07T04:34:10.505154Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="devnull, testTopic1" time_remaining="59 seconds left"
2022-10-07T04:34:10.544353Z  INFO vector::sinks::blackhole::sink: Total events collected events=9031 raw_bytes_collected=7521716


❯ KAFKA_ADDRS=10.2.22.139:9092 VECTOR_API_PORT=8687 vector -c kafka2blackhole.toml
2022-10-07T04:34:51.798202Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,kube=info"
2022-10-07T04:34:51.798509Z  INFO vector::app: Loading configs. paths=["kafka2blackhole.toml"]
2022-10-07T04:34:51.806936Z  INFO vector::topology::running: Running healthchecks.
2022-10-07T04:34:51.807383Z  INFO vector: Vector has started. debug="false" version="0.24.1" arch="x86_64" build_id="8935681 2022-09-12"
2022-10-07T04:34:51.807497Z  INFO vector::internal_events::api: API server running. address=0.0.0.0:8687 playground=off
2022-10-07T04:34:51.807707Z  INFO vector::topology::builder: Healthcheck: Passed.
2022-10-07T04:34:51.809614Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:52.809293Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:53.809251Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:54.809671Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:55.809451Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
2022-10-07T04:34:56.809269Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832

Out of 10,000 messages produced, 9,031 + 1,007 = 10,038 were processed, which is 38 more than were produced.
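The same count can be taken from the logs mechanically. This is a minimal sketch that assumes the last `Total events collected` line of each run holds that run's final count; the helper names `last_event_count` and `extra_deliveries` are made up for illustration, and the log excerpts are copied from the output above.

```python
import re


def last_event_count(log_text):
    """Return the events= value from the last blackhole summary line."""
    counts = re.findall(r"Total events collected events=(\d+)", log_text)
    return int(counts[-1]) if counts else 0


def extra_deliveries(produced, *run_logs):
    """Events seen across all runs minus the number of messages produced."""
    return sum(last_event_count(log) for log in run_logs) - produced


first_run = """
2022-10-07T04:34:10.450242Z  INFO vector::sinks::blackhole::sink: Total events collected events=4329 raw_bytes_collected=3604950
2022-10-07T04:34:10.544353Z  INFO vector::sinks::blackhole::sink: Total events collected events=9031 raw_bytes_collected=7521716
"""
second_run = """
2022-10-07T04:34:55.809451Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
2022-10-07T04:34:56.809269Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
"""

print(extra_deliveries(10_000, first_run, second_run))
```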

Configuration

[sources.testTopic1]
type = "kafka"
bootstrap_servers = "${KAFKA_ADDRS}"
group_id = "group1"
topics = [ "testTopic1" ]
decoding = { codec = "json" }

[sinks.devnull]
type = "blackhole"
inputs = [ "*" ]
acknowledgements = { enabled = true }

Version

vector 0.24.1

References

Maybe related to #11405

Labels

  • domain: delivery (Anything related to delivering events within Vector such as end-to-end acknowledgements)
  • source: kafka (Anything `kafka` source related)
  • type: bug (A code related bug.)
