Skip to content

Commit 49fc60d

Browse files
authored
source-postgres - Streams not in the CDC publication still have a cursor and PK (#38303)
1 parent 5c492b5 commit 49fc60d

File tree

4 files changed

+88
-86
lines changed

4 files changed

+88
-86
lines changed

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.4
12+
dockerImageTag: 3.4.5
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCatalogHelper.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,15 @@ public static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
9292
/**
9393
* Modifies streams that are NOT present in the publication to be full-refresh only streams. Users
9494
* should be able to replicate these streams, just not in incremental mode as they have no
95-
* associated publication.
95+
* associated publication. Previously, we also setSourceDefinedCursor(false) and
96+
* setSourceDefinedPrimaryKey(List.of()) for streams that are in the catalog but not in the CDC
97+
* publication, but now that full refresh streams can be resumable, we should include this
98+
* information.
9699
*/
97100
public static AirbyteStream setFullRefreshForNonPublicationStreams(final AirbyteStream stream,
98101
final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc) {
99102
if (!publicizedTablesInCdc.contains(new AirbyteStreamNameNamespacePair(stream.getName(), stream.getNamespace()))) {
100103
stream.setSupportedSyncModes(List.of(SyncMode.FULL_REFRESH));
101-
stream.setSourceDefinedCursor(false);
102-
stream.setSourceDefinedPrimaryKey(List.of());
103104
}
104105
return stream;
105106
}

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -569,8 +569,8 @@ void testDiscoverFiltersNonPublication() throws Exception {
569569
// The stream that does not have an associated publication should not have support for
570570
// source-defined incremental sync.
571571
assertEquals(streamNotInPublication.getSupportedSyncModes(), List.of(SyncMode.FULL_REFRESH));
572-
assertTrue(streamNotInPublication.getSourceDefinedPrimaryKey().isEmpty());
573-
assertFalse(streamNotInPublication.getSourceDefinedCursor());
572+
assertFalse(streamNotInPublication.getSourceDefinedPrimaryKey().isEmpty());
573+
assertTrue(streamNotInPublication.getSourceDefinedCursor());
574574
testdb.query(ctx -> ctx.execute("DROP PUBLICATION " + testdb.getPublicationName() + ";"));
575575
testdb.query(ctx -> ctx.execute("CREATE PUBLICATION " + testdb.getPublicationName() + " FOR ALL TABLES"));
576576
}

0 commit comments

Comments
 (0)