Skip to content

Commit f338e47

Browse files
authored
Add additional sync timing information (#17643)
* WIP - Add additional sync timing information * Fixup tests * fix PMD problem * send data to segment * Test JobTracker * respond to PR suggestions * fixup test * formatting * fix initializer for stats * Make thread-safe with synchronized * Don't clobber syncStats on init * add comments and fix init * Do what Pedro says * Extract timeTracker pojo
1 parent 289a065 commit f338e47

File tree

8 files changed

+189
-34
lines changed

8 files changed

+189
-34
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import io.airbyte.workers.exception.RecordSchemaValidationException;
2626
import io.airbyte.workers.exception.WorkerException;
2727
import io.airbyte.workers.helper.FailureHelper;
28+
import io.airbyte.workers.helper.ThreadedTimeTracker;
2829
import io.airbyte.workers.internal.AirbyteDestination;
2930
import io.airbyte.workers.internal.AirbyteMapper;
3031
import io.airbyte.workers.internal.AirbyteSource;
@@ -129,7 +130,10 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
129130
final WorkerDestinationConfig destinationConfig = WorkerUtils.syncToWorkerDestinationConfig(syncInput);
130131
destinationConfig.setCatalog(mapper.mapCatalog(destinationConfig.getCatalog()));
131132

133+
final ThreadedTimeTracker timeTracker = new ThreadedTimeTracker();
132134
final long startTime = System.currentTimeMillis();
135+
timeTracker.trackReplicationStartTime();
136+
133137
final AtomicReference<FailureReason> replicationRunnableFailureRef = new AtomicReference<>();
134138
final AtomicReference<FailureReason> destinationRunnableFailureRef = new AtomicReference<>();
135139

@@ -146,12 +150,14 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
146150
// closed first (which is what we want).
147151
try (destination; source) {
148152
destination.start(destinationConfig, jobRoot);
153+
timeTracker.trackSourceReadStartTime();
149154
source.start(sourceConfig, jobRoot);
155+
timeTracker.trackDestinationWriteStartTime();
150156

151157
// note: `whenComplete` is used instead of `exceptionally` so that the original exception is still
152158
// thrown
153159
final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
154-
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
160+
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc, timeTracker),
155161
executors).whenComplete((msg, ex) -> {
156162
if (ex != null) {
157163
if (ex.getCause() instanceof DestinationException) {
@@ -163,7 +169,7 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
163169
});
164170

165171
final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
166-
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter),
172+
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker),
167173
executors).whenComplete((msg, ex) -> {
168174
if (ex != null) {
169175
if (ex.getCause() instanceof SourceException) {
@@ -204,6 +210,8 @@ else if (hasFailed.get()) {
204210
outputStatus = ReplicationStatus.COMPLETED;
205211
}
206212

213+
timeTracker.trackReplicationEndTime();
214+
207215
final SyncStats totalSyncStats = new SyncStats()
208216
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
209217
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
@@ -212,7 +220,13 @@ else if (hasFailed.get()) {
212220
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
213221
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage())
214222
.withMaxSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
215-
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null));
223+
.withMeanSecondsBetweenStateMessageEmittedandCommitted(messageTracker.getMeanSecondsBetweenStateMessageEmittedAndCommitted().orElse(null))
224+
.withReplicationStartTime(timeTracker.getReplicationStartTime())
225+
.withReplicationEndTime(timeTracker.getReplicationEndTime())
226+
.withSourceReadStartTime(timeTracker.getSourceReadStartTime())
227+
.withSourceReadEndTime(timeTracker.getSourceReadEndTime())
228+
.withDestinationWriteStartTime(timeTracker.getDestinationWriteStartTime())
229+
.withDestinationWriteEndTime(timeTracker.getDestinationWriteEndTime());
216230

217231
if (outputStatus == ReplicationStatus.COMPLETED) {
218232
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
@@ -318,7 +332,8 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
318332
final MessageTracker messageTracker,
319333
final Map<String, String> mdc,
320334
final RecordSchemaValidator recordSchemaValidator,
321-
final WorkerMetricReporter metricReporter) {
335+
final WorkerMetricReporter metricReporter,
336+
final ThreadedTimeTracker timeHolder) {
322337
return () -> {
323338
MDC.setContextMap(mdc);
324339
LOGGER.info("Replication thread started.");
@@ -362,6 +377,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
362377
}
363378
}
364379
}
380+
timeHolder.trackSourceReadEndTime();
365381
LOGGER.info("Total records read: {} ({})", recordsRead, FileUtils.byteCountToDisplaySize(messageTracker.getTotalBytesEmitted()));
366382
if (!validationErrors.isEmpty()) {
367383
validationErrors.forEach((stream, errorPair) -> {
@@ -431,7 +447,8 @@ private static void validateSchema(final RecordSchemaValidator recordSchemaValid
431447
private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
432448
final AtomicBoolean cancelled,
433449
final MessageTracker messageTracker,
434-
final Map<String, String> mdc) {
450+
final Map<String, String> mdc,
451+
final ThreadedTimeTracker timeHolder) {
435452
return () -> {
436453
MDC.setContextMap(mdc);
437454
LOGGER.info("Destination output thread started.");
@@ -448,6 +465,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
448465
messageTracker.acceptFromDestination(messageOptional.get());
449466
}
450467
}
468+
timeHolder.trackDestinationWriteEndTime();
451469
if (!cancelled.get() && destination.getExitValue() != 0) {
452470
throw new DestinationException("Destination process exited with non-zero exit code " + destination.getExitValue());
453471
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.workers.helper;
6+
7+
/**
8+
* This class exists to track timing information for the sync. It needs to be thread-safe as
9+
* multiple threads (source, destination, and worker) will be accessing it.
10+
*/
11+
public class ThreadedTimeTracker {
12+
13+
private long replicationStartTime;
14+
private long replicationEndTime;
15+
private long sourceReadStartTime;
16+
private long sourceReadEndTime;
17+
private long destinationWriteStartTime;
18+
private long destinationWriteEndTime;
19+
20+
public synchronized void trackReplicationStartTime() {
21+
this.replicationStartTime = System.currentTimeMillis();
22+
}
23+
24+
public synchronized void trackReplicationEndTime() {
25+
this.replicationEndTime = System.currentTimeMillis();
26+
}
27+
28+
public synchronized void trackSourceReadStartTime() {
29+
this.sourceReadStartTime = System.currentTimeMillis();
30+
}
31+
32+
public synchronized void trackSourceReadEndTime() {
33+
this.sourceReadEndTime = System.currentTimeMillis();
34+
}
35+
36+
public synchronized void trackDestinationWriteStartTime() {
37+
this.destinationWriteStartTime = System.currentTimeMillis();
38+
}
39+
40+
public synchronized void trackDestinationWriteEndTime() {
41+
this.destinationWriteEndTime = System.currentTimeMillis();
42+
}
43+
44+
public synchronized long getReplicationStartTime() {
45+
return this.replicationStartTime;
46+
}
47+
48+
public synchronized long getReplicationEndTime() {
49+
return this.replicationEndTime;
50+
}
51+
52+
public synchronized long getSourceReadStartTime() {
53+
return this.sourceReadStartTime;
54+
}
55+
56+
public synchronized long getSourceReadEndTime() {
57+
return this.sourceReadEndTime;
58+
}
59+
60+
public synchronized long getDestinationWriteStartTime() {
61+
return this.destinationWriteStartTime;
62+
}
63+
64+
public synchronized long getDestinationWriteEndTime() {
65+
return this.destinationWriteEndTime;
66+
}
67+
68+
}

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -536,9 +536,11 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
536536
Jsons.jsonNode(actual));
537537
assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ","));
538538

539-
// remove times so we can do the rest of the object <> object comparison.
540-
actual.getReplicationAttemptSummary().withStartTime(null);
541-
actual.getReplicationAttemptSummary().withEndTime(null);
539+
// remove times, so we can do the rest of the object <> object comparison.
540+
actual.getReplicationAttemptSummary().withStartTime(null).withEndTime(null).getTotalStats().withReplicationStartTime(null)
541+
.withReplicationEndTime(null)
542+
.withSourceReadStartTime(null).withSourceReadEndTime(null)
543+
.withDestinationWriteStartTime(null).withDestinationWriteEndTime(null);
542544

543545
assertEquals(replicationOutput, actual);
544546
}
@@ -631,7 +633,9 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
631633
.withDestinationStateMessagesEmitted(null)));
632634

633635
assertNotNull(actual);
634-
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats());
636+
// null out timing stats for assertion matching
637+
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats().withReplicationStartTime(null).withReplicationEndTime(null)
638+
.withSourceReadStartTime(null).withSourceReadEndTime(null).withDestinationWriteStartTime(null).withDestinationWriteEndTime(null));
635639
assertEquals(expectedStreamStats, actual.getReplicationAttemptSummary().getStreamStats());
636640
}
637641

airbyte-config/config-models/src/main/resources/types/SyncStats.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,21 @@ properties:
2929
type: integer
3030
meanSecondsBetweenStateMessageEmittedandCommitted:
3131
type: integer
32+
replicationStartTime:
33+
type: integer
34+
description: The start of the replication activity
35+
replicationEndTime:
36+
type: integer
37+
description: The end of the replication activity
38+
sourceReadStartTime:
39+
type: integer
40+
description: The boot time of the source container/pod
41+
sourceReadEndTime:
42+
type: integer
43+
description: The exit time of the source container/pod
44+
destinationWriteStartTime:
45+
type: integer
46+
description: The boot time of the destination container/pod
47+
destinationWriteEndTime:
48+
type: integer
49+
description: The exit time of the destination container/pod

airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/tracker/TrackingMetadata.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@
1212
import io.airbyte.config.AttemptFailureSummary;
1313
import io.airbyte.config.FailureReason;
1414
import io.airbyte.config.JobOutput;
15+
import io.airbyte.config.NormalizationSummary;
1516
import io.airbyte.config.ResourceRequirements;
1617
import io.airbyte.config.ScheduleData;
1718
import io.airbyte.config.StandardDestinationDefinition;
1819
import io.airbyte.config.StandardSourceDefinition;
1920
import io.airbyte.config.StandardSync;
2021
import io.airbyte.config.StandardSync.ScheduleType;
2122
import io.airbyte.config.StandardSyncSummary;
23+
import io.airbyte.config.SyncStats;
2224
import io.airbyte.config.helpers.ScheduleHelpers;
2325
import io.airbyte.persistence.job.models.Attempt;
2426
import io.airbyte.persistence.job.models.Job;
@@ -110,6 +112,9 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
110112
final JobOutput jobOutput = lastAttempt.getOutput().get();
111113
if (jobOutput.getSync() != null) {
112114
final StandardSyncSummary syncSummary = jobOutput.getSync().getStandardSyncSummary();
115+
final SyncStats totalStats = syncSummary.getTotalStats();
116+
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();
117+
113118
if (syncSummary.getStartTime() != null)
114119
metadata.put("sync_start_time", syncSummary.getStartTime());
115120
if (syncSummary.getEndTime() != null && syncSummary.getStartTime() != null)
@@ -118,22 +123,42 @@ public static Map<String, Object> generateJobAttemptMetadata(final Job job) {
118123
metadata.put("volume_mb", syncSummary.getBytesSynced());
119124
if (syncSummary.getRecordsSynced() != null)
120125
metadata.put("volume_rows", syncSummary.getRecordsSynced());
121-
if (syncSummary.getTotalStats().getSourceStateMessagesEmitted() != null)
126+
if (totalStats.getSourceStateMessagesEmitted() != null)
122127
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
123-
if (syncSummary.getTotalStats().getDestinationStateMessagesEmitted() != null)
128+
if (totalStats.getDestinationStateMessagesEmitted() != null)
124129
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
125-
if (syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted() != null)
130+
if (totalStats.getMaxSecondsBeforeSourceStateMessageEmitted() != null)
126131
metadata.put("max_seconds_before_source_state_message_emitted",
127-
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
128-
if (syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted() != null)
132+
totalStats.getMaxSecondsBeforeSourceStateMessageEmitted());
133+
if (totalStats.getMeanSecondsBeforeSourceStateMessageEmitted() != null)
129134
metadata.put("mean_seconds_before_source_state_message_emitted",
130-
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
131-
if (syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
135+
totalStats.getMeanSecondsBeforeSourceStateMessageEmitted());
136+
if (totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted() != null)
132137
metadata.put("max_seconds_between_state_message_emit_and_commit",
133-
syncSummary.getTotalStats().getMaxSecondsBetweenStateMessageEmittedandCommitted());
134-
if (syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
138+
totalStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
139+
if (totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted() != null)
135140
metadata.put("mean_seconds_between_state_message_emit_and_commit",
136-
syncSummary.getTotalStats().getMeanSecondsBetweenStateMessageEmittedandCommitted());
141+
totalStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());
142+
143+
if (totalStats.getReplicationStartTime() != null)
144+
metadata.put("replication_start_time", totalStats.getReplicationStartTime());
145+
if (totalStats.getReplicationEndTime() != null)
146+
metadata.put("replication_end_time", totalStats.getReplicationEndTime());
147+
if (totalStats.getSourceReadStartTime() != null)
148+
metadata.put("source_read_start_time", totalStats.getSourceReadStartTime());
149+
if (totalStats.getSourceReadEndTime() != null)
150+
metadata.put("source_read_end_time", totalStats.getSourceReadEndTime());
151+
if (totalStats.getDestinationWriteStartTime() != null)
152+
metadata.put("destination_write_start_time", totalStats.getDestinationWriteStartTime());
153+
if (totalStats.getDestinationWriteEndTime() != null)
154+
metadata.put("destination_write_end_time", totalStats.getDestinationWriteEndTime());
155+
156+
if (normalizationSummary != null) {
157+
if (normalizationSummary.getStartTime() != null)
158+
metadata.put("normalization_start_time", normalizationSummary.getStartTime());
159+
if (normalizationSummary.getEndTime() != null)
160+
metadata.put("normalization_end_time", normalizationSummary.getEndTime());
161+
}
137162
}
138163
}
139164

airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.airbyte.config.JobSyncConfig;
2828
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
2929
import io.airbyte.config.Metadata;
30+
import io.airbyte.config.NormalizationSummary;
3031
import io.airbyte.config.Schedule;
3132
import io.airbyte.config.Schedule.TimeUnit;
3233
import io.airbyte.config.StandardCheckConnectionOutput;
@@ -124,6 +125,14 @@ class JobTrackerTest {
124125
.put("mean_seconds_before_source_state_message_emitted", 4L)
125126
.put("max_seconds_between_state_message_emit_and_commit", 7L)
126127
.put("mean_seconds_between_state_message_emit_and_commit", 6L)
128+
.put("replication_start_time", 7L)
129+
.put("replication_end_time", 8L)
130+
.put("source_read_start_time", 9L)
131+
.put("source_read_end_time", 10L)
132+
.put("destination_write_start_time", 11L)
133+
.put("destination_write_end_time", 12L)
134+
.put("normalization_start_time", 13L)
135+
.put("normalization_end_time", 14L)
127136
.build();
128137
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
129138
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
@@ -566,13 +575,15 @@ private Attempt getAttemptMock() {
566575
final JobOutput jobOutput = mock(JobOutput.class);
567576
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
568577
final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class);
578+
final NormalizationSummary normalizationSummary = mock(NormalizationSummary.class);
569579
final SyncStats syncStats = mock(SyncStats.class);
570580

571581
when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME);
572582
when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME);
573583
when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC);
574584
when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC);
575585
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
586+
when(syncOutput.getNormalizationSummary()).thenReturn(normalizationSummary);
576587
when(syncSummary.getTotalStats()).thenReturn(syncStats);
577588
when(jobOutput.getSync()).thenReturn(syncOutput);
578589
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
@@ -582,6 +593,15 @@ private Attempt getAttemptMock() {
582593
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
583594
when(syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(7L);
584595
when(syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted()).thenReturn(6L);
596+
when(syncStats.getReplicationStartTime()).thenReturn(7L);
597+
when(syncStats.getReplicationEndTime()).thenReturn(8L);
598+
when(syncStats.getSourceReadStartTime()).thenReturn(9L);
599+
when(syncStats.getSourceReadEndTime()).thenReturn(10L);
600+
when(syncStats.getDestinationWriteStartTime()).thenReturn(11L);
601+
when(syncStats.getDestinationWriteEndTime()).thenReturn(12L);
602+
when(normalizationSummary.getStartTime()).thenReturn(13L);
603+
when(normalizationSummary.getEndTime()).thenReturn(14L);
604+
585605
return attempt;
586606
}
587607

0 commit comments

Comments
 (0)