
Enhance message gap metric to include min/max/avg aggregations #17847


Open

jtuglu-netflix wants to merge 3 commits into base: master from upgrade-message-gap-metric

Conversation

jtuglu-netflix (Contributor) commented Mar 30, 2025

Description

Realtime ingest message gap metric additions

  • The current definition of ingest/events/messageGap remains as-is.
  • Adds ingest/events/minMessageGap, the minimum message gap seen in the currently-running task within the emission period.
  • Adds ingest/events/maxMessageGap, the maximum message gap seen in the currently-running task within the emission period.
  • Adds ingest/events/avgMessageGap, the average message gap seen in the currently-running task across the entire task duration so far.
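
As a rough illustration of the aggregation semantics above (a hypothetical sketch, not this PR's actual code; the class and field names are invented), the per-period min/max could be tracked with atomics and reset on each emission:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: min/max are tracked per emission period and reset after each emit.
class GapAggregates
{
  private final AtomicLong minMessageGap = new AtomicLong(Long.MAX_VALUE);
  private final AtomicLong maxMessageGap = new AtomicLong(Long.MIN_VALUE);

  void observe(long gapMillis)
  {
    // lock-free min/max updates; safe under concurrent reporting threads
    minMessageGap.accumulateAndGet(gapMillis, Math::min);
    maxMessageGap.accumulateAndGet(gapMillis, Math::max);
  }

  // called by the metrics monitor once per emission period
  void resetForNextPeriod()
  {
    minMessageGap.set(Long.MAX_VALUE);
    maxMessageGap.set(Long.MIN_VALUE);
  }
}

The sentinel values (Long.MAX_VALUE / Long.MIN_VALUE) double as "no data seen" markers, which matches the emission-side guard shown later in the review (minMessageGap != Long.MAX_VALUE).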

Appenderator Benchmarks

  • Adds StreamAppenderator benchmarks for the per-row Appenderator::add() method.

SegmentGenerationMetrics Benchmarks

  • Adds SegmentGenerationMetrics benchmarks for the expensive reporting methods.

Small optimizations to StreamAppenderator::add()

  • Avoid repeated System.currentTimeMillis() calls by caching the current time in a volatile field updated by a background thread (see the sketch after this list).
  • Switch to a fixed-length array for the persist-reason list (~20-40 ns per-row savings).
  • Switch to a fixed-size hydrant map allocation in persistAll() (note: this has been omitted since it appears to be slower with smaller sink counts).
  • Together, the above yield what appears to be a 100-300 µs speedup per 10k rows.
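
For illustration, a minimal sketch of the cached-clock pattern from the first bullet (the class and thread names here are hypothetical; the PR applies the same idea inside StreamAppenderator):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a background thread refreshes a volatile timestamp every
// millisecond so the hot path reads a field instead of making a time syscall.
public class CachedClock
{
  private volatile long currTimeMs = System.currentTimeMillis();

  private final ScheduledExecutorService timeExecutor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        final Thread t = new Thread(r, "cached-clock");
        t.setDaemon(true);
        return t;
      });

  public CachedClock()
  {
    timeExecutor.scheduleAtFixedRate(
        () -> currTimeMs = System.currentTimeMillis(),
        0, 1, TimeUnit.MILLISECONDS
    );
  }

  public long millis()
  {
    // a volatile read; typically far cheaper than System.currentTimeMillis() in a tight loop
    return currTimeMs;
  }

  public void stop()
  {
    timeExecutor.shutdownNow();
  }
}

The trade-off is accuracy: the cached value can lag by up to the refresh period, which is acceptable for millisecond-granularity gap metrics.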

Benchmarks

  • OS: Linux
  • Arch: x86
  • Memory: 240 GB RAM
  • CPU: 32 physical cores, 2.6 GHz base frequency

Appenderator Benchmarks

NB: All benchmarks were run with 15 warm-up iterations of 10k rows each, followed by 15 measurement iterations of 10k rows.

Performance Before

Benchmark                                    (NUM_ROWS)  Mode  Cnt     Score     Error  Units
StreamAppenderatorBenchmark.benchmarkAddRow       10000  avgt   15  5760.485 ± 50.048  us/op

Performance After

Benchmark                                    (NUM_ROWS)  (enableMessageGapAggStats)  Mode  Cnt     Score    Error  Units
StreamAppenderatorBenchmark.benchmarkAddRow       10000                       false  avgt   15  5461.427 ± 40.418  us/op
StreamAppenderatorBenchmark.benchmarkAddRow       10000                        true  avgt   15  5525.679 ± 32.416  us/op

Metrics Benchmarks

Benchmark                                                                       (enableMessageGapMetrics)  Mode  Cnt  Score   Error  Units
SegmentGenerationMetricsBenchmark.benchmarkMultipleReportMaxSegmentHandoffTime                       true  avgt    2  1.696          ns/op
SegmentGenerationMetricsBenchmark.benchmarkMultipleReportMaxSegmentHandoffTime                      false  avgt    2  1.686          ns/op
SegmentGenerationMetricsBenchmark.benchmarkMultipleReportMessageGap                                  true  avgt    2  8.829          ns/op
SegmentGenerationMetricsBenchmark.benchmarkMultipleReportMessageGap                                 false  avgt    2  8.822          ns/op

Notes

  • Kept the main() functions inside the benchmarks temporarily so they're easy to run directly from an IDE.
  • Will add docs for these new metrics once the approach is approved.

Release note

Add min/max/avg message gap reporting metrics to realtime indexing jobs.


Key changed/added classes in this PR
  • benchmarks/src/test/java/org/apache/druid/benchmark/indexing/AppenderatorBenchmark.java
  • benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StreamAppenderatorBenchmark.java
  • benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
  • indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
  • indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
  • indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
  • indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
  • server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
  • server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
  • server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

jtuglu-netflix force-pushed the upgrade-message-gap-metric branch 5 times, most recently from 9964029 to 8722e82 on March 31, 2025 06:06
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch 4 times, most recently from bc90805 to 4e2633a on March 31, 2025 06:55
@Setup
public void setup() throws IOException
{
  tempDir = File.createTempFile("druid-appenderator-benchmark", "tmp");

Check warning (Code scanning / CodeQL, Medium, test): Local information disclosure in a temporary directory — local information disclosure vulnerability due to use of a file readable by other local users.
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch 12 times, most recently from 584a2a6 to 6c80ada on April 1, 2025 16:28
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch from 6c80ada to dcd0623 on April 3, 2025 20:51
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch from dcd0623 to f88b828 on April 3, 2025 21:42
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch 4 times, most recently from f85c609 to ae85511 on April 14, 2025 23:09
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", maxPendingPersists=" + getMaxPendingPersists() +

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation — invoking SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists should be avoided because it has been deprecated.
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch 3 times, most recently from a06f92c to d4c8adc on April 15, 2025 06:35
jtuglu-netflix marked this pull request as ready for review April 15, 2025 06:36
jtuglu-netflix requested a review from bsyk April 15, 2025 18:54
jtuglu-netflix force-pushed the upgrade-message-gap-metric branch from d4c8adc to b20ab94 on April 16, 2025 05:50
public void reportMessageGap(final long messageGap)
{
  // incremental running mean: newAvg = oldAvg + (gap - oldAvg) / n, avoiding an unbounded sum
  final long numEvent = this.numMessageGap.incrementAndGet();
  this.avgMessageGap.getAndUpdate(oldAvg -> oldAvg + ((messageGap - oldAvg) / numEvent));
maytasm (Contributor):
Won't it be more efficient to just keep a running sum? Then only calculate the avg using numEvent when we get the average

jtuglu-netflix (Contributor, Author):
> Won't it be more efficient to just keep a running sum? Then only calculate the avg using numEvent when we get the average

In short, given the emission period is variable, we could overflow on the sum. I figured the simplest solution was taking the performance penalty of floating-point division. Other alternatives seemed a bit more complicated, like sampling/caching the gap values in a ring buffer and moving the avg calculation to emission time, etc.

Contributor:
I would agree with @maytasm on this.
Just keep an AtomicDouble totalMessageGap (to avoid the overflow problem) and the total number of events.
Do not compute the average until it needs to be reported.
maytasm added the Design Review and Area - Metrics/Event Emitting labels and removed the Area - Batch Ingestion, Kubernetes, Area - Ingestion, and Area - MSQ labels Apr 20, 2025
maytasm requested review from kfaraz and samarthjain and removed the request for bsyk April 20, 2025 01:05
github-actions bot added the Area - Batch Ingestion, Kubernetes, Area - Ingestion, and Area - MSQ labels Apr 24, 2025
kfaraz (Contributor) commented Apr 25, 2025

@maytasm, @jtuglu-netflix, thanks for the patience on this.
I will review the PR later today.

kfaraz (Contributor) left a comment:

Thanks for the changes, @jtuglu-netflix .

I feel there are several changes that are probably not needed here.

Some suggestions:

  • Move the benchmarks to a separate PR. Focus this PR only on the new metrics.
  • Do not add a new config to emit the metrics
  • Do not emit min message gap as it doesn't add any new value
  • Simplify computation of message gap

Please see the review comments for more details.


@Warmup(iterations = 15)
@Measurement(iterations = 15)
public class StreamAppenderatorBenchmark extends AppenderatorBenchmark
kfaraz (Contributor):
@jtuglu-netflix , would you mind moving the benchmarks to a separate PR and keeping this PR solely for the new metrics?

I am assuming that the benchmarks don't have anything to do with these metrics in particular. Please correct me if I am missing something.

jtuglu-netflix (Contributor, Author):
The new metrics added overhead to the loop; adding the perf improvements + benchmarks removes this overhead, so I figured I'd keep them in the same PR. I'll remove them.

kfaraz (Contributor):
Yeah, we can keep the SegmentGenerationMetricsBenchmark in this PR since it is related.

If we create a separate PR for the other benchmarks, we can review and merge that PR first.
Then we will be able to better evaluate the changes in this PR against the already added benchmarks.

It will also keep the reviews and the commit history more straightforward.

Let me know if that sounds good.

@@ -819,13 +819,13 @@ private TaskStatus generateAndPublishSegments(
     final PartitionAnalysis partitionAnalysis
 ) throws IOException, InterruptedException
 {
-  final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
+  final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
+  final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics(tuningConfig.getMessageGapAggStatsEnabled());
kfaraz (Contributor):
The message gap will only ever be emitted for streaming tasks. It should not be a part of IndexTuningConfig and we should always pass false to the SegmentGenerationMetrics here.

@@ -1291,7 +1293,8 @@ public IndexTuningConfig(
     @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
     @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
     @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
-    @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
+    @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
+    @JsonProperty("messageGapAggStatsEnabled") @Nullable Boolean messageGapAggStatsEnabled
kfaraz (Contributor):
IndexTask and IndexTuningConfig should not have this field as it is relevant only for streaming tasks.

// cache volatile locally so it's likely to be a register read later
final long systemTime = currTimeMs;
if (messageGapAggStats) {
  metrics.reportMessageGap(systemTime - row.getTimestampFromEpoch());
}
kfaraz (Contributor):
The logic of computing the actual message gap should live inside SegmentGenerationMetrics itself. The code here should just pass the row timestamp.

timeExecutor.scheduleAtFixedRate(
    () -> currTimeMs = System.currentTimeMillis(),
    0,
    1,
    TimeUnit.MILLISECONDS
);
kfaraz (Contributor):
This doesn't seem like the right approach. We shouldn't need a separate executor just to update the currTimeMs.

jtuglu-netflix (Contributor, Author):
This was the simplest way to avoid the performance overhead of calling the time function in the main loop. On Linux, that call adds a significant per-row penalty (~24 ns per call). I indicated this in the PR description.

kfaraz (Contributor):
There are other cleaner ways to avoid invoking the System.currentTimeMillis() call.
They could be somewhat less accurate but good enough for the use case here.

Just initialize the start time once when the message gap is being reset in SegmentGenerationMetrics.
Maintain a Stopwatch.

When calling recordMessageLag, compute message gap as follows:

messageGap = startTimestamp + stopwatch.millisElapsed() - rowTimestamp

Let me know if that would work.
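
For concreteness, a rough sketch of this suggestion (shown here with Guava's Stopwatch; Druid's own Stopwatch utility, if used instead, may differ slightly, and the class name is invented):

import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: capture wall-clock time once per reset, then derive "now"
// from a monotonic stopwatch instead of calling System.currentTimeMillis() per row.
class MessageGapClock
{
  private long startTimestampMs;
  private Stopwatch stopwatch;

  // call whenever the message gap stats are reset (e.g. at the start of an emission period)
  void reset()
  {
    startTimestampMs = System.currentTimeMillis();
    stopwatch = Stopwatch.createStarted();
  }

  long messageGap(long rowTimestampMs)
  {
    // messageGap = startTimestamp + stopwatch.millisElapsed() - rowTimestamp
    return startTimestampMs + stopwatch.elapsed(TimeUnit.MILLISECONDS) - rowTimestampMs;
  }
}

Usage would be metrics-internal: reset() once per period, then messageGap(row.getTimestampFromEpoch()) inside the reporting method, so the hot path never makes a time syscall.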

// Best-effort way to ensure parity amongst emitted metrics
if (metrics.isMessageGapAggStatsEnabled()) {
  if (minMessageGap != Long.MAX_VALUE) {
    emitter.emit(builder.setMetric("ingest/events/minMessageGap", minMessageGap));
kfaraz (Contributor):
Does the min message gap really add any value?
I don't imagine any SLAs relying on the min message gap.
I think just average and max should be enough.

And since these are only two new metrics reported in every metric emission period, I think we should not need to add a new config. It is fine to emit new metrics as long as we are not changing values of any of the existing metrics.

jtuglu-netflix (Contributor, Author):
> And since these are only two new metrics reported in every metric emission period, I think we should not need to add a new config.

These new metrics add overhead to the loop; I'm not sure that's something people want to "opt in" to.

kfaraz (Contributor) commented Apr 29, 2025:
If we use the impl suggested in this comment, the overhead would be negligible, I feel.
