Skip to content

Commit 2782b7a

Browse files
authored
Add AirbyteStreamNameNamespacePair to MessageTracker Interface (#19361)
Follow up to #19360. This PR adjusts the MessageTracker interface to use the new Pair object.
1 parent 90350c1 commit 2782b7a

File tree

6 files changed

+51
-40
lines changed

6 files changed

+51
-40
lines changed

airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,8 @@ private List<StreamSyncStats> getPerStreamStats(ReplicationStatus outputStatus)
455455
syncStats.setRecordsCommitted(null);
456456
}
457457
return new StreamSyncStats()
458-
.withStreamName(stream)
458+
.withStreamName(stream.getName())
459+
.withStreamNamespace(stream.getNamespace())
459460
.withStats(syncStats);
460461
}).collect(Collectors.toList());
461462
}

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.airbyte.protocol.models.AirbyteRecordMessage;
2424
import io.airbyte.protocol.models.AirbyteStateMessage;
2525
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
26+
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
2627
import io.airbyte.protocol.models.AirbyteTraceMessage;
2728
import io.airbyte.workers.helper.FailureHelper;
2829
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
@@ -33,6 +34,7 @@
3334
import java.util.HashMap;
3435
import java.util.List;
3536
import java.util.Map;
37+
import java.util.Map.Entry;
3638
import java.util.Optional;
3739
import java.util.concurrent.atomic.AtomicReference;
3840
import java.util.stream.Collectors;
@@ -48,7 +50,7 @@ public class AirbyteMessageTracker implements MessageTracker {
4850
private final AtomicReference<State> destinationOutputState;
4951
private final Map<Short, Long> streamToRunningCount;
5052
private final HashFunction hashFunction;
51-
private final BiMap<String, Short> streamNameToIndex;
53+
private final BiMap<AirbyteStreamNameNamespacePair, Short> nameNamespacePairToIndex;
5254
private final Map<Short, Long> streamToTotalBytesEmitted;
5355
private final Map<Short, Long> streamToTotalRecordsEmitted;
5456
private final StateDeltaTracker stateDeltaTracker;
@@ -89,7 +91,7 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
8991
this.sourceOutputState = new AtomicReference<>();
9092
this.destinationOutputState = new AtomicReference<>();
9193
this.streamToRunningCount = new HashMap<>();
92-
this.streamNameToIndex = HashBiMap.create();
94+
this.nameNamespacePairToIndex = HashBiMap.create();
9395
this.hashFunction = Hashing.murmur3_32_fixed();
9496
this.streamToTotalBytesEmitted = new HashMap<>();
9597
this.streamToTotalRecordsEmitted = new HashMap<>();
@@ -139,7 +141,7 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
139141
stateMetricsTracker.setFirstRecordReceivedAt(LocalDateTime.now());
140142
}
141143

142-
final short streamIndex = getStreamIndex(recordMessage.getStream());
144+
final short streamIndex = getStreamIndex(AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage));
143145

144146
final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L);
145147
streamToRunningCount.put(streamIndex, currentRunningCount + 1);
@@ -269,12 +271,12 @@ private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceM
269271

270272
}
271273

272-
private short getStreamIndex(final String streamName) {
273-
if (!streamNameToIndex.containsKey(streamName)) {
274-
streamNameToIndex.put(streamName, nextStreamIndex);
274+
private short getStreamIndex(final AirbyteStreamNameNamespacePair pair) {
275+
if (!nameNamespacePairToIndex.containsKey(pair)) {
276+
nameNamespacePairToIndex.put(pair, nextStreamIndex);
275277
nextStreamIndex++;
276278
}
277-
return streamNameToIndex.get(streamName);
279+
return nameNamespacePairToIndex.get(pair);
278280
}
279281

280282
private int getStateHashCode(final AirbyteStateMessage stateMessage) {
@@ -347,36 +349,32 @@ public Optional<State> getDestinationOutputState() {
347349
* because committed record counts cannot be reliably computed.
348350
*/
349351
@Override
350-
public Optional<Map<String, Long>> getStreamToCommittedRecords() {
352+
public Optional<Map<AirbyteStreamNameNamespacePair, Long>> getStreamToCommittedRecords() {
351353
if (unreliableCommittedCounts) {
352354
return Optional.empty();
353355
}
354356
final Map<Short, Long> streamIndexToCommittedRecordCount = stateDeltaTracker.getStreamToCommittedRecords();
355357
return Optional.of(
356358
streamIndexToCommittedRecordCount.entrySet().stream().collect(
357-
Collectors.toMap(
358-
entry -> streamNameToIndex.inverse().get(entry.getKey()),
359-
Map.Entry::getValue)));
359+
Collectors.toMap(entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)));
360360
}
361361

362362
/**
363363
* Swap out stream indices for stream name/namespace pairs and return total records emitted by stream.
364364
*/
365365
@Override
366-
public Map<String, Long> getStreamToEmittedRecords() {
366+
public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedRecords() {
367367
return streamToTotalRecordsEmitted.entrySet().stream().collect(Collectors.toMap(
368-
entry -> streamNameToIndex.inverse().get(entry.getKey()),
369-
Map.Entry::getValue));
368+
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
370369
}
371370

372371
/**
373372
* Swap out stream indices for stream name/namespace pairs and return total bytes emitted by stream.
374373
*/
375374
@Override
376-
public Map<String, Long> getStreamToEmittedBytes() {
375+
public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedBytes() {
377376
return streamToTotalBytesEmitted.entrySet().stream().collect(Collectors.toMap(
378-
entry -> streamNameToIndex.inverse().get(entry.getKey()),
379-
Map.Entry::getValue));
377+
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
380378
}
381379

382380
/**

airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.airbyte.config.FailureReason;
88
import io.airbyte.config.State;
99
import io.airbyte.protocol.models.AirbyteMessage;
10+
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
1011
import io.airbyte.protocol.models.AirbyteTraceMessage;
1112
import java.util.Map;
1213
import java.util.Optional;
@@ -55,23 +56,23 @@ public interface MessageTracker {
5556
* @return returns a map of committed record count by stream name/namespace pair. If committed record counts cannot
5657
* be computed, empty.
5758
*/
58-
Optional<Map<String, Long>> getStreamToCommittedRecords();
59+
Optional<Map<AirbyteStreamNameNamespacePair, Long>> getStreamToCommittedRecords();
5960

6061
/**
6162
* Get the per-stream emitted record count. This includes messages that were emitted by the source,
6263
* but never committed by the destination.
6364
*
6465
* @return returns a map of emitted record count by stream name/namespace pair.
6566
*/
66-
Map<String, Long> getStreamToEmittedRecords();
67+
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedRecords();
6768

6869
/**
6970
* Get the per-stream emitted byte count. This includes messages that were emitted by the source,
7071
* but never committed by the destination.
7172
*
7273
* @return returns a map of emitted byte count by stream name/namespace pair.
7374
*/
74-
Map<String, Long> getStreamToEmittedBytes();
75+
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedBytes();
7576

7677
/**
7778
* Get the overall emitted record count. This includes messages that were emitted by the source, but

airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class DefaultReplicationWorkerTest {
9191
private static final AirbyteTraceMessage ERROR_TRACE_MESSAGE =
9292
AirbyteMessageUtils.createErrorTraceMessage("a connector error occurred", Double.valueOf(123));
9393
private static final String STREAM1 = "stream1";
94+
95+
private static final String NAMESPACE = "namespace";
9496
private static final String INDUCED_EXCEPTION = "induced exception";
9597

9698
private Path jobRoot;
@@ -483,8 +485,9 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
483485
when(messageTracker.getTotalBytesEmitted()).thenReturn(100L);
484486
when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L);
485487
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(1L);
486-
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
487-
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
488+
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(new AirbyteStreamNameNamespacePair(STREAM1, NAMESPACE), 100L));
489+
when(messageTracker.getStreamToEmittedRecords())
490+
.thenReturn(Collections.singletonMap(new AirbyteStreamNameNamespacePair(STREAM1, NAMESPACE), 12L));
488491
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(5L);
489492
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(4L);
490493
when(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted()).thenReturn(Optional.of(6L));
@@ -519,6 +522,7 @@ void testPopulatesOutputOnSuccess() throws WorkerException {
519522
.withStreamStats(Collections.singletonList(
520523
new StreamSyncStats()
521524
.withStreamName(STREAM1)
525+
.withStreamNamespace(NAMESPACE)
522526
.withStats(new SyncStats()
523527
.withBytesEmitted(100L)
524528
.withRecordsEmitted(12L)
@@ -599,9 +603,11 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
599603
when(messageTracker.getTotalRecordsCommitted()).thenReturn(Optional.of(6L));
600604
when(messageTracker.getTotalSourceStateMessagesEmitted()).thenReturn(3L);
601605
when(messageTracker.getTotalDestinationStateMessagesEmitted()).thenReturn(2L);
602-
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(STREAM1, 100L));
603-
when(messageTracker.getStreamToEmittedRecords()).thenReturn(Collections.singletonMap(STREAM1, 12L));
604-
when(messageTracker.getStreamToCommittedRecords()).thenReturn(Optional.of(Collections.singletonMap(STREAM1, 6L)));
606+
when(messageTracker.getStreamToEmittedBytes()).thenReturn(Collections.singletonMap(new AirbyteStreamNameNamespacePair(STREAM1, NAMESPACE), 100L));
607+
when(messageTracker.getStreamToEmittedRecords())
608+
.thenReturn(Collections.singletonMap(new AirbyteStreamNameNamespacePair(STREAM1, NAMESPACE), 12L));
609+
when(messageTracker.getStreamToCommittedRecords())
610+
.thenReturn(Optional.of(Collections.singletonMap(new AirbyteStreamNameNamespacePair(STREAM1, NAMESPACE), 6L)));
605611
when(messageTracker.getMaxSecondsToReceiveSourceStateMessage()).thenReturn(10L);
606612
when(messageTracker.getMeanSecondsToReceiveSourceStateMessage()).thenReturn(8L);
607613
when(messageTracker.getMaxSecondsBetweenStateMessageEmittedAndCommitted()).thenReturn(Optional.of(12L));
@@ -631,6 +637,7 @@ void testPopulatesStatsOnFailureIfAvailable() throws Exception {
631637
final List<StreamSyncStats> expectedStreamStats = Collections.singletonList(
632638
new StreamSyncStats()
633639
.withStreamName(STREAM1)
640+
.withStreamNamespace(NAMESPACE)
634641
.withStats(new SyncStats()
635642
.withBytesEmitted(100L)
636643
.withRecordsEmitted(12L)

airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.airbyte.config.FailureReason;
1212
import io.airbyte.config.State;
1313
import io.airbyte.protocol.models.AirbyteMessage;
14+
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
1415
import io.airbyte.workers.helper.FailureHelper;
1516
import io.airbyte.workers.internal.StateDeltaTracker.StateDeltaTrackerException;
1617
import io.airbyte.workers.internal.state_aggregator.StateAggregator;
@@ -107,10 +108,10 @@ void testEmittedRecordsByStream() {
107108
messageTracker.acceptFromSource(r3);
108109
messageTracker.acceptFromSource(r3);
109110

110-
final Map<String, Long> expected = new HashMap<>();
111-
expected.put(STREAM_1, 1L);
112-
expected.put(STREAM_2, 2L);
113-
expected.put(STREAM_3, 3L);
111+
final HashMap<AirbyteStreamNameNamespacePair, Long> expected = new HashMap<>();
112+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r1.getRecord()), 1L);
113+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r2.getRecord()), 2L);
114+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r3.getRecord()), 3L);
114115

115116
assertEquals(expected, messageTracker.getStreamToEmittedRecords());
116117
}
@@ -132,10 +133,10 @@ void testEmittedBytesByStream() {
132133
messageTracker.acceptFromSource(r3);
133134
messageTracker.acceptFromSource(r3);
134135

135-
final Map<String, Long> expected = new HashMap<>();
136-
expected.put(STREAM_1, r1Bytes);
137-
expected.put(STREAM_2, r2Bytes * 2);
138-
expected.put(STREAM_3, r3Bytes * 3);
136+
final Map<AirbyteStreamNameNamespacePair, Long> expected = new HashMap<>();
137+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r1.getRecord()), r1Bytes);
138+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r2.getRecord()), r2Bytes * 2);
139+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r3.getRecord()), r3Bytes * 3);
139140

140141
assertEquals(expected, messageTracker.getStreamToEmittedBytes());
141142
}
@@ -160,14 +161,14 @@ void testGetCommittedRecordsByStream() {
160161
messageTracker.acceptFromSource(s2); // emit state 2
161162

162163
final Map<Short, Long> countsByIndex = new HashMap<>();
163-
final Map<String, Long> expected = new HashMap<>();
164+
final Map<AirbyteStreamNameNamespacePair, Long> expected = new HashMap<>();
164165
Mockito.when(mStateDeltaTracker.getStreamToCommittedRecords()).thenReturn(countsByIndex);
165166

166167
countsByIndex.put((short) 0, 1L);
167168
countsByIndex.put((short) 1, 2L);
168169
// result only contains counts up to state 1
169-
expected.put(STREAM_1, 1L);
170-
expected.put(STREAM_2, 2L);
170+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r1.getRecord()), 1L);
171+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r2.getRecord()), 2L);
171172
assertEquals(expected, messageTracker.getStreamToCommittedRecords().get());
172173

173174
countsByIndex.clear();
@@ -177,9 +178,9 @@ void testGetCommittedRecordsByStream() {
177178
countsByIndex.put((short) 1, 3L);
178179
countsByIndex.put((short) 2, 1L);
179180
// result updated with counts between state 1 and state 2
180-
expected.put(STREAM_1, 3L);
181-
expected.put(STREAM_2, 3L);
182-
expected.put(STREAM_3, 1L);
181+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r1.getRecord()), 3L);
182+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r2.getRecord()), 3L);
183+
expected.put(AirbyteStreamNameNamespacePair.fromRecordMessage(r3.getRecord()), 1L);
183184
assertEquals(expected, messageTracker.getStreamToCommittedRecords().get());
184185
}
185186

airbyte-config/config-models/src/main/resources/types/StreamSyncStats.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,8 @@ additionalProperties: false
1111
properties:
1212
streamName:
1313
type: string
14+
# Not required, as not all sources emit a namespace for each stream.
15+
streamNamespace:
16+
type: string
1417
stats:
1518
"$ref": SyncStats.yaml

0 commit comments

Comments
 (0)