Skip to content

🎉 Updated timestamp transformation with microseconds #10242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7a87acc
updated timestamp transformation with microseconds
andriikorotkov Feb 10, 2022
543ef23
updated timestamp transformation with microseconds
andriikorotkov Feb 11, 2022
2f2e567
updated mysql datatype tests
andriikorotkov Feb 11, 2022
604d7fe
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Feb 11, 2022
936313d
updated mysql datatype tests
andriikorotkov Feb 11, 2022
f359070
updated mysql datatype tests
andriikorotkov Feb 11, 2022
3ffad74
updated mysql datatype tests
andriikorotkov Feb 11, 2022
fa58a5d
updated mysql datatype tests
andriikorotkov Feb 11, 2022
a14ae07
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Feb 15, 2022
bea4a79
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Feb 15, 2022
04a69ac
updated mysql tests
andriikorotkov Feb 16, 2022
a8b6d63
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Feb 16, 2022
7ac7cf2
removed extra logs
andriikorotkov Feb 16, 2022
d624b0c
fixed cursor for timestamp
andriikorotkov Feb 17, 2022
fb455f3
updated test
andriikorotkov Feb 17, 2022
a8a78c8
updated type transformation
andriikorotkov Feb 17, 2022
626a7a3
updated type transformation
andriikorotkov Feb 17, 2022
d8df734
updated type transformation
andriikorotkov Feb 17, 2022
402d4c8
updated tests
andriikorotkov Feb 17, 2022
4674cd2
updated oracle tests
andriikorotkov Feb 18, 2022
8e35755
Merge branch 'master' of github.com:airbytehq/airbyte into akorotkov/…
andriikorotkov Feb 18, 2022
41b5730
updated documentations and connectors versions
andriikorotkov Feb 18, 2022
bb92e27
updated documentations and connectors versions
andriikorotkov Feb 18, 2022
4cab321
fix code style
andriikorotkov Feb 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions airbyte-db/lib/src/main/java/io/airbyte/db/DataTypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataTypeUtils {

Expand All @@ -40,6 +42,11 @@ public static <T> T returnNullIfInvalid(final DataTypeSupplier<T> valueProducer,
}
}

/**
 * Formats an instant as a timestamp string that carries three extra microsecond digits.
 *
 * <p>The instant is first rendered with {@code DATE_FORMAT_WITH_MILLISECONDS}. The digits
 * produced by {@code calculateMicrosecondsString} for {@link Instant#getNano()} are then
 * inserted at character offset 23 of that string, and everything from offset 23 onward is
 * appended unchanged.
 *
 * @param instant the point in time to format
 * @return the millisecond-formatted string with the microsecond digits inserted at offset 23
 */
public static String toISO8601StringWithMicroseconds(Instant instant) {
  final String millisFormatted = DATE_FORMAT_WITH_MILLISECONDS.format(Date.from(instant));
  // NOTE(review): offset 23 presumably falls right after the millisecond digits of the
  // pattern behind DATE_FORMAT_WITH_MILLISECONDS (4-digit year assumed) - confirm against it.
  final String head = millisFormatted.substring(0, 23);
  final String microDigits = calculateMicrosecondsString(instant.getNano());
  final String tail = millisFormatted.substring(23);
  return head + microDigits + tail;
}

/**
 * Formats a count of milliseconds since the epoch using {@code DATE_FORMAT_WITH_MILLISECONDS}.
 *
 * @param epochMillis milliseconds since 1970-01-01T00:00:00Z
 * @return the formatted timestamp string
 */
public static String toISO8601StringWithMilliseconds(final long epochMillis) {
  final Instant moment = Instant.ofEpochMilli(epochMillis);
  final Date asDate = Date.from(moment);
  return DATE_FORMAT_WITH_MILLISECONDS.format(asDate);
}
Expand Down Expand Up @@ -68,4 +75,16 @@ public static String toISO8601String(final Duration duration) {
return DATE_FORMAT.format(Date.from(Instant.ofEpochSecond(Math.abs(duration.getSeconds()), Math.abs(duration.getNano()))));
}

/**
 * Extracts the microsecond-of-millisecond part of a nanosecond value and left-pads it with
 * zeros to three characters.
 *
 * <p>For example, {@code 123456789} nanoseconds yields {@code "456"} and {@code 7999}
 * yields {@code "007"}.
 *
 * @param nano nanosecond-of-second value, e.g. the result of {@code Instant.getNano()}
 * @return the microsecond digits as a zero-padded string
 */
private static String calculateMicrosecondsString(int nano) {
  // Drop the sub-microsecond digits, then keep only the part below one millisecond.
  final int micros = (nano / 1000) % 1000;
  final String digits = Integer.toString(micros);
  if (micros >= 100) {
    return digits;
  }
  final String padding = micros < 10 ? "00" : "0";
  return padding + digits;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import javax.xml.bind.DatatypeConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Source operation skeleton for JDBC compatible databases.
Expand Down Expand Up @@ -120,9 +123,8 @@ protected void putTime(final ObjectNode node, final String columnName, final Res

/**
 * Reads the timestamp column at {@code index} from the current row of {@code resultSet} and
 * stores it in {@code node} under {@code columnName}, formatted by
 * {@link DataTypeUtils#toISO8601StringWithMicroseconds(Instant)}.
 *
 * <p>The earlier millisecond-only conversion is removed. It read the column a second time,
 * added {@code getNanos() / 1000000} on top of {@code getTime()} (which already includes the
 * milliseconds), and wrote a value that was immediately overwritten by the one below.
 *
 * @param node JSON object receiving the formatted value
 * @param columnName key under which the value is stored
 * @param resultSet result set positioned on the row to read
 * @param index column index passed to {@link ResultSet#getTimestamp(int)}
 * @throws SQLException if the column cannot be read
 */
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
  // https://www.cis.upenn.edu/~bcpierce/courses/629/jdkdocs/guide/jdbc/getstart/mapping.doc.html
  // NOTE(review): getTimestamp returns null for SQL NULL, which would fail on toInstant();
  // presumably callers filter out null columns before dispatching here - confirm.
  final Instant instant = resultSet.getTimestamp(index).toInstant();
  node.put(columnName, DataTypeUtils.toISO8601StringWithMicroseconds(instant));
}

protected void putBinary(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
Expand All @@ -143,8 +145,14 @@ protected void setTimestamp(final PreparedStatement preparedStatement, final int
// Parsing TIME as a TIMESTAMP might break for ClickHouse because it doesn't expect a TIME
// value in the following format.
try {
preparedStatement.setTimestamp(parameterIndex, Timestamp
.from(DataTypeUtils.DATE_FORMAT.parse(value).toInstant()));
var micro = value.substring(value.lastIndexOf('.') + 1, value.length() - 1);
var nanos = micro + "000";
var valueWithoutMicros = value.replace("." + micro, "");

var timestamp = Timestamp
.from(DataTypeUtils.DATE_FORMAT.parse(valueWithoutMicros).toInstant());
timestamp.setNanos(Integer.parseInt(nanos));
preparedStatement.setTimestamp(parameterIndex, timestamp);
} catch (final ParseException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ void testSetStatementField() throws SQLException {
sourceOperations.setStatementField(ps, 11, JDBCType.CHAR, "a");
sourceOperations.setStatementField(ps, 12, JDBCType.VARCHAR, "a");
sourceOperations.setStatementField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z");
sourceOperations.setStatementField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00Z");
sourceOperations.setStatementField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00Z");
sourceOperations.setStatementField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z");
sourceOperations.setStatementField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z");
sourceOperations.setStatementField(ps, 16, JDBCType.BINARY, "61616161");

ps.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public static String convertDate(final Object input) {
} else if (input instanceof Duration) {
return DataTypeUtils.toISO8601String((Duration) input);
} else if (input instanceof Timestamp) {
return DataTypeUtils.toISO8601String(((Timestamp) input).toLocalDateTime());
return DataTypeUtils.toISO8601StringWithMicroseconds((((Timestamp) input).toInstant()));
} else if (input instanceof Number) {
return DataTypeUtils.toISO8601String(
new Timestamp(((Number) input).longValue()).toLocalDateTime());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("timestamp")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "null")
.addExpectedValues("2004-10-19T10:23:54Z", null)
.addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null")
.addExpectedValues("2004-10-19T10:23:54.000000Z", "2004-10-19T10:23:54.123456Z", null)
.build());

addDataTypeTestData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,9 @@ protected void initTests() {
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'2018-03-22-12.00.00.123'", "'20180322125959'", "'20180101 12:00:59 PM'")
.addExpectedValues(null, "2018-03-22T12:00:00Z", "2018-03-22T12:59:59Z", "2018-01-01T12:00:59Z") // milliseconds values are erased
.addInsertValues("null", "'2018-03-22-12.00.00.123'", "'2018-03-22-12.00.00.123456'", "'20180322125959'", "'20180101 12:00:59 PM'")
.addExpectedValues(null, "2018-03-22T12:00:00.1230Z", "2018-03-22T12:00:00.123456Z", "2018-03-22T12:59:59.0000Z",
"2018-01-01T12:00:59.0000Z")
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,11 +723,11 @@ void testReadMultipleTablesIncrementally() throws Exception {
}

// when initial and final cursor fields are the same.
private void incrementalCursorCheck(
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
protected void incrementalCursorCheck(
final String cursorField,
final String initialCursorValue,
final String endCursorValue,
final List<AirbyteMessage> expectedRecordMessages)
throws Exception {
incrementalCursorCheck(cursorField, cursorField, initialCursorValue, endCursorValue,
expectedRecordMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,34 +340,35 @@ protected void initTests() {
.sourceType("smalldatetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'1900-01-01'", "'2079-06-06'", "null")
.addExpectedValues("1900-01-01T00:00:00Z", "2079-06-06T00:00:00Z", null)
.addExpectedValues("1900-01-01T00:00:00.000000Z", "2079-06-06T00:00:00.000000Z", null)
.createTablePatternSql(CREATE_TABLE_SQL)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'1753-01-01'", "'9999-12-31'", "null")
.addExpectedValues("1753-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
.addInsertValues("'1753-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04Z'",
"'9999-12-31T13:00:04.123Z'", "null")
.addExpectedValues("1753-01-01T00:00:00.000000Z", "9999-12-31T00:00:00.000000Z", "9999-12-31T13:00:04.000000Z",
"9999-12-31T13:00:04.123000Z", null)
.createTablePatternSql(CREATE_TABLE_SQL)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime2")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "null")
.addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04.123456Z'", "null")
.addExpectedValues("0001-01-01T00:00:00.000000Z", "9999-12-31T00:00:00.000000Z", "9999-12-31T13:00:04.123456Z", null)
.createTablePatternSql(CREATE_TABLE_SQL)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'00:00:00.0000000'", "'23:59:59.9999999'", "'00:00:00'", "'23:58'",
"null")
.addInsertValues("'00:00:00.0000000'", "'23:59:59.9999999'", "'00:00:00'", "'23:58'", "null")
.addExpectedValues("00:00:00", "23:59:59.9999999", "00:00:00", "23:58:00", null)
.createTablePatternSql(CREATE_TABLE_SQL)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,42 +181,42 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("date")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'1999-01-08'",
"null")
.addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z",
"1999-01-08T00:00:00Z", null)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'1999-01-08'", "null")
.addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z", "1999-01-08T00:00:00Z", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("smalldatetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'1900-01-01'", "'2079-06-06'", "null")
.addExpectedValues("1900-01-01T00:00:00Z", "2079-06-06T00:00:00Z", null)
.addExpectedValues("1900-01-01T00:00:00.000000Z", "2079-06-06T00:00:00.000000Z", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'1753-01-01'", "'9999-12-31'", "null")
.addExpectedValues("1753-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
.addInsertValues("'1753-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04Z'",
"'9999-12-31T13:00:04.123Z'", "null")
.addExpectedValues("1753-01-01T00:00:00.000000Z", "9999-12-31T00:00:00.000000Z", "9999-12-31T13:00:04.000000Z",
"9999-12-31T13:00:04.123000Z", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime2")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "null")
.addExpectedValues("0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z", null)
.addInsertValues("'0001-01-01'", "'9999-12-31'", "'9999-12-31T13:00:04.123456Z'", "null")
.addExpectedValues("0001-01-01T00:00:00.000000Z", "9999-12-31T00:00:00.000000Z", "9999-12-31T13:00:04.123456Z", null)
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("time")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'13:00:01'", "'13:00:04Z'")
.addExpectedValues(null, "13:00:01.0000000", "13:00:04.0000000")
.addInsertValues("null", "'13:00:01'", "'13:00:04Z'", "'13:00:04.123456Z'")
.addExpectedValues(null, "13:00:01.0000000", "13:00:04.0000000", "13:00:04.1234560")
.build());

addDataTypeTestData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("date")
.fullSourceDataType("date not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand All @@ -255,15 +255,15 @@ protected void initTests() {
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'2005-10-10 23:22:21'")
.addExpectedValues(null, "2005-10-10T23:22:21Z")
.addExpectedValues(null, "2005-10-10T23:22:21.000000Z")
.build());

// Check Zero-date value for mandatory field
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("datetime")
.fullSourceDataType("datetime not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00 00:00:00'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand All @@ -281,7 +281,7 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("timestamp")
.fullSourceDataType("timestamp not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00 00:00:00.000000'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand All @@ -292,7 +292,7 @@ protected void initTests() {
.airbyteType(JsonSchemaType.STRING)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
// https://debezium.io/documentation/reference/connectors/mysql.html#mysql-temporal-types
.addInsertValues("null", "'-23:59:59.123456'", "'00:00:00'")
.addInsertValues("null", "'-23:59:59'", "'00:00:00'")
.addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z")
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void initTests() {
.sourceType("datetime")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("null", "'2005-10-10 23:22:21'")
.addExpectedValues(null, "2005-10-10T23:22:21Z")
.addExpectedValues(null, "2005-10-10T23:22:21.000000Z")
.build());

addDataTypeTestData(
Expand All @@ -306,7 +306,7 @@ protected void initTests() {
.sourceType("time")
.airbyteType(JsonSchemaType.STRING)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("null", "'-23:59:59.123456'", "'00:00:00'")
.addInsertValues("null", "'-23:59:59'", "'00:00:00'")
.addExpectedValues(null, "1970-01-01T23:59:59Z", "1970-01-01T00:00:00Z")
.build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,9 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("DATE")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("to_date('-4700/01/01','syyyy/mm/dd')", "to_date('9999/12/31 23:59:59','yyyy/mm/dd hh24:mi:ss')", "null")
.addExpectedValues("4700-01-01T00:00:00Z", "9999-12-31T23:59:59Z", null)
.addInsertValues("to_date('-4700/01/01','syyyy/mm/dd')",
"to_date('9999/12/31 23:59:59','yyyy/mm/dd hh24:mi:ss')", "null")
.addExpectedValues("4700-01-01T00:00:00.000000Z", "9999-12-31T23:59:59.000000Z", null)
// @TODO stream fails when gets Zero date value
// .addInsertValues("'2021/01/00'", "'2021/00/00'", "'0000/00/00'")
.build());
Expand All @@ -192,8 +193,9 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("to_timestamp('2020-06-10 06:14:00.742000000', 'YYYY-MM-DD HH24:MI:SS.FF')")
.addExpectedValues("2020-06-10T06:14:01Z")
.addInsertValues("to_timestamp('2020-06-10 06:14:00.742', 'YYYY-MM-DD HH24:MI:SS.FF')",
"to_timestamp('2020-06-10 06:14:00.742123', 'YYYY-MM-DD HH24:MI:SS.FF')")
.addExpectedValues("2020-06-10T06:14:00.742000Z", "2020-06-10T06:14:00.742123Z")
.build());

addDataTypeTestData(
Expand All @@ -202,29 +204,32 @@ protected void initTests() {
.airbyteType(JsonSchemaType.STRING)
.fullSourceDataType("TIMESTAMP WITH TIME ZONE")
.addInsertValues("to_timestamp_tz('21-FEB-2009 18:00:00 EST', 'DD-MON-YYYY HH24:MI:SS TZR')",
"to_timestamp_tz('21-FEB-2009 18:00:00 -5:00', 'DD-MON-YYYY HH24:MI:SS TZH:TZM')")
.addExpectedValues("2009-02-21 18:00:00.0 EST", "2009-02-21 18:00:00.0 -5:00")
"to_timestamp_tz('21-FEB-2009 18:00:00.123456 EST', 'DD-MON-YYYY HH24:MI:SS.FF TZR')",
"to_timestamp_tz('21-FEB-2009 18:00:00 -5:00', 'DD-MON-YYYY HH24:MI:SS TZH:TZM')",
"to_timestamp_tz('21-FEB-2009 18:00:00.123456 -5:00', 'DD-MON-YYYY HH24:MI:SS.FF TZH:TZM')")
.addExpectedValues("2009-02-21 18:00:00.0 EST", "2009-02-21 18:00:00.123456 EST",
"2009-02-21 18:00:00.0 -5:00", "2009-02-21 18:00:00.123456 -5:00")
.build());

final DateFormat utcFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss");
final DateFormat utcFormat = new SimpleDateFormat("dd-MMM-yyyy HH:mm:ss.SSSSSS");
utcFormat.setTimeZone(TimeZone.getTimeZone(Calendar.getInstance().getTimeZone().getID()));
Date date = null;
try {
date = utcFormat.parse("21-Feb-2009 18:00:00");
date = utcFormat.parse("21-Feb-2009 18:00:00.000456");
} catch (final ParseException e) {
LOGGER.error("Unparseable date");
date = Date.from(Instant.parse("2009-02-21T18:00:00.00Z"));
date = Date.from(Instant.parse("2009-02-21T18:00:00.000456Z"));
}
final DateFormat currentTFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
final DateFormat currentTFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSSS");
currentTFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
final String utc = currentTFormat.format(date);
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING)
.fullSourceDataType("TIMESTAMP WITH LOCAL TIME ZONE")
.addInsertValues("to_timestamp_tz('21-FEB-2009 18:00:00', 'DD-MON-YYYY HH24:MI:SS')")
.addExpectedValues(utc + ".0 UTC")
.addInsertValues("to_timestamp_tz('21-FEB-2009 18:00:00.000456', 'DD-MON-YYYY HH24:MI:SS.FF')")
.addExpectedValues(utc + " UTC")
.build());

addDataTypeTestData(
Expand Down
Loading