Skip to content

Postgres Source: add timezone awareness and handle BC dates #13166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import java.sql.Timestamp;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
Expand Down Expand Up @@ -246,4 +250,27 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection,
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

/**
 * Reads the column at {@code index} from the current row of {@code resultSet}, asking the JDBC
 * driver to convert it to the requested class.
 *
 * @param resultSet result set to read from
 * @param index column index passed straight to {@link ResultSet#getObject(int, Class)}
 * @param clazz class the driver should convert the column value to
 * @param <T> type of the returned value
 * @return whatever the driver returns for that column
 * @throws SQLException if the driver fails to read or convert the value
 */
protected <T> T getDateTimeObject(ResultSet resultSet, int index, Class<T> clazz) throws SQLException {
  final T columnValue = resultSet.getObject(index, clazz);
  return columnValue;
}

/**
 * Reads the column at {@code index} as an {@link OffsetTime} and stores its {@code toString()}
 * form in {@code node} under {@code columnName}.
 *
 * @param node JSON object receiving the value
 * @param columnName field name to write
 * @param resultSet result set positioned on the row being converted
 * @param index index of the column to read
 * @throws SQLException if the driver cannot read the column as an {@link OffsetTime}
 */
protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
  final OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class);
  if (timetz == null) {
    // JDBC returns null for SQL NULL; emit a JSON null instead of failing with a NullPointerException.
    node.putNull(columnName);
    return;
  }
  node.put(columnName, timetz.toString());
}

/**
 * Reads the column at {@code index} as an {@link OffsetDateTime} and stores its string form in
 * {@code node} under {@code columnName}, after passing it through {@link #resolveEra} so that
 * dates before the common era get their era marker.
 *
 * @param node JSON object receiving the value
 * @param columnName field name to write
 * @param resultSet result set positioned on the row being converted
 * @param index index of the column to read
 * @throws SQLException if the driver cannot read the column as an {@link OffsetDateTime}
 */
protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
  final OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class);
  if (timestamptz == null) {
    // JDBC returns null for SQL NULL; emit a JSON null instead of failing with a NullPointerException.
    node.putNull(columnName);
    return;
  }
  // The era is decided on the date as seen at the value's own offset.
  final LocalDate localDate = timestamptz.toLocalDate();
  node.put(columnName, resolveEra(localDate, timestamptz.toString()));
}

/**
 * Rewrites an ISO-style date or date-time string so that dates before the common era are
 * expressed with a positive year-of-era followed by {@code " BC"}.
 *
 * <p>
 * {@code java.time} uses proleptic years: year {@code 0} is 1 BC, year {@code -1} is 2 BC, and so
 * on. Year {@code 0} is printed without a sign ({@code "0000-01-01"}), so the year is rebuilt
 * from {@code date} rather than by stripping the first character of {@code value}.
 *
 * @param date the date component of the value; decides the era and supplies the year
 * @param value string form of the value, expected to start with the ISO year of {@code date}
 * @return {@code value} unchanged for CE dates; otherwise the value with its year replaced by the
 *         zero-padded year-of-era and {@code " BC"} appended
 */
protected String resolveEra(LocalDate date, String value) {
  if (!isBCE(date)) {
    return value;
  }
  final String yearOfEra = Integer.toString(1 - date.getYear());
  final String paddedYear = "0".repeat(Math.max(0, 4 - yearOfEra.length())) + yearOfEra;
  // Start at index 1 so a leading minus sign is not mistaken for the year/month separator.
  final int separator = value.indexOf('-', 1);
  if (separator < 0) {
    // Not an ISO-style value; leave the text alone and only mark the era.
    return value + " BC";
  }
  return paddedYear + value.substring(separator) + " BC";
}

/**
 * Tells whether {@code date} falls before the common era.
 *
 * @param date date to inspect
 * @return {@code true} when the date's ISO era is {@link IsoEra#BCE}, i.e. its proleptic year is
 *         zero or negative
 */
public static boolean isBCE(LocalDate date) {
  return date.getEra() == IsoEra.BCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception {

// Runs the incremental-sync check that uses the timestamp column as cursor. The actual
// expectations live in incrementalTimestampCheck(), which is protected so that a subclass can
// override it with its own cursor values.
@Test
void testIncrementalTimestampCheckCursor() throws Exception {
incrementalTimestampCheck();
}

protected void incrementalTimestampCheck() throws Exception {
incrementalCursorCheck(
COL_UPDATED_AT,
"2005-10-18T00:00:00Z",
Expand Down Expand Up @@ -611,6 +615,16 @@ void testReadOneTableIncrementallyTwice() throws Exception {

assertEquals(2,
(int) actualMessagesSecondSync.stream().filter(r -> r.getType() == Type.RECORD).count());
final List<AirbyteMessage> expectedMessages = getExpectedAirbyteMessagesSecondSync(namespace);

setEmittedAtToNull(actualMessagesSecondSync);

assertTrue(expectedMessages.size() == actualMessagesSecondSync.size());
assertTrue(expectedMessages.containsAll(actualMessagesSecondSync));
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
}

protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
final List<AirbyteMessage> expectedMessages = new ArrayList<>();
expectedMessages.add(new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace)
Expand All @@ -634,12 +648,7 @@ void testReadOneTableIncrementallyTwice() throws Exception {
.withStreamNamespace(namespace)
.withCursorField(ImmutableList.of(COL_ID))
.withCursor("5")))))));

setEmittedAtToNull(actualMessagesSecondSync);

assertTrue(expectedMessages.size() == actualMessagesSecondSync.size());
assertTrue(expectedMessages.containsAll(actualMessagesSecondSync));
assertTrue(actualMessagesSecondSync.containsAll(expectedMessages));
return expectedMessages;
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,33 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshHelpers;
import io.airbyte.integrations.source.jdbc.JdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
Expand Down Expand Up @@ -93,6 +109,102 @@ static void cleanUp() {
PSQL_DB.close();
}

/**
 * Builds the three RECORD messages expected from the test table, with {@code COL_UPDATED_AT}
 * rendered as a plain date string.
 *
 * @return a new mutable list holding the "picard", "crusher" and "vash" records, in that order
 */
@Override
protected List<AirbyteMessage> getTestMessages() {
  final List<AirbyteMessage> records = Lists.newArrayList();

  final AirbyteRecordMessage picard = new AirbyteRecordMessage()
      .withStream(streamName)
      .withNamespace(getDefaultNamespace())
      .withData(Jsons.jsonNode(ImmutableMap.of(
          COL_ID, ID_VALUE_1,
          COL_NAME, "picard",
          COL_UPDATED_AT, "2004-10-19")));
  records.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(picard));

  final AirbyteRecordMessage crusher = new AirbyteRecordMessage()
      .withStream(streamName)
      .withNamespace(getDefaultNamespace())
      .withData(Jsons.jsonNode(ImmutableMap.of(
          COL_ID, ID_VALUE_2,
          COL_NAME, "crusher",
          COL_UPDATED_AT, "2005-10-19")));
  records.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(crusher));

  final AirbyteRecordMessage vash = new AirbyteRecordMessage()
      .withStream(streamName)
      .withNamespace(getDefaultNamespace())
      .withData(Jsons.jsonNode(ImmutableMap.of(
          COL_ID, ID_VALUE_3,
          COL_NAME, "vash",
          COL_UPDATED_AT, "2006-10-19")));
  records.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(vash));

  return records;
}

/**
 * Builds the catalog expected from discovery: three streams in {@code defaultNamespace}, each
 * supporting full-refresh and incremental sync and each declaring {@code COL_UPDATED_AT} as a
 * date-formatted string.
 *
 * @param defaultNamespace namespace assigned to every stream
 * @return catalog with the single-key, keyless and composite-key streams, in that order
 */
@Override
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
  // Table with a single-column primary key.
  final var streamWithPk = CatalogHelpers.createAirbyteStream(
      TABLE_NAME,
      defaultNamespace,
      Field.of(COL_ID, JsonSchemaType.NUMBER),
      Field.of(COL_NAME, JsonSchemaType.STRING),
      Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)));

  // Table without any primary key.
  final var streamWithoutPk = CatalogHelpers.createAirbyteStream(
      TABLE_NAME_WITHOUT_PK,
      defaultNamespace,
      Field.of(COL_ID, JsonSchemaType.NUMBER),
      Field.of(COL_NAME, JsonSchemaType.STRING),
      Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(Collections.emptyList());

  // Table whose primary key spans the first-name and last-name columns.
  final var streamWithCompositePk = CatalogHelpers.createAirbyteStream(
      TABLE_NAME_COMPOSITE_PK,
      defaultNamespace,
      Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
      Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
      Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)));

  return new AirbyteCatalog()
      .withStreams(Lists.newArrayList(streamWithPk, streamWithoutPk, streamWithCompositePk));
}

/**
 * Runs the inherited incremental cursor check on {@code COL_UPDATED_AT} using date-only cursor
 * values: starting from "2005-10-18", the second and third test records are expected and the
 * cursor should end at "2006-10-19".
 */
@Override
protected void incrementalTimestampCheck() throws Exception {
  final List<AirbyteMessage> expectedRecords = Lists.newArrayList(
      getTestMessages().get(1),
      getTestMessages().get(2));
  super.incrementalCursorCheck(COL_UPDATED_AT, "2005-10-18", "2006-10-19", expectedRecords);
}

/**
 * Supplies the Postgres-specific source operations to the inherited acceptance tests.
 *
 * @return a new {@code PostgresSourceOperations} instance on every call
 */
@Override
protected JdbcSourceOperations getSourceOperations() {
return new PostgresSourceOperations();
}

/**
 * Builds the messages expected from the second incremental sync: the "riker" and "data" records
 * with date-only {@code COL_UPDATED_AT} values, followed by a STATE message whose cursor on
 * {@code COL_ID} is "5".
 *
 * @param namespace namespace set on the records and on the stream state
 * @return a new mutable list with the two RECORD messages and the STATE message, in that order
 */
@Override
protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(String namespace) {
  final AirbyteRecordMessage riker = new AirbyteRecordMessage()
      .withStream(streamName)
      .withNamespace(namespace)
      .withData(Jsons.jsonNode(ImmutableMap.of(
          COL_ID, ID_VALUE_4,
          COL_NAME, "riker",
          COL_UPDATED_AT, "2006-10-19")));

  final AirbyteRecordMessage data = new AirbyteRecordMessage()
      .withStream(streamName)
      .withNamespace(namespace)
      .withData(Jsons.jsonNode(ImmutableMap.of(
          COL_ID, ID_VALUE_5,
          COL_NAME, "data",
          COL_UPDATED_AT, "2006-10-19")));

  final DbStreamState streamState = new DbStreamState()
      .withStreamName(streamName)
      .withStreamNamespace(namespace)
      .withCursorField(ImmutableList.of(COL_ID))
      .withCursor("5");
  final DbState dbState = new DbState()
      .withCdc(false)
      .withStreams(Lists.newArrayList(streamState));
  final AirbyteStateMessage state = new AirbyteStateMessage().withData(Jsons.jsonNode(dbState));

  final List<AirbyteMessage> expected = new ArrayList<>();
  expected.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(riker));
  expected.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD).withRecord(data));
  expected.add(new AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(state));
  return expected;
}

@Test
void testSpec() throws Exception {
final ConnectorSpecification actual = source.spec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,17 @@
"airbyte_secret": true,
"order": 5
},
"jdbc_url_params": {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change in expected_spec.json is related to this PR. How did it end up here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this change in expected_spec.json is related to this PR. How did it end up here?

The Postgres Source strict-encrypt tests were broken on the master branch, so I decided to fix them as part of this PR.

"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 6
},
"replication_method": {
"type": "object",
"title": "Replication Method",
"description": "Replication method to use for extracting data from the database.",
"order": 7,
"order": 8,
"oneOf": [
{
"title": "Standard",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,15 @@
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Collections;
import org.postgresql.jdbc.PgResultSetMetaData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -67,58 +72,87 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

/**
 * Binds {@code value} to the statement as a SQL DATE.
 *
 * @param preparedStatement statement receiving the parameter
 * @param parameterIndex index of the parameter to set
 * @param value date text in the form accepted by {@link Date#valueOf(String)}
 * @throws RuntimeException wrapping any failure, whether from parsing {@code value} or from the
 *         driver
 */
@Override
protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException {
  try {
    preparedStatement.setDate(parameterIndex, Date.valueOf(value));
  } catch (final Exception failure) {
    throw new RuntimeException(failure);
  }
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData();
final String columnName = metadata.getColumnName(colIndex);
final String columnTypeName = metadata.getColumnTypeName(colIndex);
final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase();
final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex));

if (resultSet.getString(colIndex) == null) {
json.putNull(columnName);
} else if (columnTypeName.equalsIgnoreCase("bool") || columnTypeName.equalsIgnoreCase("boolean")) {
putBoolean(json, columnName, resultSet, colIndex);
} else if (columnTypeName.equalsIgnoreCase("bytea")) {
putString(json, columnName, resultSet, colIndex);
} else if (columnTypeName.equalsIgnoreCase("time") || columnTypeName.equalsIgnoreCase("timetz")) {
putString(json, columnName, resultSet, colIndex);
} else {
// https://www.postgresql.org/docs/14/datatype.html
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex);
case INTEGER -> putInteger(json, columnName, resultSet, colIndex);
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
case REAL -> putFloat(json, columnName, resultSet, colIndex);
case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
// BIT is a bit string in Postgres, e.g. '0100'
case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
switch (columnTypeName) {
case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex);
case "bytea" -> putString(json, columnName, resultSet, colIndex);
case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex);
case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex);
default -> {
switch (columnType) {
case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex);
case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex);
case INTEGER -> putInteger(json, columnName, resultSet, colIndex);
case BIGINT -> putBigInt(json, columnName, resultSet, colIndex);
case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex);
case REAL -> putFloat(json, columnName, resultSet, colIndex);
case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex);
// BIT is a bit string in Postgres, e.g. '0100'
case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex);
case DATE -> putDate(json, columnName, resultSet, colIndex);
case TIME -> putTime(json, columnName, resultSet, colIndex);
case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex);
case ARRAY -> putArray(json, columnName, resultSet, colIndex);
default -> putDefault(json, columnName, resultSet, colIndex);
}
}
}
}
}

/**
 * Reads the column at {@code index} as a {@link LocalDate} and writes its string form, after era
 * handling by {@code resolveEra}, to {@code node} under {@code columnName}.
 */
@Override
protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
  final LocalDate localDate = getDateTimeObject(resultSet, index, LocalDate.class);
  final String isoDate = localDate.toString();
  node.put(columnName, resolveEra(localDate, isoDate));
}

/**
 * Reads the column at {@code index} as a {@link LocalTime} and writes its {@code toString()} form
 * to {@code node} under {@code columnName}.
 */
@Override
protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
  final String isoTime = getDateTimeObject(resultSet, index, LocalTime.class).toString();
  node.put(columnName, isoTime);
}

/**
 * Reads the column at {@code index} as a {@link LocalDateTime} and writes its string form, after
 * era handling by {@code resolveEra} based on its date part, to {@code node} under
 * {@code columnName}.
 */
@Override
protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException {
  final LocalDateTime localDateTime = getDateTimeObject(resultSet, index, LocalDateTime.class);
  final String isoTimestamp = resolveEra(localDateTime.toLocalDate(), localDateTime.toString());
  node.put(columnName, isoTimestamp);
}

@Override
public JDBCType getFieldType(final JsonNode field) {
try {
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText();
final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase();
// Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN
if (typeName.equalsIgnoreCase("bool") || typeName.equalsIgnoreCase("boolean")) {
return JDBCType.BOOLEAN;
} else if (typeName.equalsIgnoreCase("bytea")) {
return switch (typeName) {
case "bool", "boolean" -> JDBCType.BOOLEAN;
// BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a").
// It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR.
// https://www.postgresql.org/docs/14/datatype-binary.html
return JDBCType.VARCHAR;
}

return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
case "bytea" -> JDBCType.VARCHAR;
case "timestamptz" -> JDBCType.TIMESTAMP_WITH_TIMEZONE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings timestamptz and timetz are used more than once in the codebase - please create public static constants for them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings timestamptz and timetz are used more than once in the codebase - please create public static constants for them.

done

case "timetz" -> JDBCType.TIME_WITH_TIMEZONE;
default -> JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
};
} catch (final IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
field.get(INTERNAL_COLUMN_NAME),
Expand All @@ -136,6 +170,12 @@ public JsonSchemaType getJsonType(JDBCType jdbcType) {
case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER;
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaType.STRING_BASE_64;
case ARRAY -> JsonSchemaType.ARRAY;
case DATE -> JsonSchemaType.STRING_DATE;
case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE;
case TIMESTAMP_WITH_TIMEZONE -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE;

default -> JsonSchemaType.STRING;
};
}
Expand Down
Loading