-
Notifications
You must be signed in to change notification settings - Fork 4.6k
Postgres Source: add timezone awareness and handle BC dates #13166
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
198ad45
d836b51
4018d8a
72f4ce7
662c4a3
6dbcc1d
abf903f
8e17ade
ac68e40
fc9f986
1153ee4
f80ff75
2da97cf
68e1c62
359f0c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,10 +18,15 @@ | |
import io.airbyte.db.jdbc.JdbcSourceOperations; | ||
import io.airbyte.protocol.models.JsonSchemaType; | ||
import java.math.BigDecimal; | ||
import java.sql.Date; | ||
import java.sql.JDBCType; | ||
import java.sql.PreparedStatement; | ||
import java.sql.ResultSet; | ||
import java.sql.ResultSetMetaData; | ||
import java.sql.SQLException; | ||
import java.time.LocalDate; | ||
import java.time.LocalDateTime; | ||
import java.time.LocalTime; | ||
import java.util.Collections; | ||
import org.postgresql.jdbc.PgResultSetMetaData; | ||
import org.slf4j.Logger; | ||
|
@@ -67,58 +72,87 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { | |
return jsonNode; | ||
} | ||
|
||
@Override | ||
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { | ||
try { | ||
Date date = Date.valueOf(value); | ||
preparedStatement.setDate(parameterIndex, date); | ||
} catch (final Exception e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { | ||
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); | ||
final String columnName = metadata.getColumnName(colIndex); | ||
final String columnTypeName = metadata.getColumnTypeName(colIndex); | ||
final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase(); | ||
final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex)); | ||
|
||
if (resultSet.getString(colIndex) == null) { | ||
json.putNull(columnName); | ||
} else if (columnTypeName.equalsIgnoreCase("bool") || columnTypeName.equalsIgnoreCase("boolean")) { | ||
putBoolean(json, columnName, resultSet, colIndex); | ||
} else if (columnTypeName.equalsIgnoreCase("bytea")) { | ||
putString(json, columnName, resultSet, colIndex); | ||
} else if (columnTypeName.equalsIgnoreCase("time") || columnTypeName.equalsIgnoreCase("timetz")) { | ||
putString(json, columnName, resultSet, colIndex); | ||
} else { | ||
// https://www.postgresql.org/docs/14/datatype.html | ||
switch (columnType) { | ||
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); | ||
case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); | ||
case INTEGER -> putInteger(json, columnName, resultSet, colIndex); | ||
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); | ||
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex); | ||
case REAL -> putFloat(json, columnName, resultSet, colIndex); | ||
case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); | ||
// BIT is a bit string in Postgres, e.g. '0100' | ||
case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); | ||
case DATE -> putDate(json, columnName, resultSet, colIndex); | ||
case TIME -> putTime(json, columnName, resultSet, colIndex); | ||
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); | ||
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); | ||
case ARRAY -> putArray(json, columnName, resultSet, colIndex); | ||
default -> putDefault(json, columnName, resultSet, colIndex); | ||
switch (columnTypeName) { | ||
case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex); | ||
case "bytea" -> putString(json, columnName, resultSet, colIndex); | ||
case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); | ||
case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); | ||
default -> { | ||
switch (columnType) { | ||
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); | ||
case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); | ||
case INTEGER -> putInteger(json, columnName, resultSet, colIndex); | ||
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); | ||
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex); | ||
case REAL -> putFloat(json, columnName, resultSet, colIndex); | ||
case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); | ||
// BIT is a bit string in Postgres, e.g. '0100' | ||
case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); | ||
case DATE -> putDate(json, columnName, resultSet, colIndex); | ||
case TIME -> putTime(json, columnName, resultSet, colIndex); | ||
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); | ||
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); | ||
case ARRAY -> putArray(json, columnName, resultSet, colIndex); | ||
default -> putDefault(json, columnName, resultSet, colIndex); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { | ||
LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); | ||
node.put(columnName, resolveEra(date, date.toString())); | ||
} | ||
|
||
@Override | ||
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { | ||
LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); | ||
node.put(columnName, time.toString()); | ||
} | ||
|
||
@Override | ||
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { | ||
LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); | ||
LocalDate date = timestamp.toLocalDate(); | ||
node.put(columnName, resolveEra(date, timestamp.toString())); | ||
} | ||
|
||
@Override | ||
public JDBCType getFieldType(final JsonNode field) { | ||
try { | ||
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText(); | ||
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase(); | ||
// Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN | ||
if (typeName.equalsIgnoreCase("bool") || typeName.equalsIgnoreCase("boolean")) { | ||
return JDBCType.BOOLEAN; | ||
} else if (typeName.equalsIgnoreCase("bytea")) { | ||
return switch (typeName) { | ||
case "bool", "boolean" -> JDBCType.BOOLEAN; | ||
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a"). | ||
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. | ||
// https://www.postgresql.org/docs/14/datatype-binary.html | ||
return JDBCType.VARCHAR; | ||
} | ||
|
||
return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); | ||
case "bytea" -> JDBCType.VARCHAR; | ||
case "timestamptz" -> JDBCType.TIMESTAMP_WITH_TIMEZONE; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strings There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
done |
||
case "timetz" -> JDBCType.TIME_WITH_TIMEZONE; | ||
default -> JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); | ||
}; | ||
} catch (final IllegalArgumentException ex) { | ||
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.", | ||
field.get(INTERNAL_COLUMN_NAME), | ||
|
@@ -136,6 +170,12 @@ public JsonSchemaType getJsonType(JDBCType jdbcType) { | |
case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER; | ||
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaType.STRING_BASE_64; | ||
case ARRAY -> JsonSchemaType.ARRAY; | ||
case DATE -> JsonSchemaType.STRING_DATE; | ||
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE; | ||
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE; | ||
case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE; | ||
case TIMESTAMP_WITH_TIMEZONE -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE; | ||
|
||
default -> JsonSchemaType.STRING; | ||
}; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this change in expected_spec.json is related to this PR. How did it end up here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Postgres Source Strict encrypt tests were broken in master branch, so i decided to fix them in scope of this PR