Skip to content

Commit 6b59236

Browse files
authored
Track state message counts during syncs (#15526)
* Track state message counts from source and destination connectors during syncs - Add new field to SyncStats and update existing field
1 parent 6332fd6 commit 6b59236

File tree

11 files changed

+57
-25
lines changed

11 files changed

+57
-25
lines changed

airbyte-config/config-models/src/main/resources/types/SyncStats.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ type: object
77
required:
88
- recordsEmitted
99
- bytesEmitted
10-
additionalProperties: false
10+
additionalProperties: true
1111
properties:
1212
recordsEmitted:
1313
type: integer
1414
bytesEmitted:
1515
type: integer
16-
stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
16+
sourceStateMessagesEmitted:
17+
description: Number of State messages emitted by the Source Connector
18+
type: integer
19+
destinationStateMessagesEmitted:
20+
description: Number of State messages emitted by the Destination Connector
1721
type: integer
1822
recordsCommitted:
1923
type: integer # if unset, committed records could not be computed

airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ public static ImmutableMap<String, Object> generateJobAttemptMetadata(final Job
109109
metadata.put("duration", Math.round((syncSummary.getEndTime() - syncSummary.getStartTime()) / 1000.0));
110110
metadata.put("volume_mb", syncSummary.getBytesSynced());
111111
metadata.put("volume_rows", syncSummary.getRecordsSynced());
112+
metadata.put("count_state_messages_from_source", syncSummary.getTotalStats().getSourceStateMessagesEmitted());
113+
metadata.put("count_state_messages_from_destination", syncSummary.getTotalStats().getDestinationStateMessagesEmitted());
112114
}
113115
}
114116

airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import io.airbyte.config.StandardSyncOutput;
3838
import io.airbyte.config.StandardSyncSummary;
3939
import io.airbyte.config.StandardWorkspace;
40+
import io.airbyte.config.SyncStats;
4041
import io.airbyte.config.persistence.ConfigNotFoundException;
4142
import io.airbyte.config.persistence.ConfigRepository;
4243
import io.airbyte.protocol.models.CatalogHelpers;
@@ -111,6 +112,8 @@ class JobTrackerTest {
111112
.put("duration", SYNC_DURATION)
112113
.put("volume_rows", SYNC_RECORDS_SYNC)
113114
.put("volume_mb", SYNC_BYTES_SYNC)
115+
.put("count_state_messages_from_source", 3L)
116+
.put("count_state_messages_from_destination", 1L)
114117
.build();
115118
private static final ImmutableMap<String, Object> SYNC_CONFIG_METADATA = ImmutableMap.<String, Object>builder()
116119
.put(JobTracker.CONFIG + ".source.key", JobTracker.SET)
@@ -481,14 +484,18 @@ private Attempt getAttemptMock() {
481484
final JobOutput jobOutput = mock(JobOutput.class);
482485
final StandardSyncOutput syncOutput = mock(StandardSyncOutput.class);
483486
final StandardSyncSummary syncSummary = mock(StandardSyncSummary.class);
487+
final SyncStats syncStats = mock(SyncStats.class);
484488

485489
when(syncSummary.getStartTime()).thenReturn(SYNC_START_TIME);
486490
when(syncSummary.getEndTime()).thenReturn(SYNC_END_TIME);
487491
when(syncSummary.getBytesSynced()).thenReturn(SYNC_BYTES_SYNC);
488492
when(syncSummary.getRecordsSynced()).thenReturn(SYNC_RECORDS_SYNC);
489493
when(syncOutput.getStandardSyncSummary()).thenReturn(syncSummary);
494+
when(syncSummary.getTotalStats()).thenReturn(syncStats);
490495
when(jobOutput.getSync()).thenReturn(syncOutput);
491496
when(attempt.getOutput()).thenReturn(java.util.Optional.of(jobOutput));
497+
when(syncStats.getSourceStateMessagesEmitted()).thenReturn(3L);
498+
when(syncStats.getDestinationStateMessagesEmitted()).thenReturn(1L);
492499
return attempt;
493500
}
494501

airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ private static AttemptStats getTotalAttemptStats(final Attempt attempt) {
158158
return new AttemptStats()
159159
.bytesEmitted(totalStats.getBytesEmitted())
160160
.recordsEmitted(totalStats.getRecordsEmitted())
161-
.stateMessagesEmitted(totalStats.getStateMessagesEmitted())
161+
.stateMessagesEmitted(totalStats.getSourceStateMessagesEmitted())
162162
.recordsCommitted(totalStats.getRecordsCommitted());
163163
}
164164

@@ -175,7 +175,7 @@ private static List<AttemptStreamStats> getAttemptStreamStats(final Attempt atte
175175
.stats(new AttemptStats()
176176
.bytesEmitted(streamStat.getStats().getBytesEmitted())
177177
.recordsEmitted(streamStat.getStats().getRecordsEmitted())
178-
.stateMessagesEmitted(streamStat.getStats().getStateMessagesEmitted())
178+
.stateMessagesEmitted(streamStat.getStats().getSourceStateMessagesEmitted())
179179
.recordsCommitted(streamStat.getStats().getRecordsCommitted())))
180180
.collect(Collectors.toList());
181181
}

airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,14 @@ class JobConverterTest {
9999
.withTotalStats(new SyncStats()
100100
.withRecordsEmitted(RECORDS_EMITTED)
101101
.withBytesEmitted(BYTES_EMITTED)
102-
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
102+
.withSourceStateMessagesEmitted(STATE_MESSAGES_EMITTED)
103103
.withRecordsCommitted(RECORDS_COMMITTED))
104104
.withStreamStats(Lists.newArrayList(new StreamSyncStats()
105105
.withStreamName(STREAM_NAME)
106106
.withStats(new SyncStats()
107107
.withRecordsEmitted(RECORDS_EMITTED)
108108
.withBytesEmitted(BYTES_EMITTED)
109-
.withStateMessagesEmitted(STATE_MESSAGES_EMITTED)
109+
.withSourceStateMessagesEmitted(STATE_MESSAGES_EMITTED)
110110
.withRecordsCommitted(RECORDS_COMMITTED))))));
111111

112112
private JobConverter jobConverter;

airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,8 @@ else if (hasFailed.get()) {
201201
final SyncStats totalSyncStats = new SyncStats()
202202
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
203203
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
204-
.withStateMessagesEmitted(messageTracker.getTotalStateMessagesEmitted());
204+
.withSourceStateMessagesEmitted(messageTracker.getTotalSourceStateMessagesEmitted())
205+
.withDestinationStateMessagesEmitted(messageTracker.getTotalDestinationStateMessagesEmitted());
205206

206207
if (outputStatus == ReplicationStatus.COMPLETED) {
207208
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
@@ -217,7 +218,8 @@ else if (hasFailed.get()) {
217218
final SyncStats syncStats = new SyncStats()
218219
.withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
219220
.withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
220-
.withStateMessagesEmitted(null); // TODO (parker) populate per-stream state messages emitted once supported in V2
221+
.withSourceStateMessagesEmitted(null)
222+
.withDestinationStateMessagesEmitted(null);
221223

222224
if (outputStatus == ReplicationStatus.COMPLETED) {
223225
syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));

airbyte-workers/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ public class AirbyteMessageTracker implements MessageTracker {
3838

3939
private final AtomicReference<State> sourceOutputState;
4040
private final AtomicReference<State> destinationOutputState;
41-
private final AtomicLong totalEmittedStateMessages;
41+
private final AtomicLong totalSourceEmittedStateMessages;
42+
private final AtomicLong totalDestinationEmittedStateMessages;
4243
private final Map<Short, Long> streamToRunningCount;
4344
private final HashFunction hashFunction;
4445
private final BiMap<String, Short> streamNameToIndex;
@@ -71,7 +72,8 @@ public AirbyteMessageTracker() {
7172
protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, final StateAggregator stateAggregator) {
7273
this.sourceOutputState = new AtomicReference<>();
7374
this.destinationOutputState = new AtomicReference<>();
74-
this.totalEmittedStateMessages = new AtomicLong(0L);
75+
this.totalSourceEmittedStateMessages = new AtomicLong(0L);
76+
this.totalDestinationEmittedStateMessages = new AtomicLong(0L);
7577
this.streamToRunningCount = new HashMap<>();
7678
this.streamNameToIndex = HashBiMap.create();
7779
this.hashFunction = Hashing.murmur3_32_fixed();
@@ -130,7 +132,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
130132
*/
131133
private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
132134
sourceOutputState.set(new State().withState(stateMessage.getData()));
133-
totalEmittedStateMessages.incrementAndGet();
135+
totalSourceEmittedStateMessages.incrementAndGet();
134136
final int stateHash = getStateHashCode(stateMessage);
135137
try {
136138
if (!unreliableCommittedCounts) {
@@ -150,6 +152,7 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
150152
* committed in the {@link StateDeltaTracker}. Also record this state as the last committed state.
151153
*/
152154
private void handleDestinationEmittedState(final AirbyteStateMessage stateMessage) {
155+
totalDestinationEmittedStateMessages.incrementAndGet();
153156
stateAggregator.ingest(stateMessage);
154157
destinationOutputState.set(stateAggregator.getAggregated());
155158
try {
@@ -315,8 +318,13 @@ public Optional<Long> getTotalRecordsCommitted() {
315318
}
316319

317320
@Override
318-
public Long getTotalStateMessagesEmitted() {
319-
return totalEmittedStateMessages.get();
321+
public Long getTotalSourceStateMessagesEmitted() {
322+
return totalSourceEmittedStateMessages.get();
323+
}
324+
325+
@Override
326+
public Long getTotalDestinationStateMessagesEmitted() {
327+
return totalDestinationEmittedStateMessages.get();
320328
}
321329

322330
}

airbyte-workers/src/main/java/io/airbyte/workers/internal/MessageTracker.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ public interface MessageTracker {
9898
Optional<Long> getTotalRecordsCommitted();
9999

100100
/**
101-
* Get the overall emitted state message count.
101+
* Get the count of state messages emitted from the source connector.
102102
*
103-
* @return returns the total count of emitted state messages.
103+
* @return returns the total count of state messages emitted from the source.
104104
*/
105-
Long getTotalStateMessagesEmitted();
105+
Long getTotalSourceStateMessagesEmitted();
106+
107+
Long getTotalDestinationStateMessagesEmitted();
106108

107109
AirbyteTraceMessage getFirstDestinationErrorTraceMessage();
108110

airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/SyncCheckConnectionFailure.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,9 @@ public StandardSyncOutput buildFailureOutput() {
6868
.withTotalStats(new SyncStats()
6969
.withRecordsEmitted(0L)
7070
.withBytesEmitted(0L)
71-
.withStateMessagesEmitted(0L)
72-
.withRecordsCommitted(0L)));;
71+
.withSourceStateMessagesEmitted(0L)
72+
.withDestinationStateMessagesEmitted(0L)
73+
.withRecordsCommitted(0L)));
7374

7475
if (failureOutput.getFailureReason() != null) {
7576
syncOutput.setFailures(List.of(failureOutput.getFailureReason().withFailureOrigin(origin)));

airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
441441
when(messageTracker.getDestinationOutputState()).thenReturn(Optional.of(new State().withState(expectedState)));
442442
when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L);
443443
when(messageTracker.getTotalBytesEmitted()).thenReturn(100L);
444-
when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L);
444+
when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L);
445+
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
445446
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
446447
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
447448

@@ -464,7 +465,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
464465
.withTotalStats(new SyncStats()
465466
.withRecordsEmitted(12L)
466467
.withBytesEmitted(100L)
467-
.withStateMessagesEmitted(3L)
468+
.withSourceStateMessagesEmitted(3L)
469+
.withDestinationStateMessagesEmitted(1L)
468470
.withRecordsCommitted(12L)) // since success, should use emitted count
469471
.withStreamStats(Collections.singletonList(
470472
new StreamSyncStats()
@@ -473,7 +475,8 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
473475
.withBytesEmitted(100L)
474476
.withRecordsEmitted(12L)
475477
.withRecordsCommitted(12L) // since success, should use emitted count
476-
.withStateMessagesEmitted(null)))))
478+
.withSourceStateMessagesEmitted(null)
479+
.withDestinationStateMessagesEmitted(null)))))
477480
.withOutputCatalog(syncInput.getCatalog())
478481
.withState(new State().withState(expectedState));
479482

@@ -540,7 +543,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
540543
when(messageTracker.getTotalRecordsEmitted()).thenReturn(12L);
541544
when(messageTracker.getTotalBytesEmitted()).thenReturn(100L);
542545
when(messageTracker.getTotalRecordsCommitted()).thenReturn(Optional.of(6L));
543-
when(messageTracker.getTotalStateMessagesEmitted()).thenReturn(3L);
546+
when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L);
547+
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(2L);
544548
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
545549
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
546550
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
@@ -559,7 +563,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
559563
final SyncStats expectedTotalStats = new SyncStats()
560564
.withRecordsEmitted(12L)
561565
.withBytesEmitted(100L)
562-
.withStateMessagesEmitted(3L)
566+
.withSourceStateMessagesEmitted(3L)
567+
.withDestinationStateMessagesEmitted(2L)
563568
.withRecordsCommitted(6L);
564569
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
565570
new StreamSyncStats()
@@ -568,7 +573,8 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
568573
.withBytesEmitted(100L)
569574
.withRecordsEmitted(12L)
570575
.withRecordsCommitted(6L)
571-
.withStateMessagesEmitted(null)));
576+
.withSourceStateMessagesEmitted(null)
577+
.withDestinationStateMessagesEmitted(null)));
572578

573579
assertNotNull(actual);
574580
assertEquals(expectedTotalStats, actual.getReplicationAttemptSummary().getTotalStats());

airbyte-workers/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ void testGetTotalRecordsStatesAndBytesEmitted() {
5858

5959
assertEquals(3, messageTracker.getTotalRecordsEmitted());
6060
assertEquals(3L * Jsons.getEstimatedByteSize(r1.getRecord().getData()), messageTracker.getTotalBytesEmitted());
61-
assertEquals(2, messageTracker.getTotalStateMessagesEmitted());
61+
assertEquals(2, messageTracker.getTotalSourceStateMessagesEmitted());
6262
}
6363

6464
@Test

0 commit comments

Comments
 (0)