Skip to content

Commit a3ca3ab

Browse files
VitaliiMaltsevgrishickoctavia-squidington-iii
authored
🐛 Postgres Source: fixed unsupported date-time datatypes during incremental sync (#13655)
* Postgres Source: fixed unsupposted date-time datatypes during incremental sync * updated CHANGELOG * add tests for incremental cursor check * removed star import * Postgres Source: fixed unsupposted date-time datatypes during incremental sync * updated CHANGELOG * add tests for incremental cursor check * removed star import * add timestamp datatype test * Bump version in Dockerfile * auto-bump connector version Co-authored-by: grishick <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent e7f8128 commit a3ca3ab

File tree

8 files changed

+385
-73
lines changed

8 files changed

+385
-73
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -715,7 +715,7 @@
715715
- name: Postgres
716716
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
717717
dockerRepository: airbyte/source-postgres
718-
dockerImageTag: 0.4.21
718+
dockerImageTag: 0.4.22
719719
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
720720
icon: postgresql.svg
721721
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6719,7 +6719,7 @@
67196719
supportsNormalization: false
67206720
supportsDBT: false
67216721
supported_destination_sync_modes: []
6722-
- dockerImage: "airbyte/source-postgres:0.4.21"
6722+
- dockerImage: "airbyte/source-postgres:0.4.22"
67236723
spec:
67246724
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
67256725
connectionSpecification:

airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java

Lines changed: 76 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,13 @@ void testReadOneColumn() throws Exception {
388388

389389
setEmittedAtToNull(actualMessages);
390390

391+
final List<AirbyteMessage> expectedMessages = getAirbyteMessagesReadOneColumn();
392+
assertTrue(expectedMessages.size() == actualMessages.size());
393+
assertTrue(expectedMessages.containsAll(actualMessages));
394+
assertTrue(actualMessages.containsAll(expectedMessages));
395+
}
396+
397+
protected List<AirbyteMessage> getAirbyteMessagesReadOneColumn() {
391398
final List<AirbyteMessage> expectedMessages = getTestMessages().stream()
392399
.map(Jsons::clone)
393400
.peek(m -> {
@@ -397,9 +404,7 @@ void testReadOneColumn() throws Exception {
397404
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
398405
})
399406
.collect(Collectors.toList());
400-
assertTrue(expectedMessages.size() == actualMessages.size());
401-
assertTrue(expectedMessages.containsAll(actualMessages));
402-
assertTrue(actualMessages.containsAll(expectedMessages));
407+
return expectedMessages;
403408
}
404409

405410
@Test
@@ -432,17 +437,7 @@ void testReadMultipleTables() throws Exception {
432437
Field.of(COL_ID, JsonSchemaType.NUMBER),
433438
Field.of(COL_NAME, JsonSchemaType.STRING)));
434439

435-
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
436-
.stream()
437-
.map(Jsons::clone)
438-
.peek(m -> {
439-
m.getRecord().setStream(streamName2);
440-
m.getRecord().setNamespace(getDefaultNamespace());
441-
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
442-
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
443-
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
444-
})
445-
.collect(Collectors.toList());
440+
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondSync(streamName2);
446441
expectedMessages.addAll(secondStreamExpectedMessages);
447442
}
448443

@@ -456,6 +451,21 @@ void testReadMultipleTables() throws Exception {
456451
assertTrue(actualMessages.containsAll(expectedMessages));
457452
}
458453

454+
protected List<AirbyteMessage> getAirbyteMessagesSecondSync(String streamName2) {
455+
return getTestMessages()
456+
.stream()
457+
.map(Jsons::clone)
458+
.peek(m -> {
459+
m.getRecord().setStream(streamName2);
460+
m.getRecord().setNamespace(getDefaultNamespace());
461+
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
462+
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
463+
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
464+
})
465+
.collect(Collectors.toList());
466+
467+
}
468+
459469
@Test
460470
void testTablesWithQuoting() throws Exception {
461471
final ConfiguredAirbyteStream streamForTableWithSpaces = createTableWithSpaces();
@@ -469,7 +479,17 @@ void testTablesWithQuoting() throws Exception {
469479

470480
setEmittedAtToNull(actualMessages);
471481

472-
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
482+
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesForTablesWithQuoting(streamForTableWithSpaces);
483+
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
484+
expectedMessages.addAll(secondStreamExpectedMessages);
485+
486+
assertTrue(expectedMessages.size() == actualMessages.size());
487+
assertTrue(expectedMessages.containsAll(actualMessages));
488+
assertTrue(actualMessages.containsAll(expectedMessages));
489+
}
490+
491+
protected List<AirbyteMessage> getAirbyteMessagesForTablesWithQuoting(ConfiguredAirbyteStream streamForTableWithSpaces) {
492+
return getTestMessages()
473493
.stream()
474494
.map(Jsons::clone)
475495
.peek(m -> {
@@ -481,12 +501,6 @@ void testTablesWithQuoting() throws Exception {
481501
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
482502
})
483503
.collect(Collectors.toList());
484-
final List<AirbyteMessage> expectedMessages = new ArrayList<>(getTestMessages());
485-
expectedMessages.addAll(secondStreamExpectedMessages);
486-
487-
assertTrue(expectedMessages.size() == actualMessages.size());
488-
assertTrue(expectedMessages.containsAll(actualMessages));
489-
assertTrue(actualMessages.containsAll(expectedMessages));
490504
}
491505

492506
@SuppressWarnings("ResultOfMethodCallIgnored")
@@ -532,6 +546,17 @@ void testIncrementalStringCheckCursor() throws Exception {
532546
void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
533547
final ConfiguredAirbyteStream streamWithSpaces = createTableWithSpaces();
534548

549+
final ArrayList<AirbyteMessage> expectedRecordMessages = getAirbyteMessagesCheckCursorSpaceInColumnName(streamWithSpaces);
550+
incrementalCursorCheck(
551+
COL_LAST_NAME_WITH_SPACE,
552+
COL_LAST_NAME_WITH_SPACE,
553+
"patent",
554+
"vash",
555+
expectedRecordMessages,
556+
streamWithSpaces);
557+
}
558+
559+
protected ArrayList<AirbyteMessage> getAirbyteMessagesCheckCursorSpaceInColumnName(ConfiguredAirbyteStream streamWithSpaces) {
535560
final AirbyteMessage firstMessage = getTestMessages().get(0);
536561
firstMessage.getRecord().setStream(streamWithSpaces.getStream().getName());
537562
((ObjectNode) firstMessage.getRecord().getData()).remove(COL_UPDATED_AT);
@@ -546,21 +571,15 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {
546571

547572
Lists.newArrayList(getTestMessages().get(0), getTestMessages().get(2));
548573

549-
incrementalCursorCheck(
550-
COL_LAST_NAME_WITH_SPACE,
551-
COL_LAST_NAME_WITH_SPACE,
552-
"patent",
553-
"vash",
554-
Lists.newArrayList(firstMessage, secondMessage),
555-
streamWithSpaces);
574+
return Lists.newArrayList(firstMessage, secondMessage);
556575
}
557576

558577
@Test
559-
void testIncrementalTimestampCheckCursor() throws Exception {
560-
incrementalTimestampCheck();
578+
void testIncrementalDateCheckCursor() throws Exception {
579+
incrementalDateCheck();
561580
}
562581

563-
protected void incrementalTimestampCheck() throws Exception {
582+
protected void incrementalDateCheck() throws Exception {
564583
incrementalCursorCheck(
565584
COL_UPDATED_AT,
566585
"2005-10-18T00:00:00Z",
@@ -600,14 +619,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
600619
.filter(r -> r.getType() == Type.STATE).findFirst();
601620
assertTrue(stateAfterFirstSyncOptional.isPresent());
602621

603-
database.execute(connection -> {
604-
connection.createStatement().execute(
605-
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
606-
getFullyQualifiedTableName(TABLE_NAME)));
607-
connection.createStatement().execute(
608-
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
609-
getFullyQualifiedTableName(TABLE_NAME)));
610-
});
622+
executeStatementReadIncrementallyTwice();
611623

612624
final List<AirbyteMessage> actualMessagesSecondSync = MoreIterators
613625
.toList(source.read(config, configuredCatalog,
@@ -624,6 +636,17 @@ void testReadOneTableIncrementallyTwice() throws Exception {
624636
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
625637
}
626638

639+
protected void executeStatementReadIncrementallyTwice() throws SQLException {
640+
database.execute(connection -> {
641+
connection.createStatement().execute(
642+
String.format("INSERT INTO %s(id, name, updated_at) VALUES (4,'riker', '2006-10-19')",
643+
getFullyQualifiedTableName(TABLE_NAME)));
644+
connection.createStatement().execute(
645+
String.format("INSERT INTO %s(id, name, updated_at) VALUES (5, 'data', '2006-10-19')",
646+
getFullyQualifiedTableName(TABLE_NAME)));
647+
});
648+
}
649+
627650
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
628651
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
629652
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
@@ -696,16 +719,7 @@ void testReadMultipleTablesIncrementally() throws Exception {
696719

697720
// we know the second streams messages are the same as the first minus the updated at column. so we
698721
// cheat and generate the expected messages off of the first expected messages.
699-
final List<AirbyteMessage> secondStreamExpectedMessages = getTestMessages()
700-
.stream()
701-
.map(Jsons::clone)
702-
.peek(m -> {
703-
m.getRecord().setStream(streamName2);
704-
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
705-
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
706-
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
707-
})
708-
.collect(Collectors.toList());
722+
final List<AirbyteMessage> secondStreamExpectedMessages = getAirbyteMessagesSecondStreamWithNamespace(streamName2);
709723
final List<AirbyteMessage> expectedMessagesFirstSync = new ArrayList<>(getTestMessages());
710724
expectedMessagesFirstSync.add(new AirbyteMessage()
711725
.withType(Type.STATE)
@@ -748,6 +762,19 @@ void testReadMultipleTablesIncrementally() throws Exception {
748762
assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync));
749763
}
750764

765+
protected List<AirbyteMessage> getAirbyteMessagesSecondStreamWithNamespace(String streamName2) {
766+
return getTestMessages()
767+
.stream()
768+
.map(Jsons::clone)
769+
.peek(m -> {
770+
m.getRecord().setStream(streamName2);
771+
((ObjectNode) m.getRecord().getData()).remove(COL_UPDATED_AT);
772+
((ObjectNode) m.getRecord().getData()).replace(COL_ID,
773+
convertIdBasedOnDatabase(m.getRecord().getData().get(COL_ID).asInt()));
774+
})
775+
.collect(Collectors.toList());
776+
}
777+
751778
// when initial and final cursor fields are the same.
752779
protected void incrementalCursorCheck(
753780
final String cursorField,

airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ protected AirbyteCatalog getCatalog(final String defaultNamespace) {
164164
}
165165

166166
@Override
167-
protected void incrementalTimestampCheck() throws Exception {
167+
protected void incrementalDateCheck() throws Exception {
168168
super.incrementalCursorCheck(COL_UPDATED_AT,
169169
"2005-10-18",
170170
"2006-10-19",

airbyte-integrations/connectors/source-postgres/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.21
19+
LABEL io.airbyte.version=0.4.22
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import io.airbyte.db.jdbc.JdbcSourceOperations;
2222
import io.airbyte.protocol.models.JsonSchemaType;
2323
import java.math.BigDecimal;
24-
import java.sql.Date;
2524
import java.sql.JDBCType;
2625
import java.sql.PreparedStatement;
2726
import java.sql.ResultSet;
@@ -30,6 +29,8 @@
3029
import java.time.LocalDate;
3130
import java.time.LocalDateTime;
3231
import java.time.LocalTime;
32+
import java.time.OffsetDateTime;
33+
import java.time.OffsetTime;
3334
import java.util.Collections;
3435
import org.postgresql.jdbc.PgResultSetMetaData;
3536
import org.slf4j.Logger;
@@ -79,15 +80,57 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
7980
}
8081

8182
@Override
82-
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
83-
try {
84-
Date date = Date.valueOf(value);
85-
preparedStatement.setDate(parameterIndex, date);
86-
} catch (final Exception e) {
87-
throw new RuntimeException(e);
83+
public void setStatementField(final PreparedStatement preparedStatement,
84+
final int parameterIndex,
85+
final JDBCType cursorFieldType,
86+
final String value)
87+
throws SQLException {
88+
switch (cursorFieldType) {
89+
90+
case TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
91+
case TIMESTAMP_WITH_TIMEZONE -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);
92+
case TIME -> setTime(preparedStatement, parameterIndex, value);
93+
case TIME_WITH_TIMEZONE -> setTimeWithTimezone(preparedStatement, parameterIndex, value);
94+
case DATE -> setDate(preparedStatement, parameterIndex, value);
95+
case BIT -> setBit(preparedStatement, parameterIndex, value);
96+
case BOOLEAN -> setBoolean(preparedStatement, parameterIndex, value);
97+
case TINYINT, SMALLINT -> setShortInt(preparedStatement, parameterIndex, value);
98+
case INTEGER -> setInteger(preparedStatement, parameterIndex, value);
99+
case BIGINT -> setBigInteger(preparedStatement, parameterIndex, value);
100+
case FLOAT, DOUBLE -> setDouble(preparedStatement, parameterIndex, value);
101+
case REAL -> setReal(preparedStatement, parameterIndex, value);
102+
case NUMERIC, DECIMAL -> setDecimal(preparedStatement, parameterIndex, value);
103+
case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR -> setString(preparedStatement, parameterIndex, value);
104+
case BINARY, BLOB -> setBinary(preparedStatement, parameterIndex, value);
105+
// since cursor are expected to be comparable, handle cursor typing strictly and error on
106+
// unrecognized types
107+
default -> throw new IllegalArgumentException(String.format("%s is not supported.", cursorFieldType));
88108
}
89109
}
90110

111+
private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
112+
preparedStatement.setObject(parameterIndex, OffsetTime.parse(value));
113+
}
114+
115+
private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
116+
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
117+
}
118+
119+
@Override
120+
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
121+
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
122+
}
123+
124+
@Override
125+
protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
126+
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
127+
}
128+
129+
@Override
130+
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
131+
preparedStatement.setObject(parameterIndex, LocalDate.parse(value));
132+
}
133+
91134
@Override
92135
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
93136
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();

0 commit comments

Comments
 (0)