Kafka source not processing pending acknowledgements at shutdown #14761

Closed
jches opened this issue Oct 7, 2022 · 1 comment · Fixed by #17497
Labels

  • domain: delivery (anything related to delivering events within Vector such as end-to-end acknowledgements)
  • source: kafka (anything `kafka` source related)
  • type: bug (a code related bug)

Comments

@jches
Contributor

jches commented Oct 7, 2022

A note for the community

  • Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
  • If you are interested in working on this issue or have submitted a pull request, please leave a comment

Problem

When running a kafka source with acknowledgements enabled on the sink, I've noticed some messages being processed twice when vector is restarted. On shutdown, the acknowledgement stream isn't drained before the process exits, so messages that were consumed and delivered, but whose acknowledgements weren't processed (and whose offsets were therefore never committed back to Kafka), are consumed again when vector starts up.

Running the config below, it's easy to recreate this:

  1. Start vector
  2. Produce a large number of messages
  3. Shortly after producing messages, terminate vector
  4. Restart vector

Comparing the blackhole sink logs shows more messages processed than were produced.

# Produce 10,000 messages then kill vector
❯ kcat -b $broker -P -t testTopic1 -K : -l <(for i in $(seq 1 10000); do echo "${i}:{\"value\": ${i}}"; done);  sleep 0.1; pkill vector

Logs:

2022-10-07T04:34:08.450213Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:09.451049Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:10.450242Z  INFO vector::sinks::blackhole::sink: Total events collected events=4329 raw_bytes_collected=3604950
2022-10-07T04:34:10.504935Z  INFO vector: Vector has stopped.
2022-10-07T04:34:10.505154Z  INFO vector::topology::running: Shutting down... Waiting on running components. remaining_components="devnull, testTopic1" time_remaining="59 seconds left"
2022-10-07T04:34:10.544353Z  INFO vector::sinks::blackhole::sink: Total events collected events=9031 raw_bytes_collected=7521716


❯ KAFKA_ADDRS=10.2.22.139:9092 VECTOR_API_PORT=8687 vector -c kafka2blackhole.toml
2022-10-07T04:34:51.798202Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=trace,rdkafka=info,buffers=info,kube=info"
2022-10-07T04:34:51.798509Z  INFO vector::app: Loading configs. paths=["kafka2blackhole.toml"]
2022-10-07T04:34:51.806936Z  INFO vector::topology::running: Running healthchecks.
2022-10-07T04:34:51.807383Z  INFO vector: Vector has started. debug="false" version="0.24.1" arch="x86_64" build_id="8935681 2022-09-12"
2022-10-07T04:34:51.807497Z  INFO vector::internal_events::api: API server running. address=0.0.0.0:8687 playground=off
2022-10-07T04:34:51.807707Z  INFO vector::topology::builder: Healthcheck: Passed.
2022-10-07T04:34:51.809614Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:52.809293Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:53.809251Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:54.809671Z  INFO vector::sinks::blackhole::sink: Total events collected events=0 raw_bytes_collected=0
2022-10-07T04:34:55.809451Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832
2022-10-07T04:34:56.809269Z  INFO vector::sinks::blackhole::sink: Total events collected events=1007 raw_bytes_collected=838832

Out of 10,000 messages produced, 9,031 + 1,007 = 10,038 were processed, so 38 messages were duplicated.

Configuration

[sources.testTopic1]
type = "kafka"
bootstrap_servers = "${KAFKA_ADDRS}"
group_id = "group1"
topics = [ "testTopic1" ]
decoding = { codec = "json" }

[sinks.devnull]
type = "blackhole"
inputs = [ "*" ]
acknowledgements = { enabled = true }

Version

vector 0.24.1

Debug Output

No response

Example Data

No response

Additional Context

No response

References

Maybe related to #11405

@jches jches added the type: bug A code related bug. label Oct 7, 2022
@spencergilbert spencergilbert added source: kafka Anything `kafka` source related domain: delivery Anything related to delivering events within Vector such as end-to-end acknowledgements labels Oct 7, 2022
@jches
Contributor Author

jches commented Oct 9, 2022

There appear to be two pieces to this. I haven't looked at how other sources handle shutdown, but they may be affected by similar issues as well:

  1. The kafka source main loop listens for a shutdown signal and exits immediately. Its ack_stream may have pending items, which ideally should be drained before shutdown (see the sketch after this list):

    vector/src/sources/kafka.rs

    Lines 245 to 248 in 68aae83

    tokio::select! {
        _ = &mut shutdown => break,
        entry = ack_stream.next() => if let Some((status, entry)) = entry {
            if status == BatchStatus::Delivered {

  2. The finalizer stream main loop listens on the same shutdown channel and can also leave pending items in its new_entries stream. As I understand it, any source with a finalizer stream adds to this stream after sending an event through its output, so if the pending entries in this stream are dropped at shutdown, the corresponding events may be processed downstream without their acknowledgements ever being handled.

    biased;
    _ = &mut shutdown => break,
    // Only poll for new entries until shutdown is flagged.
    new_entry = new_entries.recv() => match new_entry {
        Some((receiver, entry)) => {
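
For illustration, here is a minimal, self-contained sketch of the pattern in point 1. The names (ack_tx, ack_stream, the u64 "offsets") are hypothetical stand-ins, not Vector's actual types; the point is that breaking out of a tokio::select! as soon as the shutdown future resolves abandons whatever is still buffered in the other stream.

use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Hypothetical stand-ins for the source's ack stream and shutdown signal.
    let (ack_tx, mut ack_stream) = mpsc::unbounded_channel::<u64>();
    let (shutdown_tx, mut shutdown) = oneshot::channel::<()>();

    // Queue some pending "acknowledgements", then flag shutdown immediately.
    for offset in 0..100u64 {
        ack_tx.send(offset).unwrap();
    }
    shutdown_tx.send(()).unwrap();

    let mut committed = 0u64;
    loop {
        tokio::select! {
            // Breaking as soon as shutdown resolves abandons whatever is
            // still buffered in ack_stream, so those offsets are never
            // committed and the messages are redelivered after a restart.
            _ = &mut shutdown => break,
            entry = ack_stream.recv() => match entry {
                Some(_offset) => committed += 1,
                None => break,
            },
        }
    }
    // Usually prints far fewer than 100, since the shutdown branch wins early.
    println!("committed {committed} of 100 pending acks");
}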

One way to fix both of these is to remove the shutdown-signal handling from the finalizer stream, and instead have the source drop the sender end of that stream when it receives the shutdown signal. Dropping the sender signals shutdown to the finalizer task by ending the new_entries stream. Both pending entries and acks are then able to drain, and things exit cleanly; a sketch of this pattern follows.
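
A minimal sketch of that fix, under the same hypothetical names as above: the finalizer loop has no shutdown branch at all and simply runs until recv() returns None, which happens once the source drops its sender. Everything still buffered in the channel drains before the task exits.

use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() {
    // Hypothetical stand-ins for the finalizer's new_entries stream and the
    // source's shutdown signal; a sketch of the pattern, not Vector's code.
    let (entry_tx, mut new_entries) = mpsc::unbounded_channel::<u64>();
    let (shutdown_tx, shutdown) = oneshot::channel::<()>();

    // Finalizer task: no shutdown branch. recv() returns None once every
    // sender has been dropped and the buffer is empty, so pending entries
    // drain naturally before the task exits.
    let finalizer = tokio::spawn(async move {
        let mut processed = 0u64;
        while let Some(_entry) = new_entries.recv().await {
            processed += 1;
        }
        processed
    });

    // Source side: enqueue entries, then receive the shutdown signal and
    // drop the sender, which is what ends the new_entries stream.
    for offset in 0..100u64 {
        entry_tx.send(offset).unwrap();
    }
    shutdown_tx.send(()).unwrap();
    shutdown.await.unwrap();
    drop(entry_tx);

    // All 100 buffered entries are processed before the finalizer returns.
    assert_eq!(finalizer.await.unwrap(), 100);
    println!("all pending entries drained before exit");
}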
