Skip to content

Commit 4b92f75

Browse files
authored
[source-postgres] bugfix: Exclude views in incremental iterator (#38130)
1 parent 0f8ffe3 commit 4b92f75

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.4.0
12+
dockerImageTag: 3.4.1
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -190,16 +190,23 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
190190
streamsUnderVacuum.addAll(streamsUnderVacuum(database,
191191
ctidStreams.streamsForCtidSync(), quoteString).result());
192192

193-
final List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
193+
List<ConfiguredAirbyteStream> finalListOfStreamsToBeSyncedViaCtid =
194194
streamsUnderVacuum.isEmpty() ? ctidStreams.streamsForCtidSync()
195195
: ctidStreams.streamsForCtidSync().stream()
196196
.filter(c -> !streamsUnderVacuum.contains(AirbyteStreamNameNamespacePair.fromConfiguredAirbyteSteam(c)))
197197
.toList();
198-
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
199198
final FileNodeHandler fileNodeHandler = PostgresQueryUtils.fileNodeForStreams(database,
200199
finalListOfStreamsToBeSyncedViaCtid,
201200
quoteString);
202201
final PostgresCtidHandler ctidHandler;
202+
if (!fileNodeHandler.getFailedToQuery().isEmpty()) {
203+
finalListOfStreamsToBeSyncedViaCtid = finalListOfStreamsToBeSyncedViaCtid.stream()
204+
.filter(stream -> !fileNodeHandler.getFailedToQuery().contains(
205+
new AirbyteStreamNameNamespacePair(stream.getStream().getName(), stream.getStream().getNamespace())))
206+
.collect(Collectors.toList());
207+
}
208+
LOGGER.info("Streams to be synced via ctid : {}", finalListOfStreamsToBeSyncedViaCtid.size());
209+
203210
try {
204211
ctidHandler =
205212
createInitialLoader(database, finalListOfStreamsToBeSyncedViaCtid, fileNodeHandler, quoteString, ctidStateManager,

docs/integrations/sources/postgres.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
308308

309309
| Version | Date | Pull Request | Subject |
310310
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
311+
| 3.4.1 | 2024-05-10 | [38130](https://github.com/airbytehq/airbyte/pull/38130) | Bug fix on old PG where ctid column not found when stream is a view. |
311312
| 3.4.0 | 2024-04-29 | [37112](https://github.com/airbytehq/airbyte/pull/37112) | resumeable full refresh. |
312313
| 3.3.33 | 2024-05-07 | [38030](https://github.com/airbytehq/airbyte/pull/38030) | Mark PG hot standby error as transient. |
313314
| 3.3.32 | 2024-04-30 | [37758](https://github.com/airbytehq/airbyte/pull/37758) | Correct previous release to disable debezium retries |
@@ -471,13 +472,13 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
471472
| 0.4.43 | 2022-08-03 | [15226](https://github.com/airbytehq/airbyte/pull/15226) | Make connectionTimeoutMs configurable through JDBC url parameters |
472473
| 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time |
473474
| 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC |
474-
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead |
475+
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently (⛔ this version has a bug; use `1.0.1` instead |
475476
| 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps |
476477
| | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers |
477478
| 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
478479
| 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |
479480
| 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
480-
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
481+
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
481482
| 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
482483
| 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
483484
| 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |

0 commit comments

Comments
 (0)