Flush never occurs in high frequency events when starts_when and ends_when are not defined #16145

Closed
tomers opened this issue Jan 26, 2023 · 3 comments · Fixed by #16146
Labels: type: bug (a code-related bug)

Comments

@tomers
Contributor

tomers commented Jan 26, 2023

Problem

In the current reduce transform, when neither starts_when nor ends_when is defined, aggregation continues indefinitely without flushing, as long as the interval between consecutive events is shorter than expire_after_ms.

For example, if we group by user with expire_after_ms = 60000 but keep receiving events for that user at a rate faster than one per minute, a flush never occurs.
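The behavior described above can be illustrated with a toy model. This is only a sketch of an idle-timeout rule as the report describes it, not Vector's actual code: the function name simulate, its parameters, and the one-second check interval (tick_ms) are all assumptions made for illustration.

```python
def simulate(event_times_ms, expire_after_ms, end_ms, tick_ms=1000):
    """Toy model (not Vector's code): return the times at which a single
    group would flush if it expires only after expire_after_ms of inactivity.

    The group is checked every tick_ms; each event resets the idle timer.
    """
    flushes = []
    events = sorted(event_times_ms)
    i = 0
    last_seen = None  # None means there is no open group
    for now in range(0, end_ms + 1, tick_ms):
        # Absorb every event that has arrived up to this check.
        while i < len(events) and events[i] <= now:
            last_seen = events[i]
            i += 1
        # Flush only when the group has been idle long enough.
        if last_seen is not None and now - last_seen >= expire_after_ms:
            flushes.append(now)
            last_seen = None
    return flushes


# One event every 1.6 s for a minute, with a 10 s idle timeout: no flush.
steady = simulate(range(0, 60_000, 1_600), expire_after_ms=10_000, end_ms=60_000)

# The same timeout once events stop arriving: a single flush.
stopped = simulate([0, 1_600, 3_200], expire_after_ms=10_000, end_ms=20_000)
```

Under this model, a steady stream never leaves the group idle for expire_after_ms, so steady stays empty, while stopped contains one flush time after the last event ages out.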

Configuration

[transforms.group_by_user]
  type = "reduce"
  inputs = ["user_log_events"]
  group_by = ["org_id", "user_id"]
  expire_after_ms = 60000

Version

0.27.0

Reproduction:

Create a configuration file /tmp/vector.toml:

[sources.userpackets]
type = "stdin"
max_length = 1_024

[transforms.jsonize]
type = "remap"
inputs = ["userpackets"]
source = """
  . |= object!(parse_json!(.message))
  del(.message)
"""

[transforms.rrr]
type = "reduce"
inputs = ["jsonize"]
group_by = ["user"]
expire_after_ms = 10_000

[sinks.print]
inputs = ["rrr"]
type = "console"
encoding.codec = "json"

Create a log generation script, gen_log.py:

#!/usr/bin/env python3

import random
import json
import time


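def main():
    # Emit one JSON event every 0.4 s for a randomly chosen user (usr-2 to usr-5).
    while True:
        u = random.randint(2, 5)
        d = {
            "user": f"usr-{u}",
            # The 100 ** (u - 1) offset puts each user's values in a distinct range.
            "pkts": random.randint(0, 9) + 100 ** (u - 1)
        }
        print(json.dumps(d), flush=True)
        time.sleep(0.4)


if __name__ == "__main__":
    main()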

Run Vector in Docker:

chmod +x gen_log.py
./gen_log.py | docker run --rm -i -v /tmp/vector.toml:/etc/vector/vector.toml timberio/vector:latest-alpine

Expected behavior

The generated log lines should appear in reduced form every 10 seconds.

Observed behavior

The generated log lines never appear.

tomers added the "type: bug" label Jan 26, 2023
tomers pushed a commit to tomers/vector that referenced this issue Jan 26, 2023
Fixes flush not occuring when events arrive in high frequency (in rate
faster then `expire_after` between events).
Fixes vectordotdev#16145.
@davidhuie-dd
Contributor

davidhuie-dd commented Jan 27, 2023

This sounds like the expected behavior of expire_after_ms? If you want periodic flushes, there's the setting flush_period_ms: https://vector.dev/docs/reference/configuration/transforms/reduce/#flush_period_ms

@tomers
Contributor Author

tomers commented Jan 29, 2023

This sounds like the expected behavior of expire_after_ms? If you want periodic flushes, there's the setting flush_period_ms: https://vector.dev/docs/reference/configuration/transforms/reduce/#flush_period_ms

@davidhuie-dd, please read the issue description I just edited. It contains reproduction information. The flush_period_ms does not help in this scenario.

@davidhuie-dd
Contributor

Yep, I agree with your assessment, and I reproduced the issue (thanks!).

davidhuie-dd pushed a commit that referenced this issue Feb 23, 2023
…igh rate (#16146)

* transforms/reduce: fix flush not occurring

Fixes flush not occuring when events arrive in high frequency (in rate
faster then `expire_after` between events).
Fixes #16145.

* transforms/reduce: add unit-tests