Skip to content

Commit 0092712

Browse files
subodh1810 and edgao authored
fix postgres data handling from WAL logs in CDC mode (#15481)
* fix postgres data handling from WAL logs in CDC mode
* format
* use formatter for dates also (#15485)
* format
* change test structure
* change log to debug

Co-authored-by: Edward Gao <[email protected]>
1 parent fdb5eb9 commit 0092712

File tree

13 files changed

+387
-169
lines changed

13 files changed

+387
-169
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/DataTypeUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
import java.time.format.DateTimeFormatter;
1717
import java.util.function.Function;
1818

19+
/**
20+
* TODO : Replace all the DateTime related logic of this class with
21+
* {@link io.airbyte.db.jdbc.DateTimeConverter}
22+
*/
1923
public class DataTypeUtils {
2024

2125
public static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss'Z'";
@@ -27,6 +31,7 @@ public class DataTypeUtils {
2731
public static final DateTimeFormatter TIMETZ_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSXXX");
2832
public static final DateTimeFormatter TIMESTAMPTZ_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX");
2933
public static final DateTimeFormatter OFFSETDATETIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSS XXX");
34+
public static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
3035

3136
// wrap SimpleDateFormat in a function because SimpleDateFormat is not threadsafe as a static final.
3237
public static DateFormat getDateFormat() {

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -258,18 +258,19 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
258258
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
259259
}
260260

261-
protected <ObjectType> ObjectType getObject(ResultSet resultSet, int index, Class<ObjectType> clazz) throws SQLException {
261+
protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException {
262262
return resultSet.getObject(index, clazz);
263263
}
264264

265-
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
266-
OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
265+
protected void putTimeWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
266+
final OffsetTime timetz = getObject(resultSet, index, OffsetTime.class);
267267
node.put(columnName, timetz.format(TIMETZ_FORMATTER));
268268
}
269269

270-
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
271-
OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
272-
LocalDate localDate = timestamptz.toLocalDate();
270+
protected void putTimestampWithTimezone(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index)
271+
throws SQLException {
272+
final OffsetDateTime timestamptz = getObject(resultSet, index, OffsetDateTime.class);
273+
final LocalDate localDate = timestamptz.toLocalDate();
273274
node.put(columnName, resolveEra(localDate, timestamptz.format(TIMESTAMPTZ_FORMATTER)));
274275
}
275276

@@ -283,7 +284,7 @@ protected void putTimestampWithTimezone(ObjectNode node, String columnName, Resu
283284
*
284285
* You most likely would prefer to call one of the overloaded methods, which accept temporal types.
285286
*/
286-
public static String resolveEra(boolean isBce, String value) {
287+
public static String resolveEra(final boolean isBce, final String value) {
287288
String mangledValue = value;
288289
if (isBce) {
289290
if (mangledValue.startsWith("-")) {
@@ -296,11 +297,11 @@ public static String resolveEra(boolean isBce, String value) {
296297
return mangledValue;
297298
}
298299

299-
public static boolean isBce(LocalDate date) {
300+
public static boolean isBce(final LocalDate date) {
300301
return date.getEra().equals(IsoEra.BCE);
301302
}
302303

303-
public static String resolveEra(LocalDate date, String value) {
304+
public static String resolveEra(final LocalDate date, final String value) {
304305
return resolveEra(isBce(date), value);
305306
}
306307

@@ -311,14 +312,14 @@ public static String resolveEra(LocalDate date, String value) {
311312
* This is technically kind of sketchy due to ancient timestamps being weird (leap years, etc.), but
312313
* my understanding is that {@link #ONE_CE} has the same weirdness, so it cancels out.
313314
*/
314-
public static String resolveEra(Date date, String value) {
315+
public static String resolveEra(final Date date, final String value) {
315316
return resolveEra(date.before(ONE_CE), value);
316317
}
317318

318319
/**
319320
* See {@link #resolveEra(Date, String)} for explanation.
320321
*/
321-
public static String resolveEra(Timestamp timestamp, String value) {
322+
public static String resolveEra(final Timestamp timestamp, final String value) {
322323
return resolveEra(timestamp.before(ONE_CE), value);
323324
}
324325

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.jdbc;
6+
7+
import static io.airbyte.db.DataTypeUtils.DATE_FORMATTER;
8+
import static io.airbyte.db.DataTypeUtils.TIMESTAMPTZ_FORMATTER;
9+
import static io.airbyte.db.DataTypeUtils.TIMESTAMP_FORMATTER;
10+
import static io.airbyte.db.DataTypeUtils.TIMETZ_FORMATTER;
11+
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
12+
import static io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations.resolveEra;
13+
import static java.time.ZoneOffset.UTC;
14+
15+
import java.sql.Date;
16+
import java.sql.Time;
17+
import java.sql.Timestamp;
18+
import java.time.Duration;
19+
import java.time.Instant;
20+
import java.time.LocalDate;
21+
import java.time.LocalDateTime;
22+
import java.time.LocalTime;
23+
import java.time.OffsetDateTime;
24+
import java.time.OffsetTime;
25+
import java.time.ZonedDateTime;
26+
import java.time.format.DateTimeFormatter;
27+
import java.util.concurrent.TimeUnit;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public class DateTimeConverter {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(DateTimeConverter.class);
34+
public static final DateTimeFormatter TIME_WITH_TIMEZONE_FORMATTER = DateTimeFormatter.ofPattern(
35+
"HH:mm:ss[.][SSSSSSSSS][SSSSSSS][SSSSSS][SSSSS][SSSS][SSS][SS][S][''][XXX][XX][X]");
36+
37+
public static String convertToTimeWithTimezone(final Object time) {
38+
if (time instanceof final java.time.OffsetTime timetz) {
39+
return timetz.format(TIMETZ_FORMATTER);
40+
}
41+
final OffsetTime timetz = OffsetTime.parse(time.toString(), TIME_WITH_TIMEZONE_FORMATTER);
42+
return timetz.format(TIMETZ_FORMATTER);
43+
}
44+
45+
public static String convertToTimestampWithTimezone(final Object timestamp) {
46+
if (timestamp instanceof final Timestamp t) {
47+
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
48+
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually mangles the
49+
// value for ancient dates, because leap years weren't applied consistently in ye olden days.
50+
// Additionally, toInstant() (and toLocalDateTime()) actually lose the era indicator, so we can't
51+
// rely on their getEra() methods.
52+
// So we have special handling for this case, which sidesteps the toInstant conversion.
53+
final ZonedDateTime timestamptz = t.toLocalDateTime().atZone(UTC);
54+
final String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
55+
return resolveEra(t, value);
56+
} else if (timestamp instanceof final OffsetDateTime t) {
57+
return resolveEra(t.toLocalDate(), t.format(TIMESTAMPTZ_FORMATTER));
58+
} else if (timestamp instanceof final ZonedDateTime timestamptz) {
59+
return resolveEra(timestamptz.toLocalDate(), timestamptz.format(TIMESTAMPTZ_FORMATTER));
60+
} else {
61+
// This case probably isn't strictly necessary, but I'm leaving it just in case there's some weird
62+
// situation that I'm not aware of.
63+
final Instant instant = Instant.parse(timestamp.toString());
64+
final OffsetDateTime offsetDateTime = OffsetDateTime.ofInstant(instant, UTC);
65+
final ZonedDateTime timestamptz = ZonedDateTime.from(offsetDateTime);
66+
final LocalDate localDate = timestamptz.toLocalDate();
67+
final String value = timestamptz.format(TIMESTAMPTZ_FORMATTER);
68+
return resolveEra(localDate, value);
69+
}
70+
}
71+
72+
/**
73+
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
74+
* here.
75+
*/
76+
public static String convertToTimestamp(final Object timestamp) {
77+
if (timestamp instanceof final Timestamp t) {
78+
// Snapshot mode
79+
final LocalDateTime localDateTime = t.toLocalDateTime();
80+
final String value = localDateTime.format(TIMESTAMP_FORMATTER);
81+
return resolveEra(t, value);
82+
} else if (timestamp instanceof final Instant i) {
83+
// Incremental mode
84+
return resolveEra(i.atZone(UTC).toLocalDate(), i.atOffset(UTC).toLocalDateTime().format(TIMESTAMP_FORMATTER));
85+
} else {
86+
final LocalDateTime localDateTime = LocalDateTime.parse(timestamp.toString());
87+
final LocalDate date = localDateTime.toLocalDate();
88+
final String value = localDateTime.format(TIMESTAMP_FORMATTER);
89+
return resolveEra(date, value);
90+
}
91+
}
92+
93+
/**
94+
* See {@link #convertToTimestampWithTimezone(Object)} for explanation of the weird things happening
95+
* here.
96+
*/
97+
public static String convertToDate(final Object date) {
98+
if (date instanceof final Date d) {
99+
// Snapshot mode
100+
final LocalDate localDate = ((Date) date).toLocalDate();
101+
return resolveEra(d, localDate.format(DATE_FORMATTER));
102+
} else if (date instanceof LocalDate d) {
103+
// Incremental mode
104+
return resolveEra(d, d.format(DATE_FORMATTER));
105+
} else {
106+
final LocalDate localDate = LocalDate.parse(date.toString());
107+
return resolveEra(localDate, localDate.format(DATE_FORMATTER));
108+
}
109+
}
110+
111+
public static String convertToTime(final Object time) {
112+
if (time instanceof final Time sqlTime) {
113+
return sqlTime.toLocalTime().format(TIME_FORMATTER);
114+
} else if (time instanceof final LocalTime localTime) {
115+
return localTime.format(TIME_FORMATTER);
116+
} else if (time instanceof java.time.Duration) {
117+
long value = ((Duration) time).toNanos();
118+
if (value >= 0 && value <= TimeUnit.DAYS.toNanos(1)) {
119+
return LocalTime.ofNanoOfDay(value).format(TIME_FORMATTER);
120+
} else {
121+
final long updatedValue = 0 > value ? Math.abs(value) : TimeUnit.DAYS.toNanos(1);
122+
LOGGER.debug("Time values must use number of milliseconds greater than 0 and less than 86400000000000 but its {}, converting to {} ", value,
123+
updatedValue);
124+
return LocalTime.ofNanoOfDay(updatedValue).format(TIME_FORMATTER);
125+
}
126+
} else {
127+
return LocalTime.parse(time.toString()).format(TIME_FORMATTER);
128+
}
129+
}
130+
131+
}

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DateTimeConverter.java

Lines changed: 0 additions & 122 deletions
This file was deleted.

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/DebeziumConverterUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ private DebeziumConverterUtils() {
2323
throw new UnsupportedOperationException();
2424
}
2525

26+
/**
27+
* TODO : Replace usage of this method with {@link io.airbyte.db.jdbc.DateTimeConverter}
28+
*/
2629
public static String convertDate(final Object input) {
2730
/**
2831
* While building this custom converter we were not sure what type debezium could return cause there

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/PostgresConverter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.debezium.internals;
66

7+
import io.airbyte.db.jdbc.DateTimeConverter;
78
import io.debezium.spi.converter.CustomConverter;
89
import io.debezium.spi.converter.RelationalColumn;
910
import java.math.BigDecimal;
@@ -21,7 +22,7 @@ public class PostgresConverter implements CustomConverter<SchemaBuilder, Relatio
2122

2223
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresConverter.class);
2324

24-
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
25+
private final String[] DATE_TYPES = {"DATE", "TIME", "TIMETZ", "INTERVAL", "TIMESTAMP", "TIMESTAMPTZ"};
2526
private final String[] BIT_TYPES = {"BIT", "VARBIT"};
2627
private final String[] MONEY_ITEM_TYPE = {"MONEY"};
2728
private final String[] GEOMETRICS_TYPES = {"BOX", "CIRCLE", "LINE", "LSEG", "POINT", "POLYGON", "PATH"};
@@ -115,7 +116,7 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
115116
case "DATE" -> DateTimeConverter.convertToDate(x);
116117
case "TIME" -> DateTimeConverter.convertToTime(x);
117118
case "INTERVAL" -> convertInterval((PGInterval) x);
118-
default -> DebeziumConverterUtils.convertDate(x);
119+
default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
119120
};
120121
});
121122
}

airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,14 @@ public void setUpInternal() throws Exception {
110110
localRoot = Files.createTempDirectory(testDir, "output");
111111
environment = new TestDestinationEnv(localRoot);
112112
workerConfigs = new WorkerConfigs(new EnvConfigs());
113-
114-
setupEnvironment(environment);
115-
116113
processFactory = new DockerProcessFactory(
117114
workerConfigs,
118115
workspaceRoot,
119116
workspaceRoot.toString(),
120117
localRoot.toString(),
121118
"host");
119+
120+
setupEnvironment(environment);
122121
}
123122

124123
@AfterEach

0 commit comments

Comments
 (0)