Skip to content

Commit 59cdc36

Browse files
authored
[source-postgres] Fix final state message on state (#38171)
1 parent 8fdd981 commit 59cdc36

File tree

7 files changed

+100
-83
lines changed

7 files changed

+100
-83
lines changed
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.35.0
1+
version=0.35.1

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+13-2
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,15 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
120120
// Do nothing.
121121
}
122122

123+
private fun assertStateDoNotHaveDuplicateStreams(stateMessage: AirbyteStateMessage) {
124+
val dedupedStreamStates =
125+
stateMessage.global.streamStates
126+
.stream()
127+
.map { streamState: AirbyteStreamState -> streamState.streamDescriptor }
128+
.collect(Collectors.toSet())
129+
Assertions.assertEquals(dedupedStreamStates.size, stateMessage.global.streamStates.size)
130+
}
131+
123132
@BeforeEach
124133
protected open fun setup() {
125134
testdb = createTestDatabase()
@@ -616,6 +625,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
616625

617626
val recordMessages1 = extractRecordMessages(actualRecords1)
618627
val stateMessages1 = extractStateMessages(actualRecords1)
628+
stateMessages1.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
619629
val names = HashSet(STREAM_NAMES)
620630
names.add(MODELS_STREAM_NAME_2)
621631

@@ -657,7 +667,7 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
657667
} else {
658668
assertExpectedStateMessageCountMatches(
659669
stateMessages1,
660-
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
670+
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
661671
)
662672
assertExpectedRecords(
663673
Streams.concat(MODEL_RECORDS_2.stream(), MODEL_RECORDS.stream())
@@ -1236,8 +1246,9 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
12361246

12371247
assertExpectedStateMessageCountMatches(
12381248
stateAfterFirstBatch,
1239-
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
1249+
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong(),
12401250
)
1251+
stateAfterFirstBatch.map { state -> assertStateDoNotHaveDuplicateStreams(state) }
12411252
}
12421253

12431254
protected open fun assertStateMessagesForNewTableSnapshotTest(

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.34.2'
15+
cdkVersionRequired = '0.35.1'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.1
12+
dockerImageTag: 3.4.2
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/CtidGlobalStateManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public AirbyteStateMessage createFinalStateMessage(final AirbyteStreamNameNamesp
145145

146146
resumableFullRefreshStreams.forEach(stream -> {
147147
final CtidStatus ctidStatusForFullRefreshStream = generateCtidStatusForState(pair);
148-
streamStates.add(getAirbyteStreamState(pair, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
148+
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ctidStatusForFullRefreshStream)));
149149
});
150150

151151
return new AirbyteStateMessage()

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+5
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,11 @@ private void assertStateTypes(final List<? extends AirbyteStateMessage> stateMes
246246
}
247247
}
248248

249+
@Override
250+
protected void validateStreamStateInResumableFullRefresh(final JsonNode streamStateToBeTested) {
251+
assertEquals("ctid", streamStateToBeTested.get("state_type").asText());
252+
}
253+
249254
@Override
250255
@Test
251256
protected void testCdcAndNonResumableFullRefreshInSameSync() throws Exception {}

0 commit comments

Comments
 (0)