Skip to content

Commit f85c609

Browse files
Add message gap aggregate statistics for stream appenderators
1 parent 252cee5 commit f85c609

File tree

46 files changed

+809
-161
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+809
-161
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import com.google.common.collect.ImmutableList;
23+
import com.google.common.collect.ImmutableMap;
24+
import org.apache.druid.data.input.InputRow;
25+
import org.apache.druid.data.input.MapBasedInputRow;
26+
import org.apache.druid.java.util.common.DateTimes;
27+
import org.apache.druid.java.util.common.Intervals;
28+
import org.apache.druid.segment.realtime.appenderator.Appenderator;
29+
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
30+
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
31+
import org.apache.druid.timeline.partition.LinearShardSpec;
32+
import org.openjdk.jmh.annotations.BenchmarkMode;
33+
import org.openjdk.jmh.annotations.Fork;
34+
import org.openjdk.jmh.annotations.Level;
35+
import org.openjdk.jmh.annotations.Mode;
36+
import org.openjdk.jmh.annotations.OutputTimeUnit;
37+
import org.openjdk.jmh.annotations.Param;
38+
import org.openjdk.jmh.annotations.Scope;
39+
import org.openjdk.jmh.annotations.Setup;
40+
import org.openjdk.jmh.annotations.State;
41+
import org.openjdk.jmh.annotations.TearDown;
42+
43+
import java.io.File;
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.List;
47+
import java.util.concurrent.ThreadLocalRandom;
48+
import java.util.concurrent.TimeUnit;
49+
50+
@BenchmarkMode({Mode.AverageTime, Mode.Throughput})
51+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
52+
@State(Scope.Thread)
53+
@Fork(value = 1, jvmArgs = {
54+
"-Xms4G",
55+
"-Xmx4G",
56+
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
57+
})
58+
public abstract class AppenderatorBenchmark
59+
{
60+
@Param({"10000"})
61+
protected int NUM_ROWS;
62+
63+
protected static final int UNIQUE_DIMS = 500;
64+
protected static final int NUM_SEGMENTS = 5;
65+
protected static final String SEGMENT_INTERVAL = "2020-01-01/2020-02-01";
66+
67+
protected Appenderator appenderator;
68+
protected List<SegmentIdWithShardSpec> identifiers;
69+
protected File tempDir;
70+
71+
// Pre-generated values
72+
protected long[] timestamps;
73+
protected String[] dimensionValues;
74+
protected int[] metricValues;
75+
76+
@Setup
77+
public void setup() throws IOException
78+
{
79+
tempDir = File.createTempFile("druid-appenderator-benchmark", "tmp");
80+
if (!tempDir.delete()) {
81+
throw new IOException("Could not delete appenderator benchmark temp dir");
82+
}
83+
if (!tempDir.mkdir()) {
84+
throw new IOException("Could not create appenderator benchmark temp dir");
85+
}
86+
87+
// Pre-generate test data
88+
timestamps = new long[NUM_ROWS];
89+
dimensionValues = new String[NUM_ROWS];
90+
metricValues = new int[NUM_ROWS];
91+
92+
final long startTimestamp = DateTimes.of("2020-01-01").getMillis();
93+
final long endTimestamp = DateTimes.of("2020-02-01").getMillis();
94+
final long timeRange = endTimestamp - startTimestamp - 1;
95+
96+
final ThreadLocalRandom random = ThreadLocalRandom.current();
97+
for (int i = 0; i < NUM_ROWS; ++i) {
98+
timestamps[i] = startTimestamp + random.nextLong(timeRange);
99+
dimensionValues[i] = "dim_" + (i % UNIQUE_DIMS);
100+
metricValues[i] = i;
101+
}
102+
103+
identifiers = new ArrayList<>();
104+
for (int i = 0; i < NUM_SEGMENTS; ++i) {
105+
identifiers.add(createSegmentId(SEGMENT_INTERVAL, "v1", i));
106+
}
107+
}
108+
109+
110+
protected static InputRow createRow(long timestamp, String dimValue, int metricValue)
111+
{
112+
return new MapBasedInputRow(
113+
timestamp,
114+
ImmutableList.of("dim"),
115+
ImmutableMap.of(
116+
"dim", dimValue,
117+
"met", metricValue
118+
)
119+
);
120+
}
121+
122+
protected static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
123+
{
124+
return new SegmentIdWithShardSpec(
125+
StreamAppenderatorTester.DATASOURCE,
126+
Intervals.of(interval),
127+
version,
128+
new LinearShardSpec(partitionNum)
129+
);
130+
}
131+
132+
@TearDown(Level.Trial)
133+
public void tearDown() throws Exception
134+
{
135+
if (appenderator != null) {
136+
appenderator.close();
137+
}
138+
cleanupDirs(tempDir);
139+
}
140+
141+
protected void cleanupDirs(File file)
142+
{
143+
if (file.isDirectory()) {
144+
final File[] children = file.listFiles();
145+
if (children != null) {
146+
for (File child : children) {
147+
cleanupDirs(child);
148+
}
149+
}
150+
}
151+
file.delete();
152+
}
153+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing;
21+
22+
import org.apache.druid.data.input.InputRow;
23+
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
24+
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
25+
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorTester;
26+
import org.apache.druid.segment.realtime.sink.Committers;
27+
import org.openjdk.jmh.annotations.Benchmark;
28+
import org.openjdk.jmh.annotations.BenchmarkMode;
29+
import org.openjdk.jmh.annotations.Measurement;
30+
import org.openjdk.jmh.annotations.Mode;
31+
import org.openjdk.jmh.annotations.Param;
32+
import org.openjdk.jmh.annotations.Setup;
33+
import org.openjdk.jmh.annotations.Warmup;
34+
import org.openjdk.jmh.infra.Blackhole;
35+
import org.openjdk.jmh.runner.Runner;
36+
import org.openjdk.jmh.runner.RunnerException;
37+
import org.openjdk.jmh.runner.options.Options;
38+
import org.openjdk.jmh.runner.options.OptionsBuilder;
39+
40+
import java.io.IOException;
41+
42+
@Warmup(iterations = 15)
43+
@Measurement(iterations = 15)
44+
public class StreamAppenderatorBenchmark extends AppenderatorBenchmark
45+
{
46+
@Param({"false", "true"})
47+
private boolean enableMessageGapAggStats;
48+
49+
@Setup
50+
@Override
51+
public void setup() throws IOException
52+
{
53+
super.setup();
54+
55+
final StreamAppenderatorTester tester = new StreamAppenderatorTester.Builder()
56+
.maxRowsInMemory(NUM_ROWS + 1) // keep in memory for now to keep times unbiased by disk access
57+
.maxSizeInBytes(100_000_000) // 100MB
58+
.basePersistDirectory(tempDir)
59+
.rowIngestionMeters(new SimpleRowIngestionMeters())
60+
.withMessageGapAggStatsEnabled(enableMessageGapAggStats)
61+
.build();
62+
63+
appenderator = tester.getAppenderator();
64+
appenderator.startJob();
65+
}
66+
67+
@Benchmark
68+
@BenchmarkMode(Mode.AverageTime)
69+
public void benchmarkAddRow(Blackhole blackhole) throws Exception
70+
{
71+
for (int i = 0; i < NUM_ROWS; ++i) {
72+
final InputRow row = createRow(timestamps[i], dimensionValues[i], metricValues[i]);
73+
74+
final SegmentIdWithShardSpec identifier = identifiers.get(i % identifiers.size());
75+
// note: disk flushes are disabled for this test to avoid variance in access latencies
76+
blackhole.consume(appenderator.add(identifier, row, Committers.nilSupplier(), false));
77+
}
78+
}
79+
80+
public static void main(String[] args) throws RunnerException
81+
{
82+
final Options opt = new OptionsBuilder()
83+
.include(StreamAppenderatorBenchmark.class.getSimpleName())
84+
.forks(1)
85+
.jvmArgs("-Xms4G", "-Xmx4G", "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED")
86+
.build();
87+
new Runner(opt).run();
88+
}
89+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.benchmark.indexing.metrics;
21+
22+
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
23+
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.BenchmarkMode;
25+
import org.openjdk.jmh.annotations.Fork;
26+
import org.openjdk.jmh.annotations.Level;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Param;
31+
import org.openjdk.jmh.annotations.Scope;
32+
import org.openjdk.jmh.annotations.Setup;
33+
import org.openjdk.jmh.annotations.State;
34+
import org.openjdk.jmh.annotations.Warmup;
35+
import org.openjdk.jmh.runner.Runner;
36+
import org.openjdk.jmh.runner.RunnerException;
37+
import org.openjdk.jmh.runner.options.Options;
38+
import org.openjdk.jmh.runner.options.OptionsBuilder;
39+
import org.openjdk.jmh.runner.options.TimeValue;
40+
41+
import java.util.concurrent.ThreadLocalRandom;
42+
import java.util.concurrent.TimeUnit;
43+
44+
/**
45+
* Benchmark for SegmentGenerationMetrics focusing on reportMessageGap and reportMaxSegmentHandoffTime methods.
46+
*/
47+
@State(Scope.Benchmark)
48+
@Fork(value = 1)
49+
@Warmup(iterations = 1, time = 1)
50+
@Measurement(iterations = 20, time = 2)
51+
@BenchmarkMode({Mode.AverageTime})
52+
@OutputTimeUnit(TimeUnit.NANOSECONDS)
53+
public class SegmentGenerationMetricsBenchmark
54+
{
55+
private SegmentGenerationMetrics metrics;
56+
57+
@Param({"true", "false"})
58+
private boolean enableMessageGapMetrics;
59+
60+
// Pre-generated values for benchmarks
61+
private long messageGapValue;
62+
private long handoffTimeValue;
63+
64+
@Setup(Level.Iteration)
65+
public void setup()
66+
{
67+
metrics = new SegmentGenerationMetrics(enableMessageGapMetrics);
68+
69+
final ThreadLocalRandom random = ThreadLocalRandom.current();
70+
messageGapValue = random.nextLong(1, 10000);
71+
handoffTimeValue = random.nextLong(1, 5000);
72+
}
73+
74+
/**
75+
* Benchmark for the reportMessageGap in hot loop.
76+
*/
77+
@Benchmark
78+
public void benchmarkMultipleReportMessageGap()
79+
{
80+
metrics.reportMessageGapAggregates(messageGapValue);
81+
}
82+
83+
/**
84+
* Benchmark for reportMaxSegmentHandoffTime in hot loop.
85+
*/
86+
@Benchmark
87+
public void benchmarkMultipleReportMaxSegmentHandoffTime()
88+
{
89+
metrics.reportMaxSegmentHandoffTime(handoffTimeValue);
90+
}
91+
92+
93+
public static void main(String[] args) throws RunnerException
94+
{
95+
Options opt = new OptionsBuilder()
96+
.include(SegmentGenerationMetricsBenchmark.class.getSimpleName())
97+
.forks(1)
98+
.warmupIterations(1)
99+
.warmupTime(TimeValue.seconds(1))
100+
.measurementIterations(2)
101+
.measurementTime(TimeValue.seconds(2))
102+
.build();
103+
new Runner(opt).run();
104+
}
105+
}

0 commit comments

Comments
 (0)