Skip to content

Commit 708802d

Browse files
yurii-bidiukedgaooctavia-squidington-iii
authored
🎉Source Postgres: 13608, 12026, 14590 - Align regular and CDC integration tests and data mappers; improve BCE date handling (#14534)
* 13608 & 12026 - align regular and CDC integration tests and data mappers * format code * update int handling * fix build * fix PR remarks * revert changes for money type that are broken by #7338 * bump version * 🐛 Source Postgres: Improve BCE date handling (#15187) * 13608 & 12026 - align regular and CDC integration tests and data mappers * format code * update int handling * borked merge - re-delete deleted methods * enable catalog tests for postgres * fix build * fix PR remarks * revert changes for money type that are broken by #7338 * update BCE handling in JDBC * reuse existing method * handle bce dates * inline methods * fix JDBC BCE year inconsistency * use correct data type in test * format * Update airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractPostgresSourceDatatypeTest.java Co-authored-by: Edward Gao <[email protected]> * pmd fix * use class.getname() * fix pmd * format * bump version * handle incremental mode * clean up diff * more comments * unused imports * format * versions+changelog Co-authored-by: Yurii Bidiuk <[email protected]> Co-authored-by: Yurii Bidiuk <[email protected]> * auto-bump connector version [ci skip] Co-authored-by: Edward Gao <[email protected]> Co-authored-by: Octavia Squidington III <[email protected]>
1 parent 316502c commit 708802d

File tree

15 files changed

+848
-1005
lines changed

15 files changed

+848
-1005
lines changed

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@
762762
- name: Postgres
763763
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
764764
dockerRepository: airbyte/source-postgres
765-
dockerImageTag: 0.4.39
765+
dockerImageTag: 0.4.40
766766
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
767767
icon: postgresql.svg
768768
sourceType: database

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7140,7 +7140,7 @@
71407140
supportsNormalization: false
71417141
supportsDBT: false
71427142
supported_destination_sync_modes: []
7143-
- dockerImage: "airbyte/source-postgres:0.4.39"
7143+
- dockerImage: "airbyte/source-postgres:0.4.40"
71447144
spec:
71457145
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
71467146
connectionSpecification:

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java

+52-6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
*/
3838
public abstract class AbstractJdbcCompatibleSourceOperations<Datatype> implements JdbcCompatibleSourceOperations<Datatype> {
3939

40+
/**
41+
* A Date representing the earliest date in CE. Any date before this is in BCE.
42+
*/
43+
private static final Date ONE_CE = Date.valueOf("0001-01-01");
44+
4045
@Override
4146
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
4247
// the first call communicates with the database. after that the result is cached.
@@ -253,27 +258,68 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
253258
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
254259
}
255260

256-
protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
261+
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
257262
return resultSet.getObject(index, clazz);
258263
}
259264

260265
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
261-
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
266+
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
262267
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
263268
}
264269

265270
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
266-
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
271+
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
267272
LocalDate localDate = timestamptz.toLocalDate();
268273
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
269274
}
270275

271-
protected String resolveEra(LocalDate date, String value) {
272-
return isBCE(date) ? value.substring(1) + " BC" : value;
276+
/**
277+
* Modifies a string representation of a date/timestamp and normalizes its era indicator.
278+
* Specifically, if this is a BCE value:
279+
* <ul>
280+
* <li>The leading negative sign will be removed if present</li>
281+
* <li>The "BC" suffix will be appended, if not already present</li>
282+
* </ul>
283+
*
284+
* You most likely would prefer to call one of the overloaded methods, which accept temporal types.
285+
*/
286+
public static String resolveEra(boolean isBce, String value) {
287+
String mangledValue = value;
288+
if (isBce) {
289+
if (mangledValue.startsWith("-")) {
290+
mangledValue = mangledValue.substring(1);
291+
}
292+
if (!mangledValue.endsWith(" BC")) {
293+
mangledValue += " BC";
294+
}
295+
}
296+
return mangledValue;
273297
}
274298

275-
public static boolean isBCE(LocalDate date) {
299+
public static boolean isBce(LocalDate date) {
276300
return date.getEra().equals(IsoEra.BCE);
277301
}
278302

303+
public static String resolveEra(LocalDate date, String value) {
304+
return resolveEra(isBce(date), value);
305+
}
306+
307+
/**
308+
* java.sql.Date objects don't properly represent their era (for example, using toLocalDate() always
309+
* returns an object in CE). So to determine the era, we just check whether the date is before 1 AD.
310+
*
311+
* This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but
312+
* my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out.
313+
*/
314+
public static String resolveEra(Date date, String value) {
315+
return resolveEra(date.before(ONE_CE), value);
316+
}
317+
318+
/**
319+
* See {@link #resolveEra(Date, String)} for explanation.
320+
*/
321+
public static String resolveEra(Timestamp timestamp, String value) {
322+
return resolveEra(timestamp.before(ONE_CE), value);
323+
}
324+
279325
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
8+
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
9+
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
10+
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
11+
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBce;
12+
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra;
13+
14+
import java.sql.Date;
15+
import java.sql.Timestamp;
16+
import java.time.Instant;
17+
import java.time.LocalDate;
18+
import java.time.LocalDateTime;
19+
import java.time.LocalTime;
20+
import java.time.OffsetDateTime;
21+
import java.time.OffsetTime;
22+
import java.time.ZoneOffset;
23+
import java.time.ZonedDateTime;
24+
import java.time.format.DateTimeFormatter;
25+
26+
public class DateTimeConverter {
27+
28+
public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
29+
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");
30+
31+
public static String convertToTimeWithTimezone(Object time) {
32+
OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER);
33+
return timetz.format(TIMETZ_FORMATTER);
34+
}
35+
36+
public static String convertToTimestampWithTimezone(Object timestamp) {
37+
if (timestamp instanceof Timestamp t) {
38+
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
39+
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the
40+
// value for ancient dates, because leap years weren't applied consistently in ye olden days.
41+
// Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't
42+
// rely on their getEra() methods.
43+
// So we have special handling for this case, which sidesteps the toInstant conversion.
44+
ZonedDateTime timestamptz = t.toLocalDateTime().atZone(ZoneOffset.UTC);
45+
String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
46+
return resolveEra(t, value);
47+
} else if (timestamp instanceof OffsetDateTime t) {
48+
// In incremental mode, debezium emits java.time.OffsetDateTime objects.
49+
// java.time classes have a year 0, but the standard AD/BC system does not. For example,
50+
// "0001-01-01 BC" is represented as LocalDate("0000-01-01").
51+
// We just subtract one year to hack around this difference.
52+
LocalDate localDate = t.toLocalDate();
53+
if (isBce(localDate)) {
54+
t = t.minusYears(1);
55+
}
56+
return resolveEra(localDate, t.toString());
57+
} else {
58+
// This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird
59+
// situation that I'm not aware of.
60+
Instant instant = Instant.parse(timestamp.toString());
61+
OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
62+
ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime);
63+
LocalDate localDate = timestamptz.toLocalDate();
64+
String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
65+
return resolveEra(localDate, value);
66+
}
67+
}
68+
69+
/**
70+
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
71+
* here.
72+
*/
73+
public static String convertToTimestamp(Object timestamp) {
74+
if (timestamp instanceof Timestamp t) {
75+
// Snapshot mode
76+
LocalDateTime localDateTime = t.toLocalDateTime();
77+
String value = localDateTime.format(TIMESTAMP_FORMATTER);
78+
return resolveEra(t, value);
79+
} else if (timestamp instanceof Instant i) {
80+
// Incremental mode
81+
LocalDate localDate = i.atZone(ZoneOffset.UTC).toLocalDate();
82+
if (isBce(localDate)) {
83+
// i.minus(1, ChronoUnit.YEARS) would be nice, but it throws an exception because you can't subtract
84+
// YEARS from an Instant
85+
i = i.atZone(ZoneOffset.UTC).minusYears(1).toInstant();
86+
}
87+
return resolveEra(localDate, i.toString());
88+
} else {
89+
LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString());
90+
final LocalDate date = localDateTime.toLocalDate();
91+
String value = localDateTime.format(TIMESTAMP_FORMATTER);
92+
return resolveEra(date, value);
93+
}
94+
}
95+
96+
/**
97+
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
98+
* here.
99+
*/
100+
public static Object convertToDate(Object date) {
101+
if (date instanceof Date d) {
102+
// Snapshot mode
103+
LocalDate localDate = ((Date) date).toLocalDate();
104+
return resolveEra(d, localDate.toString());
105+
} else if (date instanceof LocalDate d) {
106+
// Incremental mode
107+
if (isBce(d)) {
108+
d = d.minusYears(1);
109+
}
110+
return resolveEra(d, d.toString());
111+
} else {
112+
LocalDate localDate = LocalDate.parse(date.toString());
113+
return resolveEra(localDate, localDate.toString());
114+
}
115+
}
116+
117+
public static String convertToTime(Object time) {
118+
LocalTime localTime = LocalTime.parse(time.toString());
119+
return localTime.format(TIME_FORMATTER);
120+
}
121+
122+
}

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

+58-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import java.math.BigDecimal;
1010
import java.nio.charset.StandardCharsets;
1111
import java.util.Arrays;
12+
import java.util.Locale;
1213
import java.util.Properties;
14+
import org.apache.commons.codec.binary.Hex;
1315
import org.apache.kafka.connect.data.SchemaBuilder;
1416
import org.postgresql.util.PGInterval;
1517
import org.slf4j.Logger;
@@ -19,12 +21,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
1921

2022
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
2123

22-
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
24+
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
2325
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
2426
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
2527
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
2628
private final String[] TEXT_TYPES =
27-
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
29+
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
30+
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
31+
private final String BYTEA_TYPE = "BYTEA";
2832

2933
@Override
3034
public void configure(final Properties props) {}
@@ -39,9 +43,50 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
3943
registerText(field, registration);
4044
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
4145
registerMoney(field, registration);
46+
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
47+
registerBytea(field, registration);
48+
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
49+
registerNumber(field, registration);
4250
}
4351
}
4452

53+
private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
54+
registration.register(SchemaBuilder.string().optional(), x -> {
55+
if (x == null) {
56+
return DebeziumConverterUtils.convertDefaultValue(field);
57+
}
58+
// Bad solution
59+
// We applied a solution like this for several reasons:
60+
// 1. Regarding #13608, CDC and nor-CDC data output format should be the same.
61+
// 2. In the non-CDC mode 'decimal' and 'numeric' values are put to JSON node as BigDecimal value.
62+
// According to Jackson Object mapper configuration, all trailing zeros are omitted and
63+
// numbers with decimal places are deserialized with exponent. (e.g. 1234567890.1234567 would
64+
// be deserialized as 1.2345678901234567E9).
65+
// 3. In the CDC mode 'decimal' and 'numeric' values are deserialized as a regular number (e.g.
66+
// 1234567890.1234567 would be deserialized as 1234567890.1234567). Numbers without
67+
// decimal places (e.g 1, 24, 354) are represented with trailing zero (e.g 1.0, 24.0, 354.0).
68+
// One of solution to align deserialization for these 2 modes is setting
69+
// DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS as true for ObjectMapper. But this breaks
70+
// deserialization for other data-types.
71+
// A worked solution was to keep deserialization for non-CDC mode as it is and change it for CDC
72+
// one.
73+
// The code below strips trailing zeros for integer numbers and represents number with exponent
74+
// if this number has decimals point.
75+
final double doubleValue = Double.parseDouble(x.toString());
76+
var valueWithTruncatedZero = BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString();
77+
return valueWithTruncatedZero.contains(".") ? String.valueOf(doubleValue) : valueWithTruncatedZero;
78+
});
79+
}
80+
81+
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
82+
registration.register(SchemaBuilder.string().optional(), x -> {
83+
if (x == null) {
84+
return DebeziumConverterUtils.convertDefaultValue(field);
85+
}
86+
return "\\x" + Hex.encodeHexString((byte[]) x);
87+
});
88+
}
89+
4590
private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
4691
registration.register(SchemaBuilder.string().optional(), x -> {
4792
if (x == null) {
@@ -57,14 +102,21 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
57102
}
58103

59104
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
105+
final var fieldType = field.typeName();
106+
60107
registration.register(SchemaBuilder.string().optional(), x -> {
61108
if (x == null) {
62109
return DebeziumConverterUtils.convertDefaultValue(field);
63-
} else if (x instanceof PGInterval) {
64-
return convertInterval((PGInterval) x);
65-
} else {
66-
return DebeziumConverterUtils.convertDate(x);
67110
}
111+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
112+
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x);
113+
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone(x);
114+
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp(x);
115+
case "DATE" -> DateTimeConverter.convertToDate(x);
116+
case "TIME" -> DateTimeConverter.convertToTime(x);
117+
case "INTERVAL" -> convertInterval((PGInterval) x);
118+
default -> DebeziumConverterUtils.convertDate(x);
119+
};
68120
});
69121
}
70122

airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res
2222

2323
@Override
2424
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
25-
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
25+
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
2626
final LocalDate date = timestamp.toLocalDate();
2727

2828
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(

airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseTestSourceOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res
2222

2323
@Override
2424
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
25-
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
25+
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
2626
final LocalDate date = timestamp.toLocalDate();
2727

2828
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(

airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.39
19+
LABEL io.airbyte.version=0.4.40
2020
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt

airbyte-integrations/connectors/source-postgres/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION source-postgres
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.4.39
19+
LABEL io.airbyte.version=0.4.40
2020
LABEL io.airbyte.name=airbyte/source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresCdcProperties.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.integrations.source.postgres;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import io.airbyte.integrations.debezium.internals.PostgresConverter;
89
import java.util.Properties;
910

1011
public class PostgresCdcProperties {
@@ -27,7 +28,7 @@ private static Properties commonProperties() {
2728
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
2829

2930
props.setProperty("converters", "datetime");
30-
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.PostgresConverter");
31+
props.setProperty("datetime.type", PostgresConverter.class.getName());
3132
return props;
3233
}
3334

0 commit comments

Comments
 (0)