From 198ad45219a7a7cd7913f485ed81e2d6c1285541 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 12:45:53 +0300 Subject: [PATCH 01/12] Postgres Source: add timezone awareness and handle BC dates --- ...bstractJdbcCompatibleSourceOperations.java | 27 +++++++++ .../postgres/PostgresSourceOperations.java | 57 +++++++++++++++---- .../sources/PostgresSourceDatatypeTest.java | 23 ++++---- .../protocol/models/JsonSchemaType.java | 22 ++++++- 4 files changed, 104 insertions(+), 25 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index fefa0db06a58c..cd440a6765540 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -20,6 +20,10 @@ import java.sql.Timestamp; import java.text.ParseException; import java.time.Instant; +import java.time.LocalDate; +import java.time.OffsetDateTime; +import java.time.OffsetTime; +import java.time.chrono.IsoEra; import java.util.Collections; import java.util.List; import java.util.StringJoiner; @@ -246,4 +250,27 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection, return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName; } + protected DateTime getDateTimeObject (ResultSet resultSet, int index, Class clazz) throws SQLException { + return resultSet.getObject(index, clazz); + } + + protected void putTimeWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + OffsetTime timetz = getDateTimeObject(resultSet, index, OffsetTime.class); + node.put(columnName, timetz.toString()); + } + + protected void putTimestampWithTimezone(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + OffsetDateTime timestamptz = getDateTimeObject(resultSet, index, OffsetDateTime.class); + LocalDate localDate = timestamptz.toLocalDate(); + node.put(columnName, resolveEra(localDate, timestamptz.toString())); + } + + protected String resolveEra(LocalDate date, String value) { + return isBCE(date) ? value.substring(1) + " BC": value; + } + + private static boolean isBCE(LocalDate date) { + return date.getEra().equals(IsoEra.BCE); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index ff0664c0bdae8..04f62223dedc6 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -4,12 +4,6 @@ package io.airbyte.integrations.source.postgres; -import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME; -import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE; -import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME; -import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME; -import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME; - import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; @@ -17,15 +11,20 @@ import io.airbyte.db.DataTypeUtils; import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; +import org.postgresql.jdbc.PgResultSetMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigDecimal; import java.sql.JDBCType; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.time.*; +import java.time.chrono.IsoEra; import java.util.Collections; -import org.postgresql.jdbc.PgResultSetMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.airbyte.db.jdbc.JdbcConstants.*; public class PostgresSourceOperations extends JdbcSourceOperations { @@ -80,9 +79,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob putBoolean(json, columnName, resultSet, colIndex); } else if (columnTypeName.equalsIgnoreCase("bytea")) { putString(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("time") || columnTypeName.equalsIgnoreCase("timetz")) { - putString(json, columnName, resultSet, colIndex); - } else { + } else if (columnTypeName.equalsIgnoreCase("time")) { + putTime(json, columnName, resultSet, colIndex); + } else if (columnTypeName.equalsIgnoreCase("timetz")){ + putTimeWithTimezone(json,columnName,resultSet,colIndex); + } else if(columnTypeName.equalsIgnoreCase("timestamptz")){ + putTimestampWithTimezone(json,columnName,resultSet,colIndex); + } + else { // https://www.postgresql.org/docs/14/datatype.html switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -104,6 +108,25 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } } + @Override + protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); + node.put(columnName, resolveEra(date, date.toString())); + } + + @Override + protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); + node.put(columnName, time.toString()); + } + + @Override + protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { + LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + LocalDate date = timestamp.toLocalDate(); + node.put(columnName, resolveEra(date, timestamp.toString())); + } + @Override public JDBCType getFieldType(final JsonNode field) { try { @@ -116,6 +139,10 @@ public JDBCType getFieldType(final JsonNode field) { // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. // https://www.postgresql.org/docs/14/datatype-binary.html return JDBCType.VARCHAR; + } else if(typeName.equalsIgnoreCase("timestamptz")){ + return JDBCType.TIMESTAMP_WITH_TIMEZONE; + }else if (typeName.equalsIgnoreCase("timetz")){ + return JDBCType.TIME_WITH_TIMEZONE; } return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); @@ -136,6 +163,12 @@ public JsonSchemaType getJsonType(JDBCType jdbcType) { case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER; case BLOB, BINARY, VARBINARY, LONGVARBINARY -> JsonSchemaType.STRING_BASE_64; case ARRAY -> JsonSchemaType.ARRAY; + case DATE -> JsonSchemaType.STRING_DATE; + case TIME -> JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE; + case TIME_WITH_TIMEZONE -> JsonSchemaType.STRING_TIME_WITH_TIMEZONE; + case TIMESTAMP -> JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE; + case TIMESTAMP_WITH_TIMEZONE -> JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE; + default -> JsonSchemaType.STRING; }; } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index 8824454e740b2..1c590ac8c1777 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -15,6 +15,7 @@ import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.SQLException; +import java.util.Objects; import java.util.Set; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -235,14 +236,12 @@ protected void initTests() { .addExpectedValues("<(5,7),10>", "<(0,0),0>", "<(-10,-4),10>", null) .build()); - // DataTypeUtils#DATE_FORMAT is set as "yyyy-MM-dd'T'HH:mm:ss'Z'", so currently the Postgres source - // returns a date value as a datetime. It cannot handle BC dates (e.g. 199-10-10 BC). addDataTypeTestData( TestDataHolder.builder() .sourceType("date") - .airbyteType(JsonSchemaType.STRING) - .addInsertValues("'1999-01-08'", "null") - .addExpectedValues("1999-01-08T00:00:00Z", null) + .airbyteType(JsonSchemaType.STRING_DATE) + .addInsertValues("'1999-01-08'","'1991-02-10 BC'", "null") + .addExpectedValues("1999-01-08","1990-02-10 BC", null) .build()); for (final String type : Set.of("double precision", "float", "float8")) { @@ -448,7 +447,7 @@ protected void initTests() { TestDataHolder.builder() .sourceType("time") .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE) // time column will ignore time zone .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05Z+8'", "'13:00:06Z-8'") .addExpectedValues(null, "13:00:01", "13:00:02", "13:00:03", "13:00:04", "13:00:05", "13:00:06") @@ -461,11 +460,11 @@ protected void initTests() { TestDataHolder.builder() .sourceType("timetz") .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_TIME_WITH_TIMEZONE) .addInsertValues("null", "'13:00:01'", "'13:00:02+8'", "'13:00:03-8'", "'13:00:04Z'", "'13:00:05Z+8'", "'13:00:06Z-8'") // A time value without time zone will use the time zone set on the database, which is Z-7, // so 13:00:01 is returned as 13:00:01-07. - .addExpectedValues(null, "13:00:01-07", "13:00:02+08", "13:00:03-08", "13:00:04+00", "13:00:05-08", "13:00:06+08") + .addExpectedValues(null, "13:00:01-07:00", "13:00:02+08:00", "13:00:03-08:00", "13:00:04Z", "13:00:05-08:00", "13:00:06+08:00") .build()); } @@ -475,9 +474,9 @@ protected void initTests() { TestDataHolder.builder() .sourceType("timestamp") .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE) .addInsertValues("TIMESTAMP '2004-10-19 10:23:54'", "TIMESTAMP '2004-10-19 10:23:54.123456'", "null") - .addExpectedValues("2004-10-19T10:23:54.000000Z", "2004-10-19T10:23:54.123456Z", null) + .addExpectedValues("2004-10-19T10:23:54", "2004-10-19T10:23:54.123456", null) .build()); } @@ -487,10 +486,10 @@ protected void initTests() { TestDataHolder.builder() .sourceType("timestamptz") .fullSourceDataType(fullSourceType) - .airbyteType(JsonSchemaType.STRING) + .airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE) .addInsertValues("TIMESTAMP '2004-10-19 10:23:54-08'", "TIMESTAMP '2004-10-19 10:23:54.123456-08'", "null") // 2004-10-19T10:23:54Z-8 = 2004-10-19T17:23:54Z - .addExpectedValues("2004-10-19T17:23:54.000000Z", "2004-10-19T17:23:54.123456Z", null) + .addExpectedValues("2004-10-19T17:23:54Z", "2004-10-19T17:23:54.123456Z", null) .build()); } diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java index 9d063088cf5a9..70aefb4b9fd21 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java @@ -15,8 +15,10 @@ public class JsonSchemaType { public static final String DATE_TIME = "date-time"; public static final String DATE = "date"; public static final String TIME = "time"; + public static final String TIME_WITHOUT_TIMEZONE = "time_without_timezone"; + public static final String TIME_WITH_TIMEZONE = "time_with_timezone"; public static final String TIMESTAMP_WITH_TIMEZONE = "timestamp_with_timezone"; - public static final String TIMESTAMP_WITHOUT_TIMEZONE = "timestamp"; + public static final String TIMESTAMP_WITHOUT_TIMEZONE = "timestamp_without_timezone"; public static final String CONTENT_ENCODING = "contentEncoding"; public static final String BASE_64 = "base64"; public static final String AIRBYTE_TYPE = "airbyte_type"; @@ -28,6 +30,24 @@ public class JsonSchemaType { public static final JsonSchemaType ARRAY = JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY).build(); public static final JsonSchemaType NULL = JsonSchemaType.builder(JsonSchemaPrimitive.NULL).build(); public static final JsonSchemaType STRING_BASE_64 = JsonSchemaType.builder(JsonSchemaPrimitive.STRING).withContentEncoding(BASE_64).build(); + public static final JsonSchemaType STRING_TIME_WITH_TIMEZONE = + JsonSchemaType.builder(JsonSchemaPrimitive.STRING) + .withFormat(TIME) + .withAirbyteType(TIME_WITH_TIMEZONE).build(); + public static final JsonSchemaType STRING_TIME_WITHOUT_TIMEZONE = + JsonSchemaType.builder(JsonSchemaPrimitive.STRING) + .withFormat(TIME) + .withAirbyteType(TIME_WITHOUT_TIMEZONE).build(); + public static final JsonSchemaType STRING_TIMESTAMP_WITH_TIMEZONE = + JsonSchemaType.builder(JsonSchemaPrimitive.STRING) + .withFormat(DATE_TIME) + .withAirbyteType(TIMESTAMP_WITH_TIMEZONE).build(); + public static final JsonSchemaType STRING_TIMESTAMP_WITHOUT_TIMEZONE = + JsonSchemaType.builder(JsonSchemaPrimitive.STRING) + .withFormat(DATE_TIME) + .withAirbyteType(TIMESTAMP_WITHOUT_TIMEZONE).build(); + public static final JsonSchemaType STRING_DATE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) + .withFormat(DATE).build(); private final Map jsonSchemaTypeMap; From d836b51aaba77b3a81ffe07210672e6d3c29f5a5 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 12:59:59 +0300 Subject: [PATCH 02/12] fixed checkstyle --- ...bstractJdbcCompatibleSourceOperations.java | 4 +-- .../postgres/PostgresSourceOperations.java | 35 ++++++++++--------- .../sources/PostgresSourceDatatypeTest.java | 5 ++- .../protocol/models/JsonSchemaType.java | 18 +++++----- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index cd440a6765540..c5e10c8e4f4c9 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -250,7 +250,7 @@ public String getFullyQualifiedTableNameWithQuoting(final Connection connection, return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName; } - protected DateTime getDateTimeObject (ResultSet resultSet, int index, Class clazz) throws SQLException { + protected DateTime getDateTimeObject(ResultSet resultSet, int index, Class clazz) throws SQLException { return resultSet.getObject(index, clazz); } @@ -266,7 +266,7 @@ protected void putTimestampWithTimezone(ObjectNode node, String columnName, Resu } protected String resolveEra(LocalDate date, String value) { - return isBCE(date) ? value.substring(1) + " BC": value; + return isBCE(date) ? value.substring(1) + " BC" : value; } private static boolean isBCE(LocalDate date) { diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 04f62223dedc6..af9019821e276 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -4,6 +4,12 @@ package io.airbyte.integrations.source.postgres; +import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME; +import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE; +import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE_NAME; +import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_SCHEMA_NAME; +import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_TABLE_NAME; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; @@ -11,20 +17,18 @@ import io.airbyte.db.DataTypeUtils; import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; -import org.postgresql.jdbc.PgResultSetMetaData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.math.BigDecimal; import java.sql.JDBCType; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; -import java.time.*; -import java.time.chrono.IsoEra; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; import java.util.Collections; - -import static io.airbyte.db.jdbc.JdbcConstants.*; +import org.postgresql.jdbc.PgResultSetMetaData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PostgresSourceOperations extends JdbcSourceOperations { @@ -81,12 +85,11 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob putString(json, columnName, resultSet, colIndex); } else if (columnTypeName.equalsIgnoreCase("time")) { putTime(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("timetz")){ - putTimeWithTimezone(json,columnName,resultSet,colIndex); - } else if(columnTypeName.equalsIgnoreCase("timestamptz")){ - putTimestampWithTimezone(json,columnName,resultSet,colIndex); - } - else { + } else if (columnTypeName.equalsIgnoreCase("timetz")) { + putTimeWithTimezone(json, columnName, resultSet, colIndex); + } else if (columnTypeName.equalsIgnoreCase("timestamptz")) { + putTimestampWithTimezone(json, columnName, resultSet, colIndex); + } else { // https://www.postgresql.org/docs/14/datatype.html switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -139,9 +142,9 @@ public JDBCType getFieldType(final JsonNode field) { // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. // https://www.postgresql.org/docs/14/datatype-binary.html return JDBCType.VARCHAR; - } else if(typeName.equalsIgnoreCase("timestamptz")){ + } else if (typeName.equalsIgnoreCase("timestamptz")) { return JDBCType.TIMESTAMP_WITH_TIMEZONE; - }else if (typeName.equalsIgnoreCase("timetz")){ + } else if (typeName.equalsIgnoreCase("timetz")) { return JDBCType.TIME_WITH_TIMEZONE; } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java index 1c590ac8c1777..e67a9e5cf7384 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceDatatypeTest.java @@ -15,7 +15,6 @@ import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.JsonSchemaType; import java.sql.SQLException; -import java.util.Objects; import java.util.Set; import org.jooq.DSLContext; import org.jooq.SQLDialect; @@ -240,8 +239,8 @@ protected void initTests() { TestDataHolder.builder() .sourceType("date") .airbyteType(JsonSchemaType.STRING_DATE) - .addInsertValues("'1999-01-08'","'1991-02-10 BC'", "null") - .addExpectedValues("1999-01-08","1990-02-10 BC", null) + .addInsertValues("'1999-01-08'", "'1991-02-10 BC'", "null") + .addExpectedValues("1999-01-08", "1990-02-10 BC", null) .build()); for (final String type : Set.of("double precision", "float", "float8")) { diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java index 70aefb4b9fd21..17a021817f393 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java @@ -32,22 +32,22 @@ public class JsonSchemaType { public static final JsonSchemaType STRING_BASE_64 = JsonSchemaType.builder(JsonSchemaPrimitive.STRING).withContentEncoding(BASE_64).build(); public static final JsonSchemaType STRING_TIME_WITH_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withFormat(TIME) - .withAirbyteType(TIME_WITH_TIMEZONE).build(); + .withFormat(TIME) + .withAirbyteType(TIME_WITH_TIMEZONE).build(); public static final JsonSchemaType STRING_TIME_WITHOUT_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withFormat(TIME) - .withAirbyteType(TIME_WITHOUT_TIMEZONE).build(); + .withFormat(TIME) + .withAirbyteType(TIME_WITHOUT_TIMEZONE).build(); public static final JsonSchemaType STRING_TIMESTAMP_WITH_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withFormat(DATE_TIME) - .withAirbyteType(TIMESTAMP_WITH_TIMEZONE).build(); + .withFormat(DATE_TIME) + .withAirbyteType(TIMESTAMP_WITH_TIMEZONE).build(); public static final JsonSchemaType STRING_TIMESTAMP_WITHOUT_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withFormat(DATE_TIME) - .withAirbyteType(TIMESTAMP_WITHOUT_TIMEZONE).build(); + .withFormat(DATE_TIME) + .withAirbyteType(TIMESTAMP_WITHOUT_TIMEZONE).build(); public static final JsonSchemaType STRING_DATE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withFormat(DATE).build(); + .withFormat(DATE).build(); private final Map jsonSchemaTypeMap; From 4018d8af9211d2838dc7195c9310c3b65aa77b66 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 16:24:00 +0300 Subject: [PATCH 03/12] add tests --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 21 +++- .../postgres/PostgresSourceOperations.java | 15 ++- .../PostgresJdbcSourceAcceptanceTest.java | 105 +++++++++++++++++- 3 files changed, 130 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index c8d3a294dd703..e0f27810d264b 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -557,6 +557,10 @@ void testIncrementalStringCheckCursorSpaceInColumnName() throws Exception { @Test void testIncrementalTimestampCheckCursor() throws Exception { + incrementalTimestampCheck(); + } + + protected void incrementalTimestampCheck() throws Exception { incrementalCursorCheck( COL_UPDATED_AT, "2005-10-18T00:00:00Z", @@ -611,6 +615,16 @@ void testReadOneTableIncrementallyTwice() throws Exception { assertEquals(2, (int) actualMessagesSecondSync.stream().filter(r -> r.getType() == Type.RECORD).count()); + final List expectedMessages = getExpectedAirbyteMessagesSecondSync(namespace); + + setEmittedAtToNull(actualMessagesSecondSync); + + assertTrue(expectedMessages.size() == actualMessagesSecondSync.size()); + assertTrue(expectedMessages.containsAll(actualMessagesSecondSync)); + assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)); + } + + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { final List expectedMessages = new ArrayList<>(); expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) @@ -634,12 +648,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { .withStreamNamespace(namespace) .withCursorField(ImmutableList.of(COL_ID)) .withCursor("5"))))))); - - setEmittedAtToNull(actualMessagesSecondSync); - - assertTrue(expectedMessages.size() == actualMessagesSecondSync.size()); - assertTrue(expectedMessages.containsAll(actualMessagesSecondSync)); - assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)); + return expectedMessages; } @Test diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index af9019821e276..1ba7e645fed9d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -18,10 +18,7 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.sql.JDBCType; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; +import java.sql.*; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -70,6 +67,16 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { return jsonNode; } + @Override + protected void setDate(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + Date date = Date.valueOf(value); + preparedStatement.setDate(parameterIndex, date); + } catch (final Exception e) { + throw new RuntimeException(e); + } + } + @Override public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index cd4a2e124ccc9..93c2d01023c32 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -7,16 +7,23 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.integrations.source.relationaldb.models.DbState; +import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.*; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -91,4 +98,100 @@ void testSpec() throws Exception { assertEquals(expected, actual); } + @Override + protected List getTestMessages() { + return Lists.newArrayList( + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_1, + COL_NAME, "picard", + COL_UPDATED_AT, "2004-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_2, + COL_NAME, "crusher", + COL_UPDATED_AT, + "2005-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_3, + COL_NAME, "vash", + COL_UPDATED_AT, "2006-10-19"))))); + } + + @Override + protected AirbyteCatalog getCatalog(final String defaultNamespace) { + return new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream( + TABLE_NAME, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_WITHOUT_PK, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(Collections.emptyList()), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_COMPOSITE_PK, + defaultNamespace, + Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), + Field.of(COL_LAST_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey( + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + } + + @Override + protected void incrementalTimestampCheck() throws Exception { + super.incrementalCursorCheck(COL_UPDATED_AT, + "2005-10-18", + "2006-10-19", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Override + protected JdbcSourceOperations getSourceOperations() { + return new PostgresSourceOperations(); + } + + @Override + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { + final List expectedMessages = new ArrayList<>(); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_4, + COL_NAME, "riker", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_5, + COL_NAME, "data", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage() + .withType(AirbyteMessage.Type.STATE) + .withState(new AirbyteStateMessage() + .withData(Jsons.jsonNode(new DbState() + .withCdc(false) + .withStreams(Lists.newArrayList(new DbStreamState() + .withStreamName(streamName) + .withStreamNamespace(namespace) + .withCursorField(ImmutableList.of(COL_ID)) + .withCursor("5"))))))); + return expectedMessages; + } + } From 72f4ce78db50c511e914a913362a894e14af2bb1 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 16:27:05 +0300 Subject: [PATCH 04/12] updated changelog --- docs/integrations/sources/postgres.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 22a4fc308789f..ae6f93e71eec7 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -270,6 +270,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.17 | 2022-05-25 | [13166](https://github.com/airbytehq/airbyte/pull/13166) | Added timezone awareness and handle BC dates | | 0.4.16 | 2022-05-14 | [12840](https://github.com/airbytehq/airbyte/pull/12840) | Added custom JDBC parameters field | | 0.4.15 | 2022-05-13 | [12834](https://github.com/airbytehq/airbyte/pull/12834) | Fix the bug that the connector returns empty catalog for Azure Postgres database | | 0.4.14 | 2022-05-08 | [12689](https://github.com/airbytehq/airbyte/pull/12689) | Add table retrieval according to role-based `SELECT` privilege | From 662c4a322e144fc371a528592bf4ab324065f2c2 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 16:28:57 +0300 Subject: [PATCH 05/12] removed star import --- .../source/postgres/PostgresSourceOperations.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 1ba7e645fed9d..a05ac30237f22 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -18,7 +18,12 @@ import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.protocol.models.JsonSchemaType; import java.math.BigDecimal; -import java.sql.*; +import java.sql.Date; +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; From 6dbcc1d3955d144011498917175cac02956c432a Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Wed, 25 May 2022 18:04:22 +0300 Subject: [PATCH 06/12] fixed tests --- ...StrictEncryptJdbcSourceAcceptanceTest.java | 106 +++++++++++++++++- .../src/test/resources/expected_spec.json | 8 +- 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index c9e431613f00d..8724cb008dd43 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -7,17 +7,25 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; +import io.airbyte.db.jdbc.JdbcSourceOperations; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.source.jdbc.JdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.integrations.source.relationaldb.models.DbState; +import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.*; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.function.Function; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -93,6 +101,102 @@ static void cleanUp() { PSQL_DB.close(); } + @Override + protected List getTestMessages() { + return Lists.newArrayList( + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_1, + COL_NAME, "picard", + COL_UPDATED_AT, "2004-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_2, + COL_NAME, "crusher", + COL_UPDATED_AT, + "2005-10-19")))), + new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(getDefaultNamespace()) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_3, + COL_NAME, "vash", + COL_UPDATED_AT, "2006-10-19"))))); + } + + @Override + protected AirbyteCatalog getCatalog(final String defaultNamespace) { + return new AirbyteCatalog().withStreams(Lists.newArrayList( + CatalogHelpers.createAirbyteStream( + TABLE_NAME, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_WITHOUT_PK, + defaultNamespace, + Field.of(COL_ID, JsonSchemaType.NUMBER), + Field.of(COL_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(Collections.emptyList()), + CatalogHelpers.createAirbyteStream( + TABLE_NAME_COMPOSITE_PK, + defaultNamespace, + Field.of(COL_FIRST_NAME, JsonSchemaType.STRING), + Field.of(COL_LAST_NAME, JsonSchemaType.STRING), + Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey( + List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME))))); + } + + @Override + protected void incrementalTimestampCheck() throws Exception { + super.incrementalCursorCheck(COL_UPDATED_AT, + "2005-10-18", + "2006-10-19", + Lists.newArrayList(getTestMessages().get(1), + getTestMessages().get(2))); + } + + @Override + protected JdbcSourceOperations getSourceOperations() { + return new PostgresSourceOperations(); + } + + @Override + protected List getExpectedAirbyteMessagesSecondSync(String namespace) { + final List expectedMessages = new ArrayList<>(); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_4, + COL_NAME, "riker", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage().withType(AirbyteMessage.Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) + .withData(Jsons.jsonNode(ImmutableMap + .of(COL_ID, ID_VALUE_5, + COL_NAME, "data", + COL_UPDATED_AT, "2006-10-19"))))); + expectedMessages.add(new AirbyteMessage() + .withType(AirbyteMessage.Type.STATE) + .withState(new AirbyteStateMessage() + .withData(Jsons.jsonNode(new DbState() + .withCdc(false) + .withStreams(Lists.newArrayList(new DbStreamState() + .withStreamName(streamName) + .withStreamNamespace(namespace) + .withCursorField(ImmutableList.of(COL_ID)) + .withCursor("5"))))))); + return expectedMessages; + } + @Test void testSpec() throws Exception { final ConnectorSpecification actual = source.spec(); diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json index 19d9695776fa7..ed3b43b3a7cc1 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json @@ -54,11 +54,17 @@ "airbyte_secret": true, "order": 5 }, + "jdbc_url_params": { + "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", + "title": "JDBC URL Params", + "type": "string", + "order": 6 + }, "replication_method": { "type": "object", "title": "Replication Method", "description": "Replication method to use for extracting data from the database.", - "order": 7, + "order": 8, "oneOf": [ { "title": "Standard", From abf903fb3588245b56fa24cf8a4e3cefc7c1a41f Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 27 May 2022 13:40:04 +0300 Subject: [PATCH 07/12] refactoring --- ...bstractJdbcCompatibleSourceOperations.java | 2 +- ...StrictEncryptJdbcSourceAcceptanceTest.java | 10 ++- .../postgres/PostgresSourceOperations.java | 74 +++++++++---------- 3 files changed, 43 insertions(+), 43 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index c5e10c8e4f4c9..a0639842c095e 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -269,7 +269,7 @@ protected String resolveEra(LocalDate date, String value) { return isBCE(date) ? value.substring(1) + " BC" : value; } - private static boolean isBCE(LocalDate date) { + public static boolean isBCE(LocalDate date) { return date.getEra().equals(IsoEra.BCE); } diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index 8724cb008dd43..baa1006c82b80 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -21,7 +21,15 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.*; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.ArrayList; import java.util.Collections; diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index a05ac30237f22..a06fb350b4d2f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -86,39 +86,35 @@ protected void setDate(final PreparedStatement preparedStatement, final int para public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); final String columnName = metadata.getColumnName(colIndex); - final String columnTypeName = metadata.getColumnTypeName(colIndex); + final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase(); final JDBCType columnType = safeGetJdbcType(metadata.getColumnType(colIndex)); - if (resultSet.getString(colIndex) == null) { json.putNull(columnName); - } else if (columnTypeName.equalsIgnoreCase("bool") || columnTypeName.equalsIgnoreCase("boolean")) { - putBoolean(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("bytea")) { - putString(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("time")) { - putTime(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("timetz")) { - putTimeWithTimezone(json, columnName, resultSet, colIndex); - } else if (columnTypeName.equalsIgnoreCase("timestamptz")) { - putTimestampWithTimezone(json, columnName, resultSet, colIndex); } else { - // https://www.postgresql.org/docs/14/datatype.html - switch (columnType) { - case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); - case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); - case INTEGER -> putInteger(json, columnName, resultSet, colIndex); - case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); - case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex); - case REAL -> putFloat(json, columnName, resultSet, colIndex); - case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); - // BIT is a bit string in Postgres, e.g. '0100' - case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); - case DATE -> putDate(json, columnName, resultSet, colIndex); - case TIME -> putTime(json, columnName, resultSet, colIndex); - case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); - case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); - case ARRAY -> putArray(json, columnName, resultSet, colIndex); - default -> putDefault(json, columnName, resultSet, colIndex); + switch (columnTypeName) { + case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex); + case "bytea" -> putString(json, columnName, resultSet, colIndex); + case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); + case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + default -> { + switch (columnType) { + case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); + case TINYINT, SMALLINT -> putShortInt(json, columnName, resultSet, colIndex); + case INTEGER -> putInteger(json, columnName, resultSet, colIndex); + case BIGINT -> putBigInt(json, columnName, resultSet, colIndex); + case FLOAT, DOUBLE -> putDouble(json, columnName, resultSet, colIndex); + case REAL -> putFloat(json, columnName, resultSet, colIndex); + case NUMERIC, DECIMAL -> putBigDecimal(json, columnName, resultSet, colIndex); + // BIT is a bit string in Postgres, e.g. '0100' + case BIT, CHAR, VARCHAR, LONGVARCHAR -> putString(json, columnName, resultSet, colIndex); + case DATE -> putDate(json, columnName, resultSet, colIndex); + case TIME -> putTime(json, columnName, resultSet, colIndex); + case TIMESTAMP -> putTimestamp(json, columnName, resultSet, colIndex); + case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(json, columnName, resultSet, colIndex); + case ARRAY -> putArray(json, columnName, resultSet, colIndex); + default -> putDefault(json, columnName, resultSet, colIndex); + } + } } } } @@ -145,22 +141,18 @@ protected void putTimestamp(ObjectNode node, String columnName, ResultSet result @Override public JDBCType getFieldType(final JsonNode field) { try { - final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText(); + final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase(); // Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN - if (typeName.equalsIgnoreCase("bool") || typeName.equalsIgnoreCase("boolean")) { - return JDBCType.BOOLEAN; - } else if (typeName.equalsIgnoreCase("bytea")) { + return switch (typeName) { + case "bool", "boolean" -> JDBCType.BOOLEAN; // BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a"). // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. // https://www.postgresql.org/docs/14/datatype-binary.html - return JDBCType.VARCHAR; - } else if (typeName.equalsIgnoreCase("timestamptz")) { - return JDBCType.TIMESTAMP_WITH_TIMEZONE; - } else if (typeName.equalsIgnoreCase("timetz")) { - return JDBCType.TIME_WITH_TIMEZONE; - } - - return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); + case "bytea" -> JDBCType.BOOLEAN; + case "timestamptz" -> JDBCType.TIMESTAMP_WITH_TIMEZONE; + case "timetz" -> JDBCType.TIME_WITH_TIMEZONE; + default -> JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); + }; } catch (final IllegalArgumentException ex) { LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.", field.get(INTERNAL_COLUMN_NAME), From 8e17ade7ed2130292b93290743fecad978363e3c Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 27 May 2022 13:44:05 +0300 Subject: [PATCH 08/12] removed star import --- .../postgres/PostgresJdbcSourceAcceptanceTest.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 93c2d01023c32..65b53be9a905e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -19,7 +19,15 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; -import io.airbyte.protocol.models.*; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteRecordMessage; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaType; +import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.JDBCType; import java.util.ArrayList; From ac68e40875691225296e20b76475137879025397 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 27 May 2022 13:48:12 +0300 Subject: [PATCH 09/12] fixed bytea type --- .../integrations/source/postgres/PostgresSourceOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index a06fb350b4d2f..aed6555f315ff 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -148,7 +148,7 @@ public JDBCType getFieldType(final JsonNode field) { // BYTEA is variable length binary string with hex output format by default (e.g. "\x6b707a"). // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. // https://www.postgresql.org/docs/14/datatype-binary.html - case "bytea" -> JDBCType.BOOLEAN; + case "bytea" -> JDBCType.VARCHAR; case "timestamptz" -> JDBCType.TIMESTAMP_WITH_TIMEZONE; case "timetz" -> JDBCType.TIME_WITH_TIMEZONE; default -> JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); From fc9f986658941b6140385d2c0e7e20f59294d785 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Fri, 27 May 2022 22:00:53 +0300 Subject: [PATCH 10/12] created final static constants --- .../source/postgres/PostgresSourceOperations.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index aed6555f315ff..cfb73107ac4bd 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -35,6 +35,8 @@ public class PostgresSourceOperations extends JdbcSourceOperations { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceOperations.class); + private static final String TIMESTAMPTZ = "timestamptz"; + private static final String TIMETZ = "timetz"; @Override public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { @@ -94,8 +96,8 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob switch (columnTypeName) { case "bool", "boolean" -> putBoolean(json, columnName, resultSet, colIndex); case "bytea" -> putString(json, columnName, resultSet, colIndex); - case "timetz" -> putTimeWithTimezone(json, columnName, resultSet, colIndex); - case "timestamptz" -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); + case TIMETZ -> putTimeWithTimezone(json, columnName, resultSet, colIndex); + case TIMESTAMPTZ -> putTimestampWithTimezone(json, columnName, resultSet, colIndex); default -> { switch (columnType) { case BOOLEAN -> putBoolean(json, columnName, resultSet, colIndex); @@ -149,8 +151,8 @@ public JDBCType getFieldType(final JsonNode field) { // It should not be converted to base64 binary string. So it is represented as JDBC VARCHAR. // https://www.postgresql.org/docs/14/datatype-binary.html case "bytea" -> JDBCType.VARCHAR; - case "timestamptz" -> JDBCType.TIMESTAMP_WITH_TIMEZONE; - case "timetz" -> JDBCType.TIME_WITH_TIMEZONE; + case TIMESTAMPTZ -> JDBCType.TIMESTAMP_WITH_TIMEZONE; + case TIMETZ -> JDBCType.TIME_WITH_TIMEZONE; default -> JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); }; } catch (final IllegalArgumentException ex) { From f80ff755d1effb1df06d5ba025fc23263de988b7 Mon Sep 17 00:00:00 2001 From: vmaltsev Date: Sun, 29 May 2022 10:59:30 +0300 Subject: [PATCH 11/12] bump version --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- airbyte-integrations/connectors/source-postgres/Dockerfile | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index a0250be06dc3f..12eb02337ca34 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.15 +LABEL io.airbyte.version=0.4.16 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index cf512396b2794..8822f74f19c32 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.18 +LABEL io.airbyte.version=0.4.19 LABEL io.airbyte.name=airbyte/source-postgres From 359f0c6cd4aab465a283fc6cf0394a0ca60decd3 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Wed, 1 Jun 2022 01:50:51 +0000 Subject: [PATCH 12/12] auto-bump connector version --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 8c9ac446e5502..e0178b5166c12 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -715,7 +715,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.18 + dockerImageTag: 0.4.19 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index fbccfd14538cf..2dd1e8e3db612 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6700,7 +6700,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.18" +- dockerImage: "airbyte/source-postgres:0.4.19" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: