Skip to content

Commit 7a7e78f

Browse files
subodh1810 and edgao authored
fix temporal data type handling in mysql source (#15504)
* fix postgres data handling from WAL logs in CDC mode
* format
* fix temporal type data handling from in mysql source
* use formatter for dates also (#15485)
* format
* tweak tests
* change test structure
* add test for data read from binlogs
* fix json schema types for data types
* handle cursor parsing
* add more tests
* remove assertion cause its not useful

Co-authored-by: Edward Gao <[email protected]>
1 parent 90b71b7 commit 7a7e78f

File tree

10 files changed: 709 additions (+709) and 192 deletions (-192)

10 files changed: 709 additions (+709) and 192 deletions (-192)

airbyte-integrations/bases/debezium-v1-9-2/src/main/java/io/airbyte/integrations/debezium/internals/MySQLDateTimeConverter.java

Lines changed: 18 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -5,11 +5,11 @@
55
package io.airbyte.integrations.debezium.internals;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8-
import io.airbyte.db.DataTypeUtils;
8+
import io.airbyte.db.jdbc.DateTimeConverter;
99
import io.debezium.spi.converter.CustomConverter;
1010
import io.debezium.spi.converter.RelationalColumn;
11-
import java.time.LocalDate;
1211
import java.util.Arrays;
12+
import java.util.Locale;
1313
import java.util.Properties;
1414
import org.apache.kafka.connect.data.SchemaBuilder;
1515
import org.slf4j.Logger;
@@ -30,7 +30,7 @@ public class MySQLDateTimeConverter implements CustomConverter<SchemaBuilder, Re
3030

3131
private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDateTimeConverter.class);
3232

33-
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME"};
33+
private final String[] DATE_TYPES = {"DATE", "DATETIME", "TIME", "TIMESTAMP"};
3434

3535
@Override
3636
public void configure(final Properties props) {}
@@ -42,18 +42,22 @@ public void converterFor(final RelationalColumn field, final ConverterRegistrati
4242
}
4343
}
4444

45-
/**
46-
* The debezium driver replaces Zero-value by Null even when this column is mandatory. According to
47-
* the doc, it should be done by driver, but it fails.
48-
*/
49-
private Object convertDefaultValueNullDate(final RelationalColumn field) {
50-
final var defaultValue = DebeziumConverterUtils.convertDefaultValue(field);
51-
return (defaultValue == null && !field.isOptional() ? DataTypeUtils.toISO8601String(LocalDate.EPOCH) : defaultValue);
52-
}
53-
5445
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
55-
registration.register(SchemaBuilder.string(),
56-
x -> x == null ? convertDefaultValueNullDate(field) : DebeziumConverterUtils.convertDate(x));
46+
final var fieldType = field.typeName();
47+
48+
registration.register(SchemaBuilder.string().optional(), x -> {
49+
if (x == null) {
50+
return DebeziumConverterUtils.convertDefaultValue(field);
51+
}
52+
53+
return switch (fieldType.toUpperCase(Locale.ROOT)) {
54+
case "DATETIME" -> DateTimeConverter.convertToTimestamp(x);
55+
case "DATE" -> DateTimeConverter.convertToDate(x);
56+
case "TIME" -> DateTimeConverter.convertToTime(x);
57+
case "TIMESTAMP" -> DateTimeConverter.convertToTimestampWithTimezone(x);
58+
default -> throw new IllegalArgumentException("Unknown field type " + fieldType.toUpperCase(Locale.ROOT));
59+
};
60+
});
5761
}
5862

5963
}

airbyte-integrations/connectors/source-mysql/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ dependencies {
2424
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-2'))
2525
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
2626
testImplementation 'org.apache.commons:commons-lang3:3.11'
27+
testImplementation 'org.hamcrest:hamcrest-all:1.3'
2728
testImplementation libs.connectors.testcontainers.mysql
2829

2930
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcProperties.java

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,7 @@
99

1010
public class MySqlCdcProperties {
1111

12-
static Properties getDebeziumProperties(JsonNode config) {
12+
static Properties getDebeziumProperties(final JsonNode config) {
1313
final Properties props = new Properties();
1414

1515
// debezium engine configuration
@@ -26,8 +26,13 @@ static Properties getDebeziumProperties(JsonNode config) {
2626
props.setProperty("datetime.type", "io.airbyte.integrations.debezium.internals.MySQLDateTimeConverter");
2727

2828
// snapshot config
29-
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
30-
props.setProperty("snapshot.mode", "when_needed");
29+
if (config.has("snapshot_mode")) {
30+
//The parameter `snapshot_mode` is passed in test to simulate reading the binlog directly and skip initial snapshot
31+
props.setProperty("snapshot.mode", config.get("snapshot_mode").asText());
32+
} else {
33+
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-mode
34+
props.setProperty("snapshot.mode", "when_needed");
35+
}
3136
// https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-snapshot-locking-mode
3237
// This is to make sure other database clients are allowed to write to a table while Airbyte is
3338
// taking a snapshot. There is a risk involved that

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java

Lines changed: 73 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,10 +20,17 @@
2020
import io.airbyte.db.DataTypeUtils;
2121
import io.airbyte.db.SourceOperations;
2222
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations;
23+
import io.airbyte.db.jdbc.DateTimeConverter;
2324
import io.airbyte.protocol.models.JsonSchemaType;
2425
import java.sql.PreparedStatement;
2526
import java.sql.ResultSet;
2627
import java.sql.SQLException;
28+
import java.time.LocalDate;
29+
import java.time.LocalDateTime;
30+
import java.time.LocalTime;
31+
import java.time.OffsetDateTime;
32+
import java.time.OffsetTime;
33+
import java.time.format.DateTimeParseException;
2734
import org.slf4j.Logger;
2835
import org.slf4j.LoggerFactory;
2936

@@ -73,7 +80,8 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
7380
case DOUBLE, DOUBLE_UNSIGNED -> putDouble(json, columnName, resultSet, colIndex);
7481
case DECIMAL, DECIMAL_UNSIGNED -> putBigDecimal(json, columnName, resultSet, colIndex);
7582
case DATE -> putDate(json, columnName, resultSet, colIndex);
76-
case DATETIME, TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
83+
case DATETIME -> putTimestamp(json, columnName, resultSet, colIndex);
84+
case TIMESTAMP -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
7785
case TIME -> putTime(json, columnName, resultSet, colIndex);
7886
// The returned year value can either be a java.sql.Short (when yearIsDateType=false)
7987
// or a java.sql.Date with the date set to January 1st, at midnight (when yearIsDateType=true).
@@ -125,7 +133,8 @@ public void setStatementField(final PreparedStatement preparedStatement,
125133
case FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED -> setDouble(preparedStatement, parameterIndex, value);
126134
case DECIMAL, DECIMAL_UNSIGNED -> setDecimal(preparedStatement, parameterIndex, value);
127135
case DATE -> setDate(preparedStatement, parameterIndex, value);
128-
case DATETIME, TIMESTAMP -> setTimestamp(preparedStatement, parameterIndex, value);
136+
case DATETIME -> setTimestamp(preparedStatement, parameterIndex, value);
137+
case TIMESTAMP -> setTimestampWithTimezone(preparedStatement, parameterIndex, value);
129138
case TIME -> setTime(preparedStatement, parameterIndex, value);
130139
case YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET -> setString(preparedStatement, parameterIndex, value);
131140
case TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> setBinary(preparedStatement, parameterIndex, value);
@@ -168,7 +177,22 @@ public MysqlType getFieldType(final JsonNode field) {
168177
}
169178

170179
@Override
171-
public JsonSchemaType getJsonType(MysqlType mysqlType) {
180+
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
181+
node.put(columnName, DateTimeConverter.convertToDate(getObject(resultSet, index, LocalDate.class)));
182+
}
183+
184+
@Override
185+
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
186+
node.put(columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime.class)));
187+
}
188+
189+
@Override
190+
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
191+
node.put(columnName, DateTimeConverter.convertToTimestamp(getObject(resultSet, index, LocalDateTime.class)));
192+
}
193+
194+
@Override
195+
public JsonSchemaType getJsonType(final MysqlType mysqlType) {
172196
return switch (mysqlType) {
173197
case
174198
// TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link
@@ -179,8 +203,54 @@ public JsonSchemaType getJsonType(MysqlType mysqlType) {
179203
case NULL -> JsonSchemaType.NULL;
180204
// BIT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link getFieldType}
181205
case BIT, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, GEOMETRY -> JsonSchemaType.STRING_BASE_64;
206+
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
207+
case DATETIME -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE;
208+
case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE;
209+
case DATE -> JsonSchemaType.STRING_DATE;
182210
default -> JsonSchemaType.STRING;
183211
};
184212
}
185213

214+
@Override
215+
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
216+
try {
217+
preparedStatement.setObject(parameterIndex, LocalDate.parse(value));
218+
} catch (final DateTimeParseException e) {
219+
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
220+
LOGGER.warn("Exception occurred while trying to parse value for date column the new way, trying the old way", e);
221+
super.setDate(preparedStatement, parameterIndex, value);
222+
}
223+
}
224+
225+
@Override
226+
protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException {
227+
try {
228+
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
229+
} catch (final DateTimeParseException e) {
230+
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
231+
LOGGER.warn("Exception occurred while trying to parse value for datetime column the new way, trying the old way", e);
232+
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
233+
}
234+
}
235+
236+
private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
237+
try {
238+
preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value));
239+
} catch (final DateTimeParseException e) {
240+
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
241+
LOGGER.warn("Exception occurred while trying to parse value for timestamp column the new way, trying the old way", e);
242+
preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value));
243+
}
244+
}
245+
246+
@Override
247+
protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
248+
try {
249+
preparedStatement.setObject(parameterIndex, LocalTime.parse(value));
250+
} catch (final DateTimeParseException e) {
251+
LOGGER.warn("Exception occurred while trying to parse value for time column the new way, trying the old way", e);
252+
// This is just for backward compatibility for connectors created on versions before PR https://github.com/airbytehq/airbyte/pull/15504
253+
super.setTime(preparedStatement, parameterIndex, value);
254+
}
255+
}
186256
}

airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/AbstractMySqlSourceDatatypeTest.java

Lines changed: 30 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -235,34 +235,28 @@ protected void initTests() {
235235
addDataTypeTestData(
236236
TestDataHolder.builder()
237237
.sourceType("date")
238-
.airbyteType(JsonSchemaType.STRING)
238+
.airbyteType(JsonSchemaType.STRING_DATE)
239239
.addInsertValues("null", "'2021-01-01'")
240-
.addExpectedValues(null, "2021-01-01T00:00:00Z")
240+
.addExpectedValues(null, "2021-01-01")
241241
.build());
242242

243243
addDataTypeTestData(
244244
TestDataHolder.builder()
245245
.sourceType("datetime")
246-
.airbyteType(JsonSchemaType.STRING)
247-
.addInsertValues("null", "'2005-10-10 23:22:21'")
248-
.addExpectedValues(null, "2005-10-10T23:22:21.000000Z")
246+
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
247+
.addInsertValues("null", "'2005-10-10 23:22:21'", "'2013-09-05T10:10:02'", "'2013-09-06T10:10:02'")
248+
.addExpectedValues(null, "2005-10-10T23:22:21.000000", "2013-09-05T10:10:02.000000", "2013-09-06T10:10:02.000000")
249249
.build());
250250

251-
addDataTypeTestData(
252-
TestDataHolder.builder()
253-
.sourceType("timestamp")
254-
.airbyteType(JsonSchemaType.STRING)
255-
.addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'")
256-
.addExpectedValues(null, null, null, null)
257-
.build());
251+
addTimestampDataTypeTest();
258252

259253
addDataTypeTestData(
260254
TestDataHolder.builder()
261255
.sourceType("time")
262-
.airbyteType(JsonSchemaType.STRING)
256+
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
263257
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
264-
.addInsertValues("null", "'-23:59:59'", "'00:00:00'")
265-
.addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z")
258+
.addInsertValues("null", "'-22:59:59'", "'23:59:59'", "'00:00:00'")
259+
.addExpectedValues(null, "22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
266260
.build());
267261

268262
addDataTypeTestData(
@@ -384,13 +378,7 @@ protected void initTests() {
384378
.addExpectedValues(StringUtils.leftPad("0", 1048000, "0"), "test")
385379
.build());
386380

387-
addDataTypeTestData(
388-
TestDataHolder.builder()
389-
.sourceType("json")
390-
.airbyteType(JsonSchemaType.STRING)
391-
.addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'")
392-
.addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}")
393-
.build());
381+
addJsonDataTypeTest();
394382

395383
addDataTypeTestData(
396384
TestDataHolder.builder()
@@ -412,6 +400,26 @@ protected void initTests() {
412400

413401
}
414402

403+
protected void addJsonDataTypeTest() {
404+
addDataTypeTestData(
405+
TestDataHolder.builder()
406+
.sourceType("json")
407+
.airbyteType(JsonSchemaType.STRING)
408+
.addInsertValues("null", "'{\"a\": 10, \"b\": 15}'", "'{\"fóo\": \"bär\"}'", "'{\"春江潮水连海平\": \"海上明月共潮生\"}'")
409+
.addExpectedValues(null, "{\"a\": 10, \"b\": 15}", "{\"fóo\": \"bär\"}", "{\"春江潮水连海平\": \"海上明月共潮生\"}")
410+
.build());
411+
}
412+
413+
protected void addTimestampDataTypeTest() {
414+
addDataTypeTestData(
415+
TestDataHolder.builder()
416+
.sourceType("timestamp")
417+
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
418+
.addInsertValues("null", "'2021-01-00'", "'2021-00-00'", "'0000-00-00'", "'2022-08-09T10:17:16.161342Z'")
419+
.addExpectedValues(null, null, null, null, "2022-08-09T10:17:16.000000Z")
420+
.build());
421+
}
422+
415423
private String getLogString(final int length) {
416424
final int maxLpadLength = 262144;
417425
final StringBuilder stringBuilder = new StringBuilder("concat(");

0 commit comments

Comments
 (0)