Skip to content

Commit 0a6b99a

Browse files
committed
13608 & 12026 - align regular and CDC integration tests and data mappers
1 parent 743e6c2 commit 0a6b99a

File tree

8 files changed

+710
-1004
lines changed

8 files changed

+710
-1004
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
253253
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
254254
}
255255

256-
protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
256+
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
257257
return resultSet.getObject(index, clazz);
258258
}
259259

260260
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
261-
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
261+
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
262262
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
263263
}
264264

265265
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
266-
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
266+
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
267267
LocalDate localDate = timestamptz.toLocalDate();
268268
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
269269
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.debezium.internals;
6+
7+
import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
8+
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
9+
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
10+
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
11+
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE;
12+
13+
import java.sql.Date;
14+
import java.sql.Timestamp;
15+
import java.time.LocalDate;
16+
import java.time.LocalDateTime;
17+
import java.time.LocalTime;
18+
import java.time.OffsetDateTime;
19+
import java.time.OffsetTime;
20+
import java.time.ZoneOffset;
21+
import java.time.format.DateTimeFormatter;
22+
23+
public class DateTimeConverter {
24+
25+
public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
26+
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");
27+
28+
public static String convertToTimeWithTimezone(String time) {
29+
OffsetTime timetz = OffsetTime.parse(time, TIME_WITH_TIMEZONE_FORMATTER);
30+
return timetz.format(TIMETZ_FORMATTER);
31+
}
32+
33+
public static String convertToTimestampWithTimezone(Timestamp timestamp) {
34+
OffsetDateTime timestamptz = OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneOffset.UTC);
35+
LocalDate localDate = timestamptz.toLocalDate();
36+
return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER));
37+
}
38+
39+
public static String convertToTimestamp(Timestamp timestamp) {
40+
final LocalDateTime localDateTime = LocalDateTime.ofInstant(timestamp.toInstant(),
41+
ZoneOffset.UTC);
42+
final LocalDate date = localDateTime.toLocalDate();
43+
return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER));
44+
}
45+
46+
public static Object convertToDate(Date date) {
47+
LocalDate localDate = date.toLocalDate();
48+
return resolveEra(localDate, localDate.toString());
49+
}
50+
51+
public static String convertToTime(String time) {
52+
LocalTime localTime = LocalTime.parse(time);
53+
return localTime.format(TIME_FORMATTER);
54+
}
55+
56+
public static String resolveEra(LocalDate date, String value) {
57+
return isBCE(date) ? value.substring(1) + " BC" : value;
58+
}
59+
60+
}

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

+66-12
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,26 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7+
import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE;
8+
import static io.airbyte.protocol.models.JsonSchemaType.DATE;
9+
import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME;
10+
import static io.airbyte.protocol.models.JsonSchemaType.FORMAT;
11+
import static io.airbyte.protocol.models.JsonSchemaType.TIME;
12+
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE;
13+
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE;
14+
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE;
15+
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE;
16+
717
import io.debezium.spi.converter.CustomConverter;
818
import io.debezium.spi.converter.RelationalColumn;
919
import java.math.BigDecimal;
1020
import java.nio.charset.StandardCharsets;
21+
import java.sql.Date;
22+
import java.sql.Timestamp;
1123
import java.util.Arrays;
24+
import java.util.Locale;
1225
import java.util.Properties;
26+
import org.apache.commons.codec.binary.Hex;
1327
import org.apache.kafka.connect.data.SchemaBuilder;
1428
import org.postgresql.util.PGInterval;
1529
import org.slf4j.Logger;
@@ -19,12 +33,14 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
1933

2034
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
2135

22-
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
36+
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
2337
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
2438
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
2539
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
2640
private final String[] TEXT_TYPES =
27-
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
41+
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
42+
private final String[] NUMERIC_TYPES = {"NUMERIC", "DECIMAL"};
43+
private final String BYTEA_TYPE = "BYTEA";
2844

2945
@Override
3046
public void configure(final Properties props) {}
@@ -39,9 +55,31 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
3955
registerText(field, registration);
4056
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
4157
registerMoney(field, registration);
58+
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
59+
registerBytea(field, registration);
60+
} else if (Arrays.stream(NUMERIC_TYPES).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
61+
registerNumber(field, registration);
4262
}
4363
}
4464

65+
private void registerNumber(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
66+
registration.register(SchemaBuilder.float64().optional(), x -> {
67+
if (x == null) {
68+
return DebeziumConverterUtils.convertDefaultValue(field);
69+
}
70+
return new BigDecimal(x.toString()).doubleValue();
71+
});
72+
}
73+
74+
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
75+
registration.register(SchemaBuilder.string().optional(), x -> {
76+
if (x == null) {
77+
return DebeziumConverterUtils.convertDefaultValue(field);
78+
}
79+
return "\\x" + Hex.encodeHexString((byte[]) x);
80+
});
81+
}
82+
4583
private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
4684
registration.register(SchemaBuilder.string().optional(), x -> {
4785
if (x == null) {
@@ -57,14 +95,23 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
5795
}
5896

5997
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
60-
registration.register(SchemaBuilder.string().optional(), x -> {
98+
final var fieldType = field.typeName();
99+
100+
registration.register(getJsonSchema(fieldType).optional(), x -> {
61101
if (x == null) {
62102
return DebeziumConverterUtils.convertDefaultValue(field);
63-
} else if (x instanceof PGInterval) {
64-
return convertInterval((PGInterval) x);
65-
} else {
66-
return DebeziumConverterUtils.convertDate(x);
67103
}
104+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
105+
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x.toString());
106+
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone((Timestamp) x);
107+
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp((Timestamp) x);
108+
// Debezium doesn't handle era indicators
109+
// https://github.com/airbytehq/airbyte/issues/14590
110+
case "DATE" -> DateTimeConverter.convertToDate((Date) x);
111+
case "TIME" -> DateTimeConverter.convertToTime(x.toString());
112+
case "INTERVAL" -> convertInterval((PGInterval) x);
113+
default -> DebeziumConverterUtils.convertDate(x);
114+
};
68115
});
69116
}
70117

@@ -84,11 +131,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra
84131
return DebeziumConverterUtils.convertDefaultValue(field);
85132
} else if (x instanceof Double) {
86133
final BigDecimal result = BigDecimal.valueOf((Double) x);
87-
if (result.compareTo(new BigDecimal("999999999999999")) == 1
88-
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
89-
return null;
90-
}
91-
return result.toString();
134+
return Double.toString(result.doubleValue());
92135
} else {
93136
return x.toString();
94137
}
@@ -131,4 +174,15 @@ private boolean isNegativeTime(final PGInterval pgInterval) {
131174
|| pgInterval.getWholeSeconds() < 0;
132175
}
133176

177+
private SchemaBuilder getJsonSchema(final String fieldType) {
178+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
179+
case "TIMETZ" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITH_TIMEZONE);
180+
case "TIMESTAMPTZ" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITH_TIMEZONE);
181+
case "TIMESTAMP" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITHOUT_TIMEZONE);
182+
case "DATE" -> SchemaBuilder.string().parameter(FORMAT, DATE);
183+
case "TIME" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITHOUT_TIMEZONE);
184+
default -> SchemaBuilder.string();
185+
};
186+
}
187+
134188
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

+41-4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.airbyte.db.jdbc.JdbcSourceOperations;
2424
import io.airbyte.protocol.models.JsonSchemaType;
2525
import java.math.BigDecimal;
26+
import java.math.RoundingMode;
2627
import java.sql.JDBCType;
2728
import java.sql.PreparedStatement;
2829
import java.sql.ResultSet;
@@ -35,7 +36,15 @@
3536
import java.time.OffsetTime;
3637
import java.time.format.DateTimeParseException;
3738
import java.util.Collections;
39+
import org.postgresql.geometric.PGbox;
40+
import org.postgresql.geometric.PGcircle;
41+
import org.postgresql.geometric.PGline;
42+
import org.postgresql.geometric.PGlseg;
43+
import org.postgresql.geometric.PGpath;
44+
import org.postgresql.geometric.PGpoint;
45+
import org.postgresql.geometric.PGpolygon;
3846
import org.postgresql.jdbc.PgResultSetMetaData;
47+
import org.postgresql.util.PGobject;
3948
import org.slf4j.Logger;
4049
import org.slf4j.LoggerFactory;
4150

@@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
173182
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
174183
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
175184
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
185+
case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class);
186+
case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class);
187+
case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex);
188+
case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class);
189+
case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class);
190+
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
191+
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
192+
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
176193
default -> {
177194
switch (columnType) {
178195
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
@@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
198215

199216
@Override
200217
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
201-
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
218+
final LocalDate date = getObject(resultSet, index, LocalDate.class);
202219
node.put(columnName, resolveEra(date, date.toString()));
203220
}
204221

205222
@Override
206223
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
207-
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
224+
final LocalTime time = getObject(resultSet, index, LocalTime.class);
208225
node.put(columnName, time.format(TIME_FORMATTER));
209226
}
210227

211228
@Override
212229
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
213-
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
230+
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
214231
final LocalDate date = timestamp.toLocalDate();
215232
node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
216233
}
@@ -261,10 +278,30 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
261278
node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t"));
262279
}
263280

281+
protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
282+
throws SQLException {
283+
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
284+
if (bigDecimal != null) {
285+
node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue());
286+
} else {
287+
node.put(columnName, (BigDecimal) null);
288+
}
289+
}
290+
291+
protected <T extends PGobject> void putObject(final ObjectNode node,
292+
final String columnName,
293+
final ResultSet resultSet,
294+
final int index,
295+
Class<T> clazz)
296+
throws SQLException {
297+
final T object = getObject(resultSet, index, clazz);
298+
node.put(columnName, object.getValue());
299+
}
300+
264301
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
265302
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
266303
if (bigDecimal != null) {
267-
node.put(columnName, bigDecimal);
304+
node.put(columnName, bigDecimal.doubleValue());
268305
} else {
269306
// Special values (Infinity, -Infinity, and NaN) is default to null for now.
270307
// https://github.com/airbytehq/airbyte/issues/8902

0 commit comments

Comments
 (0)