Skip to content

Commit 825528c

Browse files
rodireichariesgun
authored and committed
🐛 Update initial load query for old postgres to return a defined order … (airbytehq#31328)
Co-authored-by: rodireich <[email protected]>
1 parent ad2db9b commit 825528c

File tree

6 files changed

+49
-6
lines changed

6 files changed

+49
-6
lines changed

airbyte-integrations/connectors/source-postgres-strict-encrypt/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ data:
1212
connectorType: source
1313
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
1414
maxSecondsBetweenMessages: 7200
15-
dockerImageTag: 3.1.11
15+
dockerImageTag: 3.1.12
1616
dockerRepository: airbyte/source-postgres-strict-encrypt
1717
githubIssueLabel: source-postgres
1818
icon: postgresql.svg

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ ENV APPLICATION source-postgres
2424

2525
COPY --from=build /airbyte /airbyte
2626

27-
LABEL io.airbyte.version=3.1.11
27+
LABEL io.airbyte.version=3.1.12
2828
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ data:
66
connectorSubtype: database
77
connectorType: source
88
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
9-
dockerImageTag: 3.1.11
9+
dockerImageTag: 3.1.12
1010
maxSecondsBetweenMessages: 7200
1111
dockerRepository: airbyte/source-postgres
1212
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -303,13 +303,12 @@ public PreparedStatement createCtidLegacyQueryStatement(final Connection connect
303303
Preconditions.checkArgument(lowerBound != null, "Lower bound ctid expected");
304304
Preconditions.checkArgument(upperBound != null, "Upper bound ctid expected");
305305
try {
306-
LOGGER.info("*** one more {}", lowerBound);
307306
LOGGER.info("Preparing query for table: {}", tableName);
308307
final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName,
309308
quoteString);
310309
final String wrappedColumnNames = RelationalDbQueryUtils.enquoteIdentifierList(columnNames, quoteString);
311310
final String sql =
312-
"SELECT ctid::text, %s FROM %s WHERE ctid = ANY (ARRAY (SELECT FORMAT('(%%s,%%s)', page, tuple)::tid FROM generate_series(?, ?) as page, generate_series(?,?) as tuple))"
311+
"SELECT ctid::text, %s FROM %s WHERE ctid = ANY (ARRAY (SELECT FORMAT('(%%s,%%s)', page, tuple)::tid tid_addr FROM generate_series(?, ?) as page, generate_series(?,?) as tuple ORDER BY tid_addr))"
313312
.formatted(
314313
wrappedColumnNames, fullTableName);
315314
final PreparedStatement preparedStatement = connection.prepareStatement(sql);

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java

+43
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.junit.jupiter.api.AfterAll;
6363
import org.junit.jupiter.api.BeforeAll;
6464
import org.junit.jupiter.api.BeforeEach;
65+
import org.junit.jupiter.api.DisplayName;
6566
import org.junit.jupiter.api.Test;
6667
import org.junit.jupiter.api.extension.ExtendWith;
6768
import org.testcontainers.containers.PostgreSQLContainer;
@@ -852,4 +853,46 @@ public void testJdbcOptionsParameter() throws Exception {
852853
}
853854
}
854855

856+
@Test
857+
@DisplayName("Make sure initial incremental load is reading records in a certain order")
858+
void testReadIncrementalRecordOrder() throws Exception {
859+
final JsonNode config = getConfig(PSQL_DB, dbName);
860+
// We want to test ordering, so we can delete the NaN entry
861+
try (final DSLContext dslContext = getDslContext(config)) {
862+
final Database database = getDatabase(dslContext);
863+
database.query(ctx -> {
864+
ctx.fetch("DELETE FROM id_and_name WHERE id = 'NaN';");
865+
for (int i = 3; i < 1000; i++) {
866+
ctx.fetch("INSERT INTO id_and_name (id, name, power) VALUES (%d, 'gohan%d', 222.1);".formatted(i, i));
867+
}
868+
return null;
869+
});
870+
871+
final ConfiguredAirbyteCatalog configuredCatalog =
872+
CONFIGURED_INCR_CATALOG
873+
.withStreams(CONFIGURED_INCR_CATALOG.getStreams().stream().filter(s -> s.getStream().getName().equals(STREAM_NAME)).collect(
874+
Collectors.toList()));
875+
final PostgresSource source = new PostgresSource();
876+
source.setStateEmissionFrequencyForDebug(1);
877+
final List<AirbyteMessage> actualMessages = MoreIterators.toList(source.read(getConfig(PSQL_DB, dbName), configuredCatalog, null));
878+
setEmittedAtToNull(actualMessages);
879+
880+
// final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessage(actualMessages);
881+
882+
setEmittedAtToNull(actualMessages);
883+
884+
final Set<AirbyteMessage> expectedOutput = Sets.newHashSet(
885+
createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("1.0"), "name", "goku", "power", null)),
886+
createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("2.0"), "name", "vegeta", "power", 9000.1)));
887+
for (int i = 3; i < 1000; i++) {
888+
expectedOutput.add(
889+
createRecord(STREAM_NAME, SCHEMA_NAME, map("id", new BigDecimal("%d.0".formatted(i)), "name", "gohan%d".formatted(i), "power", 222.1)));
890+
}
891+
assertThat(actualMessages.contains(expectedOutput));
892+
// Assert that the Postgres source is emitting records & state messages in the correct order.
893+
assertCorrectRecordOrderForIncrementalSync(actualMessages, "id", JsonSchemaPrimitive.NUMBER, configuredCatalog,
894+
new AirbyteStreamNameNamespacePair("id_and_name", "public"));
895+
}
896+
}
897+
855898
}

docs/integrations/sources/postgres.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,8 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
291291

292292
| Version | Date | Pull Request | Subject |
293293
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
294-
| 3.1.11 | 2023-10-11 | [31322](https://github.com/airbytehq/airbyte/pull/31322) | Correct pevious release |
294+
| 3.1.12 | 2023-10-12 | [31328](https://github.com/airbytehq/airbyte/pull/31328) | Improvements to initial load of tables in older versions of postgres. |
295+
| 3.1.11 | 2023-10-11 | [31322](https://github.com/airbytehq/airbyte/pull/31322) | Correct previous release |
295296
| 3.1.10 | 2023-09-29 | [30806](https://github.com/airbytehq/airbyte/pull/30806) | Cap log line length to 32KB to prevent loss of records. |
296297
| 3.1.9 | 2023-09-25 | [30534](https://github.com/airbytehq/airbyte/pull/30534) | Fix JSONB[] column type handling bug. |
297298
| 3.1.8 | 2023-09-20 | [30125](https://github.com/airbytehq/airbyte/pull/30125) | Improve initial load performance for older versions of PostgreSQL. |

0 commit comments

Comments
 (0)