Skip to content

Commit eeedd64

Browse files
authored
Introduce Default Replication Worker Performance Test Harness (#20956)
Introduce a performance test harness for the default replication worker to make it easy for devs to test the effect of changes on platform throughput. The current setup is designed to be run manually. In the future, we can look into integrating this report into our build pipelines. For now, this is good enough, as I wanted to start somewhere. The general idea is to use JMH to run the test n times (currently 4 times). The dev can then look at the logs to see the throughput and how it varies. As of this PR, we see general platform throughput of ~20-25 MB/s.
1 parent bb84fac commit eeedd64

File tree

8 files changed

+254
-1
lines changed

8 files changed

+254
-1
lines changed

airbyte-commons-worker/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,16 @@ dependencies {
3131

3232
testAnnotationProcessor platform(libs.micronaut.bom)
3333
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor
34+
testAnnotationProcessor libs.jmh.annotations
3435

3536
testImplementation libs.bundles.micronaut.test
3637
testImplementation 'com.jayway.jsonpath:json-path:2.7.0'
3738
testImplementation 'org.mockito:mockito-inline:4.7.0'
3839
testImplementation libs.postgresql
3940
testImplementation libs.platform.testcontainers
4041
testImplementation libs.platform.testcontainers.postgresql
42+
testImplementation libs.jmh.core
43+
testImplementation libs.jmh.annotations
4144

4245
testImplementation project(':airbyte-commons-docker')
4346
}

airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ private static Runnable readFromSrcAndWriteToDstRunnable(final AirbyteSource sou
309309
return () -> {
310310
MDC.setContextMap(mdc);
311311
LOGGER.info("Replication thread started.");
312-
Long recordsRead = 0L;
312+
long recordsRead = 0L;
313313
final Map<AirbyteStreamNameNamespacePair, ImmutablePair<Set<String>, Integer>> validationErrors = new HashMap<>();
314314
final Map<AirbyteStreamNameNamespacePair, List<String>> streamToSelectedFields = new HashMap<>();
315315
if (fieldSelectionEnabled) {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.general;
6+
7+
import io.airbyte.config.WorkerDestinationConfig;
8+
import io.airbyte.protocol.models.AirbyteMessage;
9+
import io.airbyte.workers.internal.AirbyteDestination;
10+
import java.nio.file.Path;
11+
import java.util.Optional;
12+
13+
/**
14+
* Empty Airbyte Destination. Does nothing with messages. Intended for performance testing.
15+
*/
16+
public class EmptyAirbyteDestination implements AirbyteDestination {
17+
18+
@Override
19+
public void start(WorkerDestinationConfig destinationConfig, Path jobRoot) throws Exception {
20+
21+
}
22+
23+
@Override
24+
public void accept(AirbyteMessage message) throws Exception {
25+
26+
}
27+
28+
@Override
29+
public void notifyEndOfInput() throws Exception {
30+
31+
}
32+
33+
@Override
34+
public boolean isFinished() {
35+
return true;
36+
}
37+
38+
@Override
39+
public int getExitValue() {
40+
return 0;
41+
}
42+
43+
@Override
44+
public Optional<AirbyteMessage> attemptRead() {
45+
return Optional.empty();
46+
}
47+
48+
@Override
49+
public void close() throws Exception {}
50+
51+
@Override
52+
public void cancel() throws Exception {
53+
54+
}
55+
56+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.general;
6+
7+
import io.airbyte.config.WorkerSourceConfig;
8+
import io.airbyte.protocol.models.AirbyteMessage;
9+
import io.airbyte.workers.internal.AirbyteSource;
10+
import io.airbyte.workers.test_utils.AirbyteMessageUtils;
11+
import java.nio.file.Path;
12+
import java.util.Optional;
13+
14+
/**
15+
* Basic Airbyte Source that emits {@link LimitedAirbyteSource#TOTAL_RECORDS} before finishing.
16+
* Intended for performance testing.
17+
*/
18+
public class LimitedAirbyteSource implements AirbyteSource {
19+
20+
private static final int TOTAL_RECORDS = 1_000_000;
21+
22+
private int currentRecords = 0;
23+
24+
@Override
25+
public void start(WorkerSourceConfig sourceConfig, Path jobRoot) throws Exception {
26+
27+
}
28+
29+
@Override
30+
public boolean isFinished() {
31+
return currentRecords == TOTAL_RECORDS;
32+
}
33+
34+
@Override
35+
public int getExitValue() {
36+
return 0;
37+
}
38+
39+
@Override
40+
public Optional<AirbyteMessage> attemptRead() {
41+
currentRecords++;
42+
return Optional.of(AirbyteMessageUtils.createRecordMessage("s1", "data",
43+
"This is a fairly long sentence to provide some bytes here. More bytes is better as it helps us measure performance."
44+
+ "Random append to prevent dead code generation: " + currentRecords));
45+
}
46+
47+
@Override
48+
public void close() throws Exception {
49+
50+
}
51+
52+
@Override
53+
public void cancel() throws Exception {
54+
55+
}
56+
57+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.general;
6+
7+
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
8+
import io.airbyte.config.ReplicationOutput;
9+
import io.airbyte.config.StandardSyncInput;
10+
import io.airbyte.metrics.lib.NotImplementedMetricClient;
11+
import io.airbyte.protocol.models.AirbyteStream;
12+
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
13+
import io.airbyte.protocol.models.CatalogHelpers;
14+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
15+
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
16+
import io.airbyte.protocol.models.JsonSchemaType;
17+
import io.airbyte.protocol.models.SyncMode;
18+
import io.airbyte.workers.RecordSchemaValidator;
19+
import io.airbyte.workers.WorkerMetricReporter;
20+
import io.airbyte.workers.exception.WorkerException;
21+
import io.airbyte.workers.internal.NamespacingMapper;
22+
import io.airbyte.workers.internal.book_keeping.AirbyteMessageTracker;
23+
import java.io.IOException;
24+
import java.nio.file.Path;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.concurrent.atomic.AtomicReference;
28+
import lombok.extern.slf4j.Slf4j;
29+
import org.openjdk.jmh.annotations.Benchmark;
30+
import org.openjdk.jmh.annotations.BenchmarkMode;
31+
import org.openjdk.jmh.annotations.Fork;
32+
import org.openjdk.jmh.annotations.Measurement;
33+
import org.openjdk.jmh.annotations.Mode;
34+
import org.openjdk.jmh.annotations.Warmup;
35+
36+
@Slf4j
37+
public class ReplicationWorkerPerformanceTest {
38+
39+
/**
40+
* Hook up the DefaultReplicationWorker to a test harness with an insanely quick Source
41+
* {@link LimitedAirbyteSource} and Destination {@link EmptyAirbyteDestination}.
42+
* <p>
43+
* Harness uses Java Micro Benchmark to run the E2E sync a configured number of times. It then
44+
* reports a time distribution for the time taken to run the E2E sync.
45+
* <p>
46+
* Because the reported time does not explicitly include throughput numbers, throughput logging has
47+
* been added. This class is intended to help devs understand the impact of changes on throughput.
48+
* <p>
49+
* To use this, simply run the main method, make yourself a cup of coffee for 5 mins, then look the
50+
* logs.
51+
*/
52+
@Benchmark
53+
// SampleTime = the time taken to run the benchmarked method. Use this because we only care about
54+
// the time taken to sync the entire dataset.
55+
@BenchmarkMode(Mode.SampleTime)
56+
// Warming up the JVM stabilises results however takes longer. Skip this for now since we don't need
57+
// that fine a result.
58+
@Warmup(iterations = 0)
59+
// How many runs to do.
60+
@Fork(1)
61+
// Within each run, how many iterations to do.
62+
@Measurement(iterations = 2)
63+
public void executeOneSync() throws InterruptedException {
64+
final var perSource = new LimitedAirbyteSource();
65+
final var perDestination = new EmptyAirbyteDestination();
66+
final var messageTracker = new AirbyteMessageTracker();
67+
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
68+
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
69+
final var validator = new RecordSchemaValidator(Map.of(
70+
new AirbyteStreamNameNamespacePair("s1", null),
71+
CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING))));
72+
73+
final var worker = new DefaultReplicationWorker("1", 0,
74+
perSource,
75+
dstNamespaceMapper,
76+
perDestination,
77+
messageTracker,
78+
validator,
79+
metricReporter,
80+
false);
81+
final AtomicReference<ReplicationOutput> output = new AtomicReference<>();
82+
final Thread workerThread = new Thread(() -> {
83+
try {
84+
output.set(worker.run(new StandardSyncInput().withCatalog(new ConfiguredAirbyteCatalog()
85+
.withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("s1"))))),
86+
Path.of("/")));
87+
} catch (final WorkerException e) {
88+
throw new RuntimeException(e);
89+
}
90+
});
91+
92+
workerThread.start();
93+
workerThread.join();
94+
final var summary = output.get().getReplicationAttemptSummary();
95+
final var mbRead = summary.getBytesSynced() / 1_000_000;
96+
final var timeTakenSec = (summary.getEndTime() - summary.getStartTime()) / 1000.0;
97+
log.info("MBs read: {}, Time taken sec: {}, MB/s: {}", mbRead, timeTakenSec, mbRead / timeTakenSec);
98+
}
99+
100+
public static void main(String[] args) throws IOException {
101+
// Run this main class to start benchmarking.
102+
org.openjdk.jmh.Main.main(args);
103+
}
104+
105+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.general;
6+
7+
import io.airbyte.protocol.models.AirbyteMessage;
8+
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
9+
import io.airbyte.workers.internal.AirbyteMapper;
10+
11+
/**
12+
* Stub mapper testing what happens without any mapping.
13+
*/
14+
public class StubAirbyteMapper implements AirbyteMapper {
15+
16+
@Override
17+
public ConfiguredAirbyteCatalog mapCatalog(ConfiguredAirbyteCatalog catalog) {
18+
return null;
19+
}
20+
21+
@Override
22+
public AirbyteMessage mapMessage(AirbyteMessage message) {
23+
return message;
24+
}
25+
26+
}

deps.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ fasterxml_version = "2.13.3"
1515
flyway = "7.14.0"
1616
glassfish_version = "2.31"
1717
hikaricp = "5.0.1"
18+
jmh = "1.36"
1819
jooq = "3.13.4"
1920
junit-jupiter = "5.9.0"
2021
log4j = "2.17.2"
@@ -68,6 +69,8 @@ jackson-datatype = { module = "com.fasterxml.jackson.datatype:jackson-datatype-j
6869
java-dogstatsd-client = { module = "com.datadoghq:java-dogstatsd-client", version = "4.1.0" }
6970
javax-databind = { module = "javax.xml.bind:jaxb-api", version = "2.4.0-b180830.0359" }
7071
jcl-over-slf4j = { module = "org.slf4j:jcl-over-slf4j", version.ref = "slf4j" }
72+
jmh-core = { module = "org.openjdk.jmh:jmh-core", version.ref = "jmh" }
73+
jmh-annotations = { module = "org.openjdk.jmh:jmh-generator-annprocess", version.ref = "jmh" }
7174
jooq = { module = "org.jooq:jooq", version.ref = "jooq" }
7275
jooq-codegen = { module = "org.jooq:jooq-codegen", version.ref = "jooq" }
7376
jooq-meta = { module = "org.jooq:jooq-meta", version.ref = "jooq" }

spotbugs-exclude-filter-file.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
<Match>
99
<Source name="~.*build/generated.*" />
1010
</Match>
11+
<Match>
12+
<Package name="io.airbyte.workers.general.jmh_generated.*" />
13+
</Match>
1114
<Match>
1215
<Package name="io.airbyte.api.client.*" />
1316
</Match>

0 commit comments

Comments
 (0)