Enhance message gap metric to include min/max/avg aggregations #17847
base: master
benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StreamAppenderatorBenchmark.java
@Setup
public void setup() throws IOException
{
  tempDir = File.createTempFile("druid-appenderator-benchmark", "tmp");
CodeQL code scanning warning (Medium, test): Local information disclosure in a temporary directory
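For context on the warning above: `File.createTempFile` creates a file, not a directory, and the usual remedy for this class of finding is to use the `java.nio.file.Files` helpers instead. The following is a minimal sketch, assuming the benchmark only needs a private scratch directory; the class name `BenchmarkTempDir` is hypothetical and not part of this PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not part of the PR): creates a scratch directory for a benchmark.
final class BenchmarkTempDir
{
  static Path create() throws IOException
  {
    // Files.createTempDirectory creates the directory in one step, so there is no
    // window in which a temp *file* has to be deleted and re-created as a directory.
    // Assumption: on typical POSIX systems the JDK creates it with restrictive permissions.
    return Files.createTempDirectory("druid-appenderator-benchmark");
  }

  public static void main(String[] args) throws IOException
  {
    Path dir = create();
    System.out.println("created directory: " + Files.isDirectory(dir));
    Files.delete(dir);
  }
}
```

If the benchmark needs a `java.io.File`, `create().toFile()` would give one.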
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", maxPendingPersists=" + getMaxPendingPersists() +
CodeQL code scanning notice (Note): Deprecated method or constructor invocation: SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists
public void reportMessageGap(final long messageGap)
{
  final long numEvent = this.numMessageGap.incrementAndGet();
  this.avgMessageGap.getAndUpdate(oldAvg -> oldAvg + ((messageGap - oldAvg) / numEvent));
Won't it be more efficient to just keep a running sum? Then only calculate the avg using numEvent when we get the average
> Won't it be more efficient to just keep a running sum? Then only calculate the avg using numEvent when we get the average
In short, given that the emission period is variable, the sum could overflow. I figured the simplest solution was to take the performance penalty of floating-point division. Other offline alternatives seemed a bit more complicated, such as sampling/caching the gap values in a ring buffer and moving the average calculation to emission time.
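For reference, the update in `reportMessageGap` has the form of the standard incremental-mean recurrence. Writing $x_n$ for the $n$-th reported gap and $\bar{x}_n$ for the average after $n$ events (notation introduced here, not in the PR):

$$
\bar{x}_n \;=\; \bar{x}_{n-1} + \frac{x_n - \bar{x}_{n-1}}{n} \;=\; \frac{1}{n}\sum_{i=1}^{n} x_i , \qquad \bar{x}_0 = 0 .
$$

In exact arithmetic this equals the running sum divided by the count, without the sum ever being stored. With floating-point values the two can differ by rounding, and since the count increment and the average update are separate atomic operations, concurrent callers may apply updates in a different order than the counts were assigned.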
I would agree with @maytasm on this.
Just keep an AtomicDouble totalMessageGap (to avoid the overflow problem) and the total number of events.
Do not compute the average until it needs to be reported.
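The suggestion above could be sketched roughly as follows. This is an illustration only, not the code in this PR: it swaps in the JDK's `DoubleAdder` and `LongAdder` in place of Guava's `AtomicDouble` so that it is self-contained, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: keep a running total and a count, divide only when reporting.
final class MessageGapAverage
{
  // A double total cannot overflow the way a long sum can; it loses precision instead.
  private final DoubleAdder totalMessageGap = new DoubleAdder();
  private final LongAdder numEvents = new LongAdder();

  // Hot path: two cheap additions, no division.
  void reportMessageGap(long messageGapMillis)
  {
    totalMessageGap.add(messageGapMillis);
    numEvents.increment();
  }

  // Reporting path: the total and the count are read separately, so under
  // concurrent writes the result is approximate rather than a consistent snapshot.
  double getAvgMessageGap()
  {
    final long count = numEvents.sum();
    return count == 0 ? 0.0 : totalMessageGap.sum() / count;
  }
}
```

Reporting gaps of 10, 20 and 30 and then calling `getAvgMessageGap()` gives 20.0.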
@maytasm, @jtuglu-netflix, thanks for the patience on this.
Thanks for the changes, @jtuglu-netflix.
I feel there are several changes that are probably not needed here.
Some suggestions:
- Move the benchmarks to a separate PR. Focus this PR only on the new metrics.
- Do not add a new config to emit the metrics
- Do not emit min message gap as it doesn't add any new value
- Simplify computation of message gap
Please see the review comments for more details.
@Warmup(iterations = 15)
@Measurement(iterations = 15)
public class StreamAppenderatorBenchmark extends AppenderatorBenchmark
@jtuglu-netflix, would you mind moving the benchmarks to a separate PR and keeping this PR solely for the new metrics?
I am assuming that the benchmarks don't have anything to do with these metrics in particular. Please correct me if I am missing something.
The new metrics added overhead to the loop. The perf improvements offset this overhead, as the benchmarks show, so I figured I'd keep them in the same PR. I'll remove them.
Yeah, we can keep the SegmentGenerationMetricsBenchmark in this PR since it is related.
If we create a separate PR for the other benchmarks, we can review and merge that PR first.
Then we will be able to better evaluate the changes in this PR against the already added benchmarks.
It will also keep the reviews and the commit history more straightforward.
Let me know if that sounds good.
@@ -819,13 +819,13 @@ private TaskStatus generateAndPublishSegments(
final PartitionAnalysis partitionAnalysis
) throws IOException, InterruptedException
{
final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics();
final IndexTuningConfig tuningConfig = ingestionSchema.getTuningConfig();
final SegmentGenerationMetrics buildSegmentsSegmentGenerationMetrics = new SegmentGenerationMetrics(tuningConfig.getMessageGapAggStatsEnabled());
The message gap will only ever be emitted for streaming tasks. It should not be a part of IndexTuningConfig and we should always pass false to the SegmentGenerationMetrics here.
@@ -1291,7 +1293,8 @@ public IndexTuningConfig(
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("messageGapAggStatsEnabled") @Nullable Boolean messageGapAggStatsEnabled
IndexTask and IndexTuningConfig should not have this field as it is relevant only for streaming tasks.
// cache volatile locally so it's likely to be a register read later
final long systemTime = currTimeMs;
if (messageGapAggStats) {
  metrics.reportMessageGap(systemTime - row.getTimestampFromEpoch());
The logic of computing the actual message gap should live inside SegmentGenerationMetrics itself. The code here should just pass the row timestamp.
timeExecutor.scheduleAtFixedRate(
    () -> currTimeMs = System.currentTimeMillis(),
    0,
    1,
    TimeUnit.MILLISECONDS
);
This doesn't seem like the right approach. We shouldn't need a separate executor just to update the currTimeMs.
This was the simplest way to avoid the performance overhead of calling the time function in the main loop. On Linux, this adds a significant performance penalty (24ns/row/call). I indicated this in the PR description.
There are other cleaner ways to avoid invoking the System.currentTimeMillis() call.
They could be somewhat less accurate but good enough for the use case here.
Just initialize the start time once when the message gap is being reset in SegmentGenerationMetrics.
Maintain a Stopwatch.
When calling recordMessageLag, compute message gap as follows:
messageGap = startTimestamp + stopwatch.millisElapsed() - rowTimestamp
Let me know if that would work.
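A rough sketch of the formula above, for illustration only. It assumes a monotonic nanosecond clock stands in for the `Stopwatch`; the class name `MessageGapClock` and the injectable `LongSupplier` are hypothetical and exist only to make the arithmetic testable. Whether this is actually cheaper than calling `System.currentTimeMillis()` per row is platform-dependent and not measured here.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch: wall-clock time is read once, elapsed time comes from a monotonic clock.
final class MessageGapClock
{
  private final long startTimestampMillis;
  private final long startNanos;
  private final LongSupplier nanoClock;

  MessageGapClock(long startTimestampMillis, LongSupplier nanoClock)
  {
    this.startTimestampMillis = startTimestampMillis;
    this.nanoClock = nanoClock;
    this.startNanos = nanoClock.getAsLong();
  }

  // Production-style factory: one wall-clock read, then System.nanoTime() for elapsed time.
  static MessageGapClock start()
  {
    return new MessageGapClock(System.currentTimeMillis(), System::nanoTime);
  }

  // messageGap = startTimestamp + millisElapsed - rowTimestamp
  long messageGap(long rowTimestampMillis)
  {
    final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - startNanos);
    return startTimestampMillis + elapsedMillis - rowTimestampMillis;
  }
}
```

One trade-off of this design is that the estimate does not follow later wall-clock adjustments (for example NTP corrections) until the start time is re-initialized.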
// Best-effort way to ensure parity amongst emitted metrics
if (metrics.isMessageGapAggStatsEnabled()) {
  if (minMessageGap != Long.MAX_VALUE) {
    emitter.emit(builder.setMetric("ingest/events/minMessageGap", minMessageGap));
Does the min message gap really add any value?
I don't imagine any SLAs relying on the min message gap.
I think just average and max should be enough.
And since these are only two new metrics reported in every metric emission period, I think we should not need to add a new config. It is fine to emit new metrics as long as we are not changing values of any of the existing metrics.
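To illustrate how small the per-row cost of a max-only aggregate can be, here is a minimal sketch of tracking the maximum message gap per emission period with a sentinel value, similar in spirit to the `Long.MAX_VALUE` check in the snippet above. The class and method names are hypothetical and this is not the implementation in this PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: maximum gap seen since the last emission, reset on read.
final class MaxMessageGap
{
  // Sentinel meaning "no gap reported in this emission period".
  static final long NO_VALUE = Long.MIN_VALUE;

  private final AtomicLong max = new AtomicLong(NO_VALUE);

  // Hot path: a single atomic max update per row.
  void report(long messageGapMillis)
  {
    max.accumulateAndGet(messageGapMillis, Math::max);
  }

  // Emission path: returns the max for the period and resets it in one atomic step.
  long getAndReset()
  {
    return max.getAndSet(NO_VALUE);
  }
}
```

A monitor would skip emitting the metric when `getAndReset()` returns `NO_VALUE`.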
> And since these are only two new metrics reported in every metric emission period, I think we should not need to add a new config.
These new metrics add overhead to the loop, not sure if that's something people want to "opt-in" to.
If we use the impl suggested in this comment, the overhead would be negligible I feel.
Description
Realtime ingest message gap metric additions
- ingest/events/messageGap remains as-is.
- ingest/events/minMessageGap, the minimum message gap seen in the currently-running task within the emission period.
- ingest/events/maxMessageGap, the maximum message gap seen in the currently-running task within the emission period.
- ingest/events/avgMessageGap, the avg message gap seen in the currently-running task across the entire duration thus far.
Appenderator Benchmarks
- Appenderator::add() method.
SegmentGenerationMetrics Benchmarks
Small optimizations to StreamAppenderator::add()
- System.currentTimeMillis() by caching locally in a volatile updated by a background thread.
- persistAll() (note: this has been omitted since it seems to be slower on smaller sink #s).
Benchmarks
Appenderator Benchmarks
NB: All benchmarks run with 15 warm up iterations of 10k rows each, followed by 15 iterations of 10k rows.
Performance Before
Performance After
Metrics Benchmarks
Notes
Release note
Add min/max/avg message gap reporting metrics to realtime indexing jobs.
Key changed/added classes in this PR
benchmarks/src/test/java/org/apache/druid/benchmark/indexing/AppenderatorBenchmark.java
benchmarks/src/test/java/org/apache/druid/benchmark/indexing/StreamAppenderatorBenchmark.java
benchmarks/src/test/java/org/apache/druid/benchmark/indexing/metrics/SegmentGenerationMetricsBenchmark.java
indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
indexing-service/src/test/java/org/apache/druid/indexing/common/TaskRealtimeMetricsMonitorTest.java
server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java
server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java
This PR has: