Skip to content

Commit 1b1448d

Browse files
authored
Track Max & mean seconds to receive state messages (#15586)
* Segment tracking for max and mean seconds to receive state message from source
1 parent 2a2c164 commit 1b1448d

File tree

8 files changed

+98
-2
lines changed

8 files changed

+98
-2
lines changed

airbyte-config/config-models/src/main/resources/types/SyncStats.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,7 @@ properties:
2121
type: integer
2222
recordsCommitted:
2323
type: integer # if unset, committed records could not be computed
24+
meanSecondsBeforeSourceStateMessageEmitted:
25+
type: integer
26+
maxSecondsBeforeSourceStateMessageEmitted:
27+
type: integer

airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
111111
metadata.put("volume_rows", syncSummary.getRecordsSynced());
112112
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
113113
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
114+
metadata.put("max_seconds_before_source_state_message_emitted",
115+
syncSummary.getTotalStats().getMaxSecondsBeforeSourceStateMessageEmitted());
116+
metadata.put("mean_seconds_before_source_state_message_emitted",
117+
syncSummary.getTotalStats().getMeanSecondsBeforeSourceStateMessageEmitted());
114118
}
115119
}
116120

airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ class JobTrackerTest {
114114
.put("volume_mb", SYNC_BYTES_SYNC)
115115
.put("count_state_messages_from_source", 3L)
116116
.put("count_state_messages_from_destination", 1L)
117+
.put("max_seconds_before_source_state_message_emitted", 5L)
118+
.put("mean_seconds_before_source_state_message_emitted", 4L)
117119
.build();
118120
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
119121
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
@@ -496,6 +498,8 @@ private Attempt getAttemptMock() {
496498
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
497499
when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L);
498500
when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L);
501+
when(syncStats.getMaxSecondsBeforeSourceStateMessageEmitted()).thenReturn(5L);
502+
when(syncStats.getMeanSecondsBeforeSourceStateMessageEmitted()).thenReturn(4L);
499503
return attempt;
500504
}
501505

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,9 @@ else if (hasFailed.get()) {
202202
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
203203
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
204204
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
205-
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted());
205+
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted())
206+
.withMaxSecondsBeforeSourceStateMessageEmitted(messageTracker.getMaxSecondsToReceiveSourceStateMessage())
207+
.withMeanSecondsBeforeSourceStateMessageEmitted(messageTracker.getMeanSecondsToReceiveSourceStateMessage());
206208

207209
if (outputStatus == ReplicationStatus.COMPLETED) {
208210
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());

airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.util.concurrent.atomic.AtomicReference;
3131
import java.util.stream.Collectors;
3232
import lombok.extern.slf4j.Slf4j;
33+
import org.joda.time.DateTime;
34+
import org.joda.time.Seconds;
3335

3436
@Slf4j
3537
public class AirbyteMessageTracker implements MessageTracker {
@@ -40,6 +42,8 @@ public class AirbyteMessageTracker implements MessageTracker {
4042
private final AtomicReference<State> destinationOutputState;
4143
private final AtomicLong totalSourceEmittedStateMessages;
4244
private final AtomicLong totalDestinationEmittedStateMessages;
45+
private Long maxSecondsToReceiveSourceStateMessage;
46+
private Long meanSecondsToReceiveSourceStateMessage;
4347
private final Map<Short, Long> streamToRunningCount;
4448
private final HashFunction hashFunction;
4549
private final BiMap<String, Short> streamNameToIndex;
@@ -49,6 +53,8 @@ public class AirbyteMessageTracker implements MessageTracker {
4953
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
5054
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
5155
private final StateAggregator stateAggregator;
56+
private DateTime firstRecordReceivedAt;
57+
private final DateTime lastStateMessageReceivedAt;
5258

5359
private short nextStreamIndex;
5460

@@ -74,6 +80,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
7480
this.destinationOutputState = new AtomicReference<>();
7581
this.totalSourceEmittedStateMessages = new AtomicLong(0L);
7682
this.totalDestinationEmittedStateMessages = new AtomicLong(0L);
83+
this.maxSecondsToReceiveSourceStateMessage = 0L;
84+
this.meanSecondsToReceiveSourceStateMessage = 0L;
7785
this.streamToRunningCount = new HashMap<>();
7886
this.streamNameToIndex = HashBiMap.create();
7987
this.hashFunction = Hashing.murmur3_32_fixed();
@@ -85,6 +93,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final
8593
this.destinationErrorTraceMessages = new ArrayList<>();
8694
this.sourceErrorTraceMessages = new ArrayList<>();
8795
this.stateAggregator = stateAggregator;
96+
this.firstRecordReceivedAt = null;
97+
this.lastStateMessageReceivedAt = null;
8898
}
8999

90100
@Override
@@ -111,6 +121,10 @@ public void acceptFromDestination(final AirbyteMessage message) {
111121
* total byte count for the record's stream.
112122
*/
113123
private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage) {
124+
if (firstRecordReceivedAt == null) {
125+
firstRecordReceivedAt = DateTime.now();
126+
}
127+
114128
final short streamIndex = getStreamIndex(recordMessage.getStream());
115129

116130
final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L);
@@ -131,6 +145,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
131145
* correctly.
132146
*/
133147
private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
148+
updateMaxAndMeanSecondsToReceiveStateMessage(DateTime.now());
134149
sourceOutputState.set(new State().withState(stateMessage.getData()));
135150
totalSourceEmittedStateMessages.incrementAndGet();
136151
final int stateHash = getStateHashCode(stateMessage);
@@ -327,4 +342,48 @@ public Long getTotalDestinationStateMessagesEmitted() {
327342
return totalDestinationEmittedStateMessages.get();
328343
}
329344

345+
@Override
346+
public Long getMaxSecondsToReceiveSourceStateMessage() {
347+
return maxSecondsToReceiveSourceStateMessage;
348+
}
349+
350+
@Override
351+
public Long getMeanSecondsToReceiveSourceStateMessage() {
352+
return meanSecondsToReceiveSourceStateMessage;
353+
}
354+
355+
private void updateMaxAndMeanSecondsToReceiveStateMessage(final DateTime stateMessageReceivedAt) {
356+
final Long secondsSinceLastStateMessage = calculateSecondsSinceLastStateEmitted(stateMessageReceivedAt);
357+
if (maxSecondsToReceiveSourceStateMessage < secondsSinceLastStateMessage) {
358+
maxSecondsToReceiveSourceStateMessage = secondsSinceLastStateMessage;
359+
}
360+
361+
if (meanSecondsToReceiveSourceStateMessage == 0) {
362+
meanSecondsToReceiveSourceStateMessage = secondsSinceLastStateMessage;
363+
} else {
364+
final Long newMeanSeconds =
365+
calculateMean(meanSecondsToReceiveSourceStateMessage, totalSourceEmittedStateMessages.get(), secondsSinceLastStateMessage);
366+
meanSecondsToReceiveSourceStateMessage = newMeanSeconds;
367+
}
368+
}
369+
370+
private Long calculateSecondsSinceLastStateEmitted(final DateTime stateMessageReceivedAt) {
371+
if (lastStateMessageReceivedAt != null) {
372+
return Long.valueOf(Seconds.secondsBetween(lastStateMessageReceivedAt, stateMessageReceivedAt).getSeconds());
373+
} else if (firstRecordReceivedAt != null) {
374+
return Long.valueOf(Seconds.secondsBetween(firstRecordReceivedAt, stateMessageReceivedAt).getSeconds());
375+
} else {
376+
// If we receive a State Message before a Record Message there is no previous timestamp to use for a
377+
// calculation
378+
return 0L;
379+
}
380+
}
381+
382+
@VisibleForTesting
383+
protected Long calculateMean(final Long currentMean, final Long totalCount, final Long newDataPoint) {
384+
final Long previousCount = totalCount - 1;
385+
final double result = (Double.valueOf(currentMean * previousCount) / totalCount) + (Double.valueOf(newDataPoint) / totalCount);
386+
return (long) result;
387+
}
388+
330389
}

airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ public interface MessageTracker {
106106

107107
Long getTotalDestinationStateMessagesEmitted();
108108

109+
Long getMaxSecondsToReceiveSourceStateMessage();
110+
111+
Long getMeanSecondsToReceiveSourceStateMessage();
112+
109113
AirbyteTraceMessage getFirstDestinationErrorTraceMessage();
110114

111115
AirbyteTraceMessage getFirstSourceErrorTraceMessage();

airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
445445
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
446446
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
447447
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
448+
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(5L);
449+
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(4L);
448450

449451
final ReplicationWorker worker = new DefaultReplicationWorker(
450452
JOB_ID,
@@ -467,6 +469,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
467469
.withBytesEmitted(100L)
468470
.withSourceStateMessagesEmitted(3L)
469471
.withDestinationStateMessagesEmitted(1L)
472+
.withMaxSecondsBeforeSourceStateMessageEmitted(5L)
473+
.withMeanSecondsBeforeSourceStateMessageEmitted(4L)
470474
.withRecordsCommitted(12L)) // since success, should use emitted count
471475
.withStreamStats(Collections.singletonList(
472476
new StreamSyncStats()
@@ -476,7 +480,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
476480
.withRecordsEmitted(12L)
477481
.withRecordsCommitted(12L) // since success, should use emitted count
478482
.withSourceStateMessagesEmitted(null)
479-
.withDestinationStateMessagesEmitted(null)))))
483+
.withDestinationStateMessagesEmitted(null)
484+
.withMaxSecondsBeforeSourceStateMessageEmitted(null)
485+
.withMeanSecondsBeforeSourceStateMessageEmitted(null)))))
480486
.withOutputCatalog(syncInput.getCatalog())
481487
.withState(new State().withState(expectedState));
482488

@@ -548,6 +554,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
548554
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
549555
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
550556
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
557+
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(10L);
558+
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(8L);
551559

552560
final ReplicationWorker worker = new DefaultReplicationWorker(
553561
JOB_ID,
@@ -565,6 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
565573
.withBytesEmitted(100L)
566574
.withSourceStateMessagesEmitted(3L)
567575
.withDestinationStateMessagesEmitted(2L)
576+
.withMaxSecondsBeforeSourceStateMessageEmitted(10L)
577+
.withMeanSecondsBeforeSourceStateMessageEmitted(8L)
568578
.withRecordsCommitted(6L);
569579
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
570580
new StreamSyncStats()

airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,4 +324,13 @@ void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception {
324324
assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), null);
325325
}
326326

327+
@Test
328+
void testCalculateMean() throws Exception {
329+
// Mean for 3 state messages is 5, 4th state message is 9, new mean should be 6
330+
assertEquals(6L, messageTracker.calculateMean(5L, 4L, 9L));
331+
332+
// Mean for 5 state messages is 10, 4th state message is 12, new mean is 10.33 rounded down to 10
333+
assertEquals(10L, messageTracker.calculateMean(10L, 6L, 12L));
334+
}
335+
327336
}

0 commit comments

Comments
 (0)