Commit 584a2a6

Enhance message gap metric to include min/max/avg aggregations
1 parent: aac1ff5

File tree

7 files changed: +230 -44 lines


indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java

+54 -13
@@ -90,13 +90,21 @@ public boolean doMonitor(ServiceEmitter emitter)
     }
     emitter.emit(builder.setMetric("ingest/events/unparseable", unparseable));
 
-    final long processedWithError = rowIngestionMetersTotals.getProcessedWithError() - previousRowIngestionMetersTotals.getProcessedWithError();
+    final long processedWithError = rowIngestionMetersTotals.getProcessedWithError()
+                                    - previousRowIngestionMetersTotals.getProcessedWithError();
     if (processedWithError > 0) {
-      log.error("[%,d] events processed with errors! Set logParseExceptions to true in the ingestion spec to log these errors.", processedWithError);
+      log.error(
+          "[%,d] events processed with errors! Set logParseExceptions to true in the ingestion spec to log these errors.",
+          processedWithError
+      );
     }
     emitter.emit(builder.setMetric("ingest/events/processedWithError", processedWithError));
 
-    emitter.emit(builder.setMetric("ingest/events/processed", rowIngestionMetersTotals.getProcessed() - previousRowIngestionMetersTotals.getProcessed()));
+    emitter.emit(builder.setMetric(
+        "ingest/events/processed",
+        rowIngestionMetersTotals.getProcessed()
+            - previousRowIngestionMetersTotals.getProcessed()
+    ));
 
     final long dedup = metrics.dedup() - previousSegmentGenerationMetrics.dedup();
     if (dedup > 0) {
@@ -110,27 +118,60 @@ public boolean doMonitor(ServiceEmitter emitter)
         )
     );
 
-    emitter.emit(builder.setMetric("ingest/rows/output", metrics.rowOutput() - previousSegmentGenerationMetrics.rowOutput()));
-    emitter.emit(builder.setMetric("ingest/persists/count", metrics.numPersists() - previousSegmentGenerationMetrics.numPersists()));
-    emitter.emit(builder.setMetric("ingest/persists/time", metrics.persistTimeMillis() - previousSegmentGenerationMetrics.persistTimeMillis()));
-    emitter.emit(builder.setMetric("ingest/persists/cpu", metrics.persistCpuTime() - previousSegmentGenerationMetrics.persistCpuTime()));
+    emitter.emit(builder.setMetric(
+        "ingest/rows/output",
+        metrics.rowOutput() - previousSegmentGenerationMetrics.rowOutput()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/persists/count",
+        metrics.numPersists() - previousSegmentGenerationMetrics.numPersists()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/persists/time",
+        metrics.persistTimeMillis() - previousSegmentGenerationMetrics.persistTimeMillis()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/persists/cpu",
+        metrics.persistCpuTime() - previousSegmentGenerationMetrics.persistCpuTime()
+    ));
     emitter.emit(
         builder.setMetric(
             "ingest/persists/backPressure",
             metrics.persistBackPressureMillis() - previousSegmentGenerationMetrics.persistBackPressureMillis()
         )
     );
-    emitter.emit(builder.setMetric("ingest/persists/failed", metrics.failedPersists() - previousSegmentGenerationMetrics.failedPersists()));
-    emitter.emit(builder.setMetric("ingest/handoff/failed", metrics.failedHandoffs() - previousSegmentGenerationMetrics.failedHandoffs()));
-    emitter.emit(builder.setMetric("ingest/merge/time", metrics.mergeTimeMillis() - previousSegmentGenerationMetrics.mergeTimeMillis()));
-    emitter.emit(builder.setMetric("ingest/merge/cpu", metrics.mergeCpuTime() - previousSegmentGenerationMetrics.mergeCpuTime()));
-    emitter.emit(builder.setMetric("ingest/handoff/count", metrics.handOffCount() - previousSegmentGenerationMetrics.handOffCount()));
+    emitter.emit(builder.setMetric(
+        "ingest/persists/failed",
+        metrics.failedPersists() - previousSegmentGenerationMetrics.failedPersists()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/handoff/failed",
+        metrics.failedHandoffs() - previousSegmentGenerationMetrics.failedHandoffs()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/merge/time",
+        metrics.mergeTimeMillis() - previousSegmentGenerationMetrics.mergeTimeMillis()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/merge/cpu",
+        metrics.mergeCpuTime() - previousSegmentGenerationMetrics.mergeCpuTime()
+    ));
+    emitter.emit(builder.setMetric(
+        "ingest/handoff/count",
+        metrics.handOffCount() - previousSegmentGenerationMetrics.handOffCount()
+    ));
     emitter.emit(builder.setMetric("ingest/sink/count", metrics.sinkCount()));
 
-    long messageGap = metrics.messageGap();
+    final long messageGap = metrics.messageGap();
     if (messageGap >= 0) {
       emitter.emit(builder.setMetric("ingest/events/messageGap", messageGap));
     }
+    final long numMessageGapEvent = metrics.numMessageGapEvent();
+    if (numMessageGapEvent > 0) {
+      emitter.emit(builder.setMetric("ingest/events/messageGapAvg", metrics.messageGapAvg()));
+      emitter.emit(builder.setMetric("ingest/events/minMessageGap", metrics.minMessageGap()));
+      emitter.emit(builder.setMetric("ingest/events/maxMessageGap", metrics.maxMessageGap()));
+    }
 
     long maxSegmentHandoffTime = metrics.maxSegmentHandoffTime();
     if (maxSegmentHandoffTime >= 0) {
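A note on the pattern in this monitor: metrics are emitted as per-period deltas, not running totals. Each doMonitor() pass subtracts the previously captured totals from the current ones and then treats the current totals as the next baseline. Below is a minimal, self-contained sketch of that delta pattern; the class and method names are illustrative only, not Druid APIs.

// Minimal sketch of the delta-per-monitoring-period pattern used above.
// The class and field names here are illustrative, not part of Druid.
final class DeltaCounter
{
  private long previousTotal;

  /** Returns the increase since the last call and advances the baseline. */
  long emitDelta(long currentTotal)
  {
    final long delta = currentTotal - previousTotal;
    previousTotal = currentTotal;
    return delta;
  }

  public static void main(String[] args)
  {
    DeltaCounter processed = new DeltaCounter();
    System.out.println(processed.emitDelta(1_000)); // 1000 events in the first period
    System.out.println(processed.emitDelta(1_750)); // 750 more in the second period
  }
}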

server/src/main/java/org/apache/druid/segment/realtime/SegmentGenerationMetrics.java

+44 -1
@@ -53,6 +53,12 @@ public class SegmentGenerationMetrics
   private final AtomicLong messageGap = new AtomicLong(0);
   private final AtomicBoolean processingDone = new AtomicBoolean(false);
 
+  // Message gap accounting
+  private final AtomicLong minMessageGap = new AtomicLong(Long.MAX_VALUE);
+  private final AtomicLong maxMessageGap = new AtomicLong(0);
+  private final AtomicLong messageGapAvg = new AtomicLong(0); // millisecond-level precision
+  private final AtomicLong numMessageGapEvents = new AtomicLong(0);
+
   private final AtomicLong maxSegmentHandoffTime = new AtomicLong(NO_EMIT_SEGMENT_HANDOFF_TIME);
 
   public void incrementRowOutputCount(long numRows)
@@ -110,6 +116,14 @@ public void reportMessageMaxTimestamp(long messageMaxTimestamp)
     this.messageMaxTimestamp.set(Math.max(messageMaxTimestamp, this.messageMaxTimestamp.get()));
   }
 
+  public void reportMessageGapAggregate(long messageGap)
+  {
+    final long newNumEvents = this.numMessageGapEvents.incrementAndGet();
+    this.minMessageGap.getAndAccumulate(messageGap, Math::min);
+    this.maxMessageGap.getAndAccumulate(messageGap, Math::max);
+    this.messageGapAvg.getAndUpdate(oldAvg -> oldAvg + ((messageGap - oldAvg) / newNumEvents));
+  }
+
   public void reportMaxSegmentHandoffTime(long maxSegmentHandoffTime)
   {
     this.maxSegmentHandoffTime.set(Math.max(maxSegmentHandoffTime, this.maxSegmentHandoffTime.get()));
@@ -170,6 +184,7 @@ public long pushedRows()
   {
     return pushedRows.get();
   }
+
   public long mergeTimeMillis()
   {
     return mergeTimeMillis.get();
@@ -195,6 +210,26 @@ public long sinkCount()
     return sinkCount.get();
   }
 
+  public long maxMessageGap()
+  {
+    return maxMessageGap.get();
+  }
+
+  public long minMessageGap()
+  {
+    return minMessageGap.get();
+  }
+
+  public long messageGapAvg()
+  {
+    return messageGapAvg.get();
+  }
+
+  public long numMessageGapEvent()
+  {
+    return numMessageGapEvents.get();
+  }
+
   public long messageGap()
   {
     return messageGap.get();
@@ -219,8 +254,8 @@ public SegmentGenerationMetrics snapshot()
     retVal.mergeCpuTime.set(mergeCpuTime.get());
     retVal.persistCpuTime.set(persistCpuTime.get());
     retVal.handOffCount.set(handOffCount.get());
-    retVal.sinkCount.set(sinkCount.get());
     retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
+    retVal.sinkCount.set(sinkCount.get());
     retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
     retVal.mergedRows.set(mergedRows.get());
     retVal.pushedRows.set(pushedRows.get());
@@ -233,6 +268,10 @@ public SegmentGenerationMetrics snapshot()
       messageGapSnapshot = System.currentTimeMillis() - maxTimestamp;
     }
     retVal.messageGap.set(messageGapSnapshot);
+    retVal.numMessageGapEvents.set(numMessageGapEvents.get());
+    retVal.minMessageGap.set(minMessageGap.get());
+    retVal.maxMessageGap.set(maxMessageGap.get());
+    retVal.messageGapAvg.set(messageGapAvg.get());
 
     reset();
 
@@ -242,5 +281,9 @@ public SegmentGenerationMetrics snapshot()
   private void reset()
   {
     maxSegmentHandoffTime.set(NO_EMIT_SEGMENT_HANDOFF_TIME);
+    numMessageGapEvents.set(0);
+    messageGapAvg.set(0);
+    minMessageGap.set(Long.MAX_VALUE);
+    maxMessageGap.set(0);
   }
 }
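The new reportMessageGapAggregate() keeps a streaming minimum, maximum, and mean without storing individual samples: the mean is updated incrementally as avg += (sample - avg) / n, which in long arithmetic truncates toward zero, hence the "millisecond-level precision" note on the field. The following standalone sketch mirrors that update rule; the class name is hypothetical, not part of the commit.

import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of the streaming min/max/avg update used by
// reportMessageGapAggregate(). Names are illustrative only.
final class GapAggregator
{
  private final AtomicLong min = new AtomicLong(Long.MAX_VALUE);
  private final AtomicLong max = new AtomicLong(0);
  private final AtomicLong avg = new AtomicLong(0); // truncating, millisecond-level mean
  private final AtomicLong count = new AtomicLong(0);

  void report(long gapMillis)
  {
    final long n = count.incrementAndGet();
    min.getAndAccumulate(gapMillis, Math::min);
    max.getAndAccumulate(gapMillis, Math::max);
    // Incremental mean: newAvg = oldAvg + (sample - oldAvg) / n
    avg.getAndUpdate(oldAvg -> oldAvg + ((gapMillis - oldAvg) / n));
  }

  public static void main(String[] args)
  {
    GapAggregator agg = new GapAggregator();
    for (long gap : new long[]{120, 80, 400}) {
      agg.report(gap);
    }
    // Prints min=80 max=400 avg=200 (exact here; integer division can introduce
    // small truncation error in general)
    System.out.printf("min=%d max=%d avg=%d%n", agg.min.get(), agg.max.get(), agg.avg.get());
  }
}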

server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java

+18
@@ -76,4 +76,22 @@ default int getMaxColumnsToMerge()
   {
     return -1;
   }
+
+  /**
+   * Row sampling interval for message gap aggregates: one sample is taken for every N rows added,
+   * where N is this value. Has no effect unless {@link #isAggregateMessageGapStatsEnabled()} is true.
+   */
+  default int getMessageGapRowSampleSize()
+  {
+    return 1_000;
+  }
+
+  /**
+   * Whether to capture aggregate (min/max/avg) message gap metrics.
+   * Defaults to false.
+   */
+  default boolean isAggregateMessageGapStatsEnabled()
+  {
+    return false;
+  }
 }
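Aggregation is off by default; when a tuning config enables it, one row in every getMessageGapRowSampleSize() rows (1,000 by default) contributes a sample, as the BatchAppenderator change below shows. The sketch that follows is a hypothetical way a config could opt in by overriding the two new defaults; no such interface exists in this commit.

import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;

// Hypothetical example only: shows how an AppenderatorConfig implementation could
// opt in to aggregate message gap stats and tighten the sampling interval.
public interface MessageGapSamplingConfig extends AppenderatorConfig
{
  @Override
  default boolean isAggregateMessageGapStatsEnabled()
  {
    return true; // capture min/max/avg message gap metrics
  }

  @Override
  default int getMessageGapRowSampleSize()
  {
    return 100; // sample one row out of every 100 instead of every 1,000
  }
}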

server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java

+31 -14
@@ -121,6 +121,8 @@ public class BatchAppenderator implements Appenderator
   private final long maxBytesTuningConfig;
   private final boolean skipBytesInMemoryOverheadCheck;
   private final boolean useMaxMemoryEstimates;
+  private final int messageGapRowSampleSize;
+  private final boolean aggregateMessageGapStats;
 
   private volatile ListeningExecutorService persistExecutor = null;
   private volatile ListeningExecutorService pushExecutor = null;
@@ -190,6 +192,8 @@ public class BatchAppenderator implements Appenderator
     skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
     maxPendingPersists = tuningConfig.getMaxPendingPersists();
     this.useMaxMemoryEstimates = useMaxMemoryEstimates;
+    this.aggregateMessageGapStats = tuningConfig.isAggregateMessageGapStatsEnabled();
+    this.messageGapRowSampleSize = tuningConfig.getMessageGapRowSampleSize();
     this.centralizedDatasourceSchemaConfig = centralizedDatasourceSchemaConfig;
     this.fingerprintGenerator = new FingerprintGenerator(objectMapper);
   }
@@ -287,6 +291,9 @@ public AppenderatorAddResult add(
     }
 
     final Sink sink = getOrCreateSink(identifier);
+    if (aggregateMessageGapStats && totalRows % messageGapRowSampleSize == 0) {
+      metrics.reportMessageGapAggregate(System.currentTimeMillis() - row.getTimestampFromEpoch());
+    }
     metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
     final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
     final int sinkRowsInMemoryAfterAdd;
@@ -513,8 +520,10 @@ public void clear()
   private void clear(Map<SegmentIdWithShardSpec, Sink> sinksToClear, boolean removeOnDiskData)
   {
     // Drop commit metadata, then abandon all segments.
-    log.info("Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]",
-        sinksToClear.size(), removeOnDiskData);
+    log.info(
+        "Clearing all[%d] sinks & their hydrants, removing data on disk: [%s]",
+        sinksToClear.size(), removeOnDiskData
+    );
     // Drop everything.
     Iterator<Map.Entry<SegmentIdWithShardSpec, Sink>> sinksIterator = sinksToClear.entrySet().iterator();
     sinksIterator.forEachRemaining(entry -> {
@@ -582,8 +591,9 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
         // the invariant of exactly one, always swappable, sink with exactly one unpersisted hydrant must hold
         int totalHydrantsForSink = hydrants.size();
         if (totalHydrantsForSink != 1) {
-          throw new ISE("There should be only one hydrant for identifier[%s] but there are[%s]",
-              identifier, totalHydrantsForSink
+          throw new ISE(
+              "There should be only one hydrant for identifier[%s] but there are[%s]",
+              identifier, totalHydrantsForSink
          );
        }
        totalHydrantsCount++;
@@ -635,8 +645,9 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
      metrics.incrementPersistTimeMillis(persistMillis);
      persistStopwatch.stop();
      // make sure no push can start while persisting:
-     log.info("Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory in[%d] millis",
-         numPersistedRows, bytesPersisted, persistMillis
+     log.info(
+         "Persisted rows[%,d] and bytes[%,d] and removed all sinks & hydrants from memory in[%d] millis",
+         numPersistedRows, bytesPersisted, persistMillis
      );
      log.info("Persist is done.");
    }
@@ -656,6 +667,7 @@ public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
   /**
    * All sinks will be persisted so do a shallow copy of the Sinks map, reset
    * the map and metadata (i.e. memory consumption counters) so that ingestion can go on
+   *
    * @return The map of sinks to persist, this map will be garbage collected after
    * persist is complete since we will not be keeping a reference to it...
    */
@@ -740,8 +752,9 @@ public ListenableFuture<SegmentsAndCommitMetadata> push(
              log.warn("mergeAndPush[%s] returned null, skipping.", identifier);
            }
          }
-         log.info("Push done: total sinks merged[%d], total hydrants merged[%d]",
-             identifiers.size(), totalHydrantsMerged
+         log.info(
+             "Push done: total sinks merged[%d], total hydrants merged[%d]",
+             identifiers.size(), totalHydrantsMerged
          );
          return new SegmentsAndCommitMetadata(dataSegments, commitMetadata, segmentSchemaMapping);
        },
@@ -753,8 +766,8 @@ public ListenableFuture<SegmentsAndCommitMetadata> push(
   /**
    * Merge segment, push to deep storage. Should only be used on segments that have been fully persisted.
    *
-   * @param identifier sink identifier
-   * @param sink sink to push
+   * @param identifier sink identifier
+   * @param sink       sink to push
    * @return segment descriptor along with schema, or null if the sink is no longer valid
    */
  private DataSegmentWithMetadata mergeAndPush(
@@ -784,8 +797,10 @@ private DataSegmentWithMetadata mergeAndPush(
     if (sm == null) {
       log.warn("Sink metadata not found just before merge for identifier [%s]", identifier);
     } else if (numHydrants != sm.getNumHydrants()) {
-      throw new ISE("Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]",
-          numHydrants, identifier, sm.getNumHydrants());
+      throw new ISE(
+          "Number of restored hydrants[%d] for identifier[%s] does not match expected value[%d]",
+          numHydrants, identifier, sm.getNumHydrants()
+      );
     }
 
     try {
@@ -1260,12 +1275,14 @@ private int calculateSinkMemoryInUsed()
    */
   private static class SinkMetadata
   {
-    /** This is used to maintain the rows in the sink accross persists of the sink
+    /**
+     * This is used to maintain the rows in the sink across persists of the sink
      * used for functionality (i.e. to detect whether an incremental push
      * is needed {@link AppenderatorDriverAddResult#isPushRequired(Integer, Long)}
      **/
     private int numRowsInSegment;
-    /** For sanity check as well as functionality: to make sure that all hydrants for a sink are restored from disk at
+    /**
+     * For sanity check as well as functionality: to make sure that all hydrants for a sink are restored from disk at
      * push time and also to remember the fire hydrant "count" when persisting it.
      */
     private int numHydrants;
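Taken together, the hot add() path pays for a sample only on every messageGapRowSampleSize-th row, and each sample is simply wall-clock time minus the row's event timestamp, which reportMessageGapAggregate() then folds into the min/max/avg shown earlier. Below is a small simulation of that sampling rule; all names here are illustrative, not the Appenderator API.

// Minimal simulation of the sampling rule used in BatchAppenderator.add():
// when enabled, every sampleSize-th row reports (now - eventTimestamp) as a gap sample.
final class MessageGapSampler
{
  private final boolean enabled;
  private final int sampleSize;
  private long totalRows;

  MessageGapSampler(boolean enabled, int sampleSize)
  {
    this.enabled = enabled;
    this.sampleSize = sampleSize;
  }

  /** Returns the gap sample in millis, or -1 if this row is not sampled. */
  long onRow(long eventTimestampMillis)
  {
    totalRows++;
    if (enabled && totalRows % sampleSize == 0) {
      return System.currentTimeMillis() - eventTimestampMillis;
    }
    return -1;
  }

  public static void main(String[] args)
  {
    MessageGapSampler sampler = new MessageGapSampler(true, 1_000);
    long sampled = 0;
    for (int i = 0; i < 10_000; i++) {
      // Each simulated row carries an event timestamp 5 seconds in the past.
      if (sampler.onRow(System.currentTimeMillis() - 5_000) >= 0) {
        sampled++;
      }
    }
    System.out.println("samples taken: " + sampled); // 10 of 10,000 rows at the default interval
  }
}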
