Description
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
When running a kafka source with acknowledgements enabled on the sink, I've noticed some messages being processed twice when vector is restarted. On shutdown, the acknowledgement stream isn't drained before the process exits, so any messages that were consumed and acknowledged, but whose acks weren't yet processed (and whose offsets were therefore never committed), are delivered again when vector starts back up.
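To illustrate the ordering problem, here's a minimal, self-contained Rust sketch (hypothetical code, not Vector's actual implementation; all names are made up): the source only commits offsets for acknowledgements it has actually processed, so exiting before the ack stream is drained loses the tail of in-flight acks, and those messages are redelivered on restart.

```rust
// Hypothetical sketch of the shutdown-ordering issue (not Vector's code).
// A "sink" acknowledges events over a channel; the "source" only commits
// the highest contiguous acknowledged offset. Draining the ack channel to
// completion before exiting is what guarantees every processed message is
// reflected in the committed offset; abandoning the drain loop early on
// shutdown is what produces duplicates after a restart.
use std::collections::BTreeSet;
use std::sync::mpsc;
use std::thread;

fn main() {
    let (ack_tx, ack_rx) = mpsc::channel::<u64>();

    // Sink side: process events 0..10_000 and acknowledge each one.
    let sink = thread::spawn(move || {
        for offset in 0..10_000u64 {
            ack_tx.send(offset).unwrap();
        }
        // Dropping ack_tx here closes the channel, ending the drain loop.
    });

    // Source side: advance the committed offset as contiguous acks arrive.
    let mut committed: Option<u64> = None;
    let mut pending = BTreeSet::new();

    while let Ok(offset) = ack_rx.recv() {
        pending.insert(offset);
        while pending.remove(&committed.map_or(0, |c| c + 1)) {
            committed = Some(committed.map_or(0, |c| c + 1));
        }
    }

    sink.join().unwrap();
    // Fully drained: Some(9999). If the loop had been abandoned on a
    // shutdown signal instead, `committed` would lag behind what the sink
    // actually processed, and everything after it would be consumed twice.
    println!("committed offset: {:?}", committed);
}
```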
With the config below, it's easy to reproduce:
- Start vector
- Produce a large number of messages
- Shortly after producing messages, terminate vector
- Restart vector
Comparing the blackhole sink logs shows more messages processed than were produced.
```
# Produce 10,000 messages then kill vector
❯ kcat -b $broker -P -t testTopic1 -K : -l <(for i in $(seq 1 10000); do echo "${i}:{\"value\": ${i}}"; done); sleep 0.1; pkill vector
```
Logs:
```
2022-10-07T04:34:08.450213Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:09.451049Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:10.450242Z INFO vector::sinks::blackhole::sink: Total events collected events=4329 raw_bytes_collected=3604950
2022-10-07T04:34:10.504935Z INFO vector: Vector has stopped.
2022-10-07T04:34:10.505154Z INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="devnull, testTopic1" time_remaining="59 seconds left"
2022-10-07T04:34:10.544353Z INFO vector::sinks::blackhole::sink: Total events collected events=9031 raw_bytes_collected=7521716
❯ KAFKA_ADDRS=10.2.22.139:9092 VECTOR_API_PORT=8687 vector -c kafka2blackhole.toml
2022-10-07T04:34:51.798202Z INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,kube=info"
2022-10-07T04:34:51.798509Z INFO vector::app: Loading configs. paths=["kafka2blackhole.toml"]
2022-10-07T04:34:51.806936Z INFO vector::topology::running: Running healthchecks.
2022-10-07T04:34:51.807383Z INFO vector: Vector has started. debug="false" version="0.24.1" arch="x86_64" build_id="8935681 2022-09-12"
2022-10-07T04:34:51.807497Z INFO vector::internal_events::api: API server running. address=0.0.0.0:8687 playground=off
2022-10-07T04:34:51.807707Z INFO vector::topology::builder: Healthcheck: Passed.
2022-10-07T04:34:51.809614Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:52.809293Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:53.809251Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:54.809671Z INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:55.809451Z INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
2022-10-07T04:34:56.809269Z INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
```
Out of 10,000 messages produced, 9,031 + 1,007 = 10,038 were processed, i.e. 38 duplicates.
Configuration
```toml
[sources.testTopic1]
type = "kafka"
bootstrap_servers = "${KAFKA_ADDRS}"
group_id = "group1"
topics = [ "testTopic1" ]
decoding = { codec = "json" }

[sinks.devnull]
type = "blackhole"
inputs = [ "*" ]
acknowledgements = { enabled = true }
```
Version
vector 0.24.1
Debug Output
No response
Example Data
No response
Additional Context
No response
References
Maybe related to #11405