Skip to content

Commit 2014504

Browse files
committed
13608 & 12026 - align regular and CDC integration tests and data mappers
1 parent 806c939 commit 2014504

File tree

7 files changed

+685
-999
lines changed

7 files changed

+685
-999
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -253,17 +253,17 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
253253
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
254254
}
255255

256-
protected <DateTime> DateTime getDateTimeObject(ResultSet resultSet, int index, Class<DateTime> clazz) throws SQLException {
256+
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
257257
return resultSet.getObject(index, clazz);
258258
}
259259

260260
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
261-
OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
261+
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
262262
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
263263
}
264264

265265
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
266-
OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
266+
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
267267
LocalDate localDate = timestamptz.toLocalDate();
268268
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
269269
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package io.airbyte.integrations.debezium.internals;
2+
3+
import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
4+
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
5+
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
6+
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
7+
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.isBCE;
8+
9+
import java.sql.Date;
10+
import java.sql.Timestamp;
11+
import java.time.LocalDate;
12+
import java.time.LocalDateTime;
13+
import java.time.LocalTime;
14+
import java.time.OffsetDateTime;
15+
import java.time.OffsetTime;
16+
import java.time.ZoneOffset;
17+
import java.time.format.DateTimeFormatter;
18+
19+
public class DateTimeConverter {
20+
21+
public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
22+
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");
23+
24+
public static String convertToTimeWithTimezone(String time) {
25+
OffsetTime timetz = OffsetTime.parse(time, TIME_WITH_TIMEZONE_FORMATTER);
26+
return timetz.format(TIMETZ_FORMATTER);
27+
}
28+
29+
public static String convertToTimestampWithTimezone(Timestamp timestamp) {
30+
OffsetDateTime timestamptz = OffsetDateTime.ofInstant(timestamp.toInstant(), ZoneOffset.UTC);
31+
LocalDate localDate = timestamptz.toLocalDate();
32+
return resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER));
33+
}
34+
35+
public static String convertToTimestamp(Timestamp timestamp) {
36+
final LocalDateTime localDateTime = LocalDateTime.ofInstant(timestamp.toInstant(),
37+
ZoneOffset.UTC);
38+
final LocalDate date = localDateTime.toLocalDate();
39+
return resolveEra(date, localDateTime.format(TIMESTAMP_FORMATTER));
40+
}
41+
42+
public static Object convertToDate(Date date) {
43+
LocalDate localDate = date.toLocalDate();
44+
return resolveEra(localDate, localDate.toString());
45+
}
46+
47+
public static String convertToTime(String time) {
48+
LocalTime localTime = LocalTime.parse(time);
49+
return localTime.format(TIME_FORMATTER);
50+
}
51+
52+
public static String resolveEra(LocalDate date, String value) {
53+
return isBCE(date) ? value.substring(1) + " BC" : value;
54+
}
55+
56+
}

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

+52-12
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,26 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7+
import static io.airbyte.protocol.models.JsonSchemaType.AIRBYTE_TYPE;
8+
import static io.airbyte.protocol.models.JsonSchemaType.DATE;
9+
import static io.airbyte.protocol.models.JsonSchemaType.DATE_TIME;
10+
import static io.airbyte.protocol.models.JsonSchemaType.FORMAT;
11+
import static io.airbyte.protocol.models.JsonSchemaType.TIME;
12+
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE;
13+
import static io.airbyte.protocol.models.JsonSchemaType.TIMESTAMP_WITH_TIMEZONE;
14+
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITHOUT_TIMEZONE;
15+
import static io.airbyte.protocol.models.JsonSchemaType.TIME_WITH_TIMEZONE;
16+
717
import io.debezium.spi.converter.CustomConverter;
818
import io.debezium.spi.converter.RelationalColumn;
919
import java.math.BigDecimal;
1020
import java.nio.charset.StandardCharsets;
21+
import java.sql.Date;
22+
import java.sql.Timestamp;
1123
import java.util.Arrays;
24+
import java.util.Locale;
1225
import java.util.Properties;
26+
import org.apache.commons.codec.binary.Hex;
1327
import org.apache.kafka.connect.data.SchemaBuilder;
1428
import org.postgresql.util.PGInterval;
1529
import org.slf4j.Logger;
@@ -19,12 +33,13 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
1933

2034
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
2135

22-
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP"};
36+
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
2337
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
2438
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
2539
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
2640
private final String[] TEXT_TYPES =
27-
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY"};
41+
{"VARCHAR", "VARBINARY", "BLOB", "TEXT", "LONGTEXT", "TINYTEXT", "MEDIUMTEXT", "INVENTORY_ITEM", "TSVECTOR", "TSQUERY", "PG_LSN"};
42+
private final String BYTEA_TYPE = "BYTEA";
2843

2944
@Override
3045
public void configure(final Properties props) {}
@@ -39,9 +54,20 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
3954
registerText(field, registration);
4055
} else if (Arrays.stream(MONEY_ITEM_TYPE).anyMatch(s -> s.equalsIgnoreCase(field.typeName()))) {
4156
registerMoney(field, registration);
57+
} else if (BYTEA_TYPE.equalsIgnoreCase(field.typeName())) {
58+
registerBytea(field, registration);
4259
}
4360
}
4461

62+
private void registerBytea(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
63+
registration.register(SchemaBuilder.string().optional(), x -> {
64+
if (x == null) {
65+
return DebeziumConverterUtils.convertDefaultValue(field);
66+
}
67+
return "\\x" + Hex.encodeHexString((byte[]) x);
68+
});
69+
}
70+
4571
private void registerText(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
4672
registration.register(SchemaBuilder.string().optional(), x -> {
4773
if (x == null) {
@@ -57,14 +83,21 @@ private void registerText(final RelationalColumn field, final ConverterRegistrat
5783
}
5884

5985
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
60-
registration.register(SchemaBuilder.string().optional(), x -> {
86+
final var fieldType = field.typeName();
87+
88+
registration.register(getJsonSchema(fieldType).optional(), x -> {
6189
if (x == null) {
6290
return DebeziumConverterUtils.convertDefaultValue(field);
63-
} else if (x instanceof PGInterval) {
64-
return convertInterval((PGInterval) x);
65-
} else {
66-
return DebeziumConverterUtils.convertDate(x);
6791
}
92+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
93+
case "TIMETZ" -> DateTimeConverter.convertToTimeWithTimezone(x.toString());
94+
case "TIMESTAMPTZ" -> DateTimeConverter.convertToTimestampWithTimezone((Timestamp) x);
95+
case "TIMESTAMP" -> DateTimeConverter.convertToTimestamp((Timestamp) x);
96+
case "DATE" -> DateTimeConverter.convertToDate((Date) x);
97+
case "TIME" -> DateTimeConverter.convertToTime(x.toString());
98+
case "INTERVAL" -> convertInterval((PGInterval) x);
99+
default -> DebeziumConverterUtils.convertDate(x);
100+
};
68101
});
69102
}
70103

@@ -84,11 +117,7 @@ private void registerMoney(final RelationalColumn field, final ConverterRegistra
84117
return DebeziumConverterUtils.convertDefaultValue(field);
85118
} else if (x instanceof Double) {
86119
final BigDecimal result = BigDecimal.valueOf((Double) x);
87-
if (result.compareTo(new BigDecimal("999999999999999")) == 1
88-
|| result.compareTo(new BigDecimal("-999999999999999")) == -1) {
89-
return null;
90-
}
91-
return result.toString();
120+
return Double.toString(result.doubleValue());
92121
} else {
93122
return x.toString();
94123
}
@@ -131,4 +160,15 @@ private boolean isNegativeTime(final PGInterval pgInterval) {
131160
|| pgInterval.getWholeSeconds() < 0;
132161
}
133162

163+
private SchemaBuilder getJsonSchema(final String fieldType) {
164+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
165+
case "TIMETZ" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITH_TIMEZONE);
166+
case "TIMESTAMPTZ" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITH_TIMEZONE);
167+
case "TIMESTAMP" -> SchemaBuilder.string().parameter(FORMAT, DATE_TIME).parameter(AIRBYTE_TYPE, TIMESTAMP_WITHOUT_TIMEZONE);
168+
case "DATE" -> SchemaBuilder.string().parameter(FORMAT, DATE);
169+
case "TIME" -> SchemaBuilder.string().parameter(FORMAT, TIME).parameter(AIRBYTE_TYPE, TIME_WITHOUT_TIMEZONE);
170+
default -> SchemaBuilder.string();
171+
};
172+
}
173+
134174
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java

+48-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.airbyte.db.jdbc.JdbcSourceOperations;
2424
import io.airbyte.protocol.models.JsonSchemaType;
2525
import java.math.BigDecimal;
26+
import java.math.RoundingMode;
2627
import java.sql.JDBCType;
2728
import java.sql.PreparedStatement;
2829
import java.sql.ResultSet;
@@ -35,7 +36,15 @@
3536
import java.time.OffsetTime;
3637
import java.time.format.DateTimeParseException;
3738
import java.util.Collections;
39+
import org.postgresql.geometric.PGbox;
40+
import org.postgresql.geometric.PGcircle;
41+
import org.postgresql.geometric.PGline;
42+
import org.postgresql.geometric.PGlseg;
43+
import org.postgresql.geometric.PGpath;
44+
import org.postgresql.geometric.PGpoint;
45+
import org.postgresql.geometric.PGpolygon;
3846
import org.postgresql.jdbc.PgResultSetMetaData;
47+
import org.postgresql.util.PGobject;
3948
import org.slf4j.Logger;
4049
import org.slf4j.LoggerFactory;
4150

@@ -173,6 +182,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
173182
case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
174183
case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
175184
case "hstore" -> putHstoreAsJson(json, columnName, resultSet, colIndex);
185+
case "circle" -> putObject(json, columnName, resultSet, colIndex, PGcircle.class);
186+
case "box" -> putObject(json, columnName, resultSet, colIndex, PGbox.class);
187+
case "double precision", "float", "float8" -> putFloat8(json, columnName, resultSet, colIndex);
188+
case "line" -> putObject(json, columnName, resultSet, colIndex, PGline.class);
189+
case "lseg" -> putObject(json, columnName, resultSet, colIndex, PGlseg.class);
190+
case "path" -> putObject(json, columnName, resultSet, colIndex, PGpath.class);
191+
case "point" -> putObject(json, columnName, resultSet, colIndex, PGpoint.class);
192+
case "polygon" -> putObject(json, columnName, resultSet, colIndex, PGpolygon.class);
176193
default -> {
177194
switch (columnType) {
178195
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
@@ -198,19 +215,19 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
198215

199216
@Override
200217
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
201-
final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class);
218+
final LocalDate date = getObject(resultSet, index, LocalDate.class);
202219
node.put(columnName, resolveEra(date, date.toString()));
203220
}
204221

205222
@Override
206223
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
207-
final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class);
224+
final LocalTime time = getObject(resultSet, index, LocalTime.class);
208225
node.put(columnName, time.format(TIME_FORMATTER));
209226
}
210227

211228
@Override
212229
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
213-
final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
230+
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
214231
final LocalDate date = timestamp.toLocalDate();
215232
node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
216233
}
@@ -261,6 +278,34 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
261278
node.put(columnName, resultSet.getString(index).equalsIgnoreCase("t"));
262279
}
263280

281+
protected void putFloat8(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
282+
throws SQLException {
283+
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
284+
if (bigDecimal != null) {
285+
node.put(columnName, bigDecimal.setScale(resultSet.getMetaData().getScale(index), RoundingMode.HALF_EVEN).doubleValue());
286+
return;
287+
}
288+
289+
// ResultSet#getBigDecimal cannot handle Infinity, -Infinity, or NaN, and will throw exception,
290+
// which becomes null. So we need to check these special values as string.
291+
final String value = resultSet.getString(index);
292+
if (value.equalsIgnoreCase("infinity")) {
293+
node.put(columnName, Double.POSITIVE_INFINITY);
294+
} else if (value.equalsIgnoreCase("-infinity")) {
295+
node.put(columnName, Double.NEGATIVE_INFINITY);
296+
} else if (value.equalsIgnoreCase("nan")) {
297+
node.put(columnName, Double.NaN);
298+
} else {
299+
node.put(columnName, (BigDecimal) null);
300+
}
301+
}
302+
303+
protected <T extends PGobject> void putObject(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index, Class<T> clazz)
304+
throws SQLException {
305+
final T object = getObject(resultSet, index, clazz);
306+
node.put(columnName, object.getValue());
307+
}
308+
264309
protected void putBigDecimal(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) {
265310
final BigDecimal bigDecimal = DataTypeUtils.returnNullIfInvalid(() -> resultSet.getBigDecimal(index));
266311
if (bigDecimal != null) {

0 commit comments

Comments
 (0)