From 253d8b30a21b34fc38676f4f47fdca2b45884a8a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 08:34:33 -0700 Subject: [PATCH 01/19] just use generator.createTable --- .../snowflake/typing_deduping/SnowflakeSqlGenerator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 00aba59b0721a..84fa953f25491 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -514,7 +514,9 @@ public String migrateFromV1toV2(final StreamId streamId, final String namespace, public static String escapeIdentifier(final String identifier) { // Note that we don't need to escape backslashes here! // The only special character in an identifier is the double-quote, which needs to be doubled. - return identifier.replace("\"", "\"\""); + return identifier +// .toUpperCase() + .replace("\"", "\"\""); } public static String escapeSingleQuotedString(final String str) { From 5380a7db184f50a6f29cf7ae8e77cf08237a6f98 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 08:36:16 -0700 Subject: [PATCH 02/19] remove unrelated change --- .../snowflake/typing_deduping/SnowflakeSqlGenerator.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 84fa953f25491..00aba59b0721a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -514,9 +514,7 @@ public String migrateFromV1toV2(final StreamId streamId, final String namespace, public static String escapeIdentifier(final String identifier) { // Note that we don't need to escape backslashes here! // The only special character in an identifier is the double-quote, which needs to be doubled. - return identifier -// .toUpperCase() - .replace("\"", "\"\""); + return identifier.replace("\"", "\"\""); } public static String escapeSingleQuotedString(final String str) { From 3f2d1d4feab06ecc3ff0cbe1c8517e93c15bf867 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 08:44:47 -0700 Subject: [PATCH 03/19] add ability to customize stream id --- .../BaseSqlGeneratorIntegrationTest.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 7f12fa8c4fed8..6f5230161429b 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -118,6 +118,14 @@ public abstract class BaseSqlGeneratorIntegrationTest { protected abstract DestinationHandler getDestinationHandler(); + /** + * Subclasses should override this method if they need to make changes to the stream ID. + * For example, you could upcase the final table name here. + */ + protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) { + return new StreamId(namespace, finalTableName, namespace, rawTableName, namespace, finalTableName); + } + /** * Do any setup work to create a namespace for this test run. For example, this might create a * BigQuery dataset, or a Snowflake schema. @@ -203,7 +211,7 @@ public void setup() throws Exception { // assumptions about StreamId structure. // In practice, the final table would be testDataset.users, and the raw table would be // airbyte_internal.testDataset_raw__stream_users. - streamId = new StreamId(namespace, "users_final", namespace, "users_raw", namespace, "users_final"); + streamId = buildStreamId(namespace, "users_final", "users_raw"); incrementalDedupStream = new StreamConfig( streamId, @@ -791,15 +799,10 @@ public void weirdColumnNames() throws Exception { public void noCrashOnSpecialCharacters(final String specialChars) throws Exception { final String str = namespace + "_" + specialChars; final StreamId originalStreamId = generator.buildStreamId(str, str, "unused"); - final StreamId modifiedStreamId = new StreamId( + final StreamId modifiedStreamId = buildStreamId( originalStreamId.finalNamespace(), originalStreamId.finalName(), - // hack for testing simplicity: put the raw tables in the final namespace. This makes cleanup - // easier. - originalStreamId.finalNamespace(), - "raw_table", - null, - null); + "raw_table"); final ColumnId columnId = generator.buildColumnId(str); try { createNamespace(modifiedStreamId.finalNamespace()); From d5c3f4c46d78f79d7ea3616d253999c3a698d841 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 09:45:32 -0700 Subject: [PATCH 04/19] wip upcase final table stuff --- .../SnowflakeSqlGenerator.java | 56 +++++++++---------- .../snowflake/SnowflakeTestUtils.java | 19 ++++--- .../SnowflakeSqlGeneratorIntegrationTest.java | 11 +++- 3 files changed, 45 insertions(+), 41 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 00aba59b0721a..67bce89696a32 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -37,8 +37,8 @@ public class SnowflakeSqlGenerator implements SqlGenerator dumpRawTable(final JdbcDatabase database, final Str timestampToString(quote(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT)), quote(JavaBaseConstants.COLUMN_NAME_DATA)), database, - tableIdentifier); + tableIdentifier, + // Raw tables still have lowercase names + false); } public static List dumpFinalTable(final JdbcDatabase database, final String databaseName, final String schema, final String table) @@ -41,9 +43,9 @@ public static List dumpFinalTable(final JdbcDatabase database, final S AND table_name = ? ORDER BY ordinal_position; """, - unescapeIdentifier(databaseName), - unescapeIdentifier(schema), - unescapeIdentifier(table)).stream() + unescapeIdentifier(databaseName).toUpperCase(), + unescapeIdentifier(schema).toUpperCase(), + unescapeIdentifier(table).toUpperCase()).stream() .map(column -> { final String quotedName = quote(column.get("COLUMN_NAME").asText()); final String type = column.get("DATA_TYPE").asText(); @@ -59,7 +61,7 @@ public static List dumpFinalTable(final JdbcDatabase database, final S }; }) .toList(); - return dumpTable(columns, database, quote(schema) + "." + quote(table)); + return dumpTable(columns, database, quote(schema) + "." + quote(table), true); } /** @@ -71,12 +73,13 @@ public static List dumpFinalTable(final JdbcDatabase database, final S * * @param tableIdentifier Table identifier (e.g. "schema.table"), with quotes if necessary. */ - public static List dumpTable(final List columns, final JdbcDatabase database, final String tableIdentifier) throws SQLException { + public static List dumpTable(final List columns, final JdbcDatabase database, final String tableIdentifier, final boolean upcaseExtractedAt) throws SQLException { return database.bufferedResultSetQuery(connection -> connection.createStatement().executeQuery(new StringSubstitutor(Map.of( "columns", columns.stream().collect(joining(",")), - "table", tableIdentifier)).replace( + "table", tableIdentifier, + "extracted_at", upcaseExtractedAt ? "_AIRBYTE_EXTRACTED_AT" : "\"_airbyte_extracted_at\"")).replace( """ - SELECT ${columns} FROM ${table} ORDER BY "_airbyte_extracted_at" ASC + SELECT ${columns} FROM ${table} ORDER BY ${extracted_at} ASC """)), new SnowflakeTestSourceOperations()::rowToJson); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 43b61dbef774c..cd724a104f65a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -69,9 +69,14 @@ protected SnowflakeDestinationHandler getDestinationHandler() { return new SnowflakeDestinationHandler(databaseName, database); } + @Override + protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) { + return new StreamId(namespace.toUpperCase(), finalTableName.toUpperCase(), namespace.toUpperCase(), rawTableName, namespace, finalTableName); + } + @Override protected void createNamespace(final String namespace) throws SQLException { - database.execute("CREATE SCHEMA IF NOT EXISTS \"" + namespace + '"'); + database.execute("CREATE SCHEMA IF NOT EXISTS \"" + namespace.toUpperCase() + '"'); } @Override @@ -99,12 +104,12 @@ protected List dumpFinalTableRecords(final StreamId streamId, final St database, databaseName, streamId.finalNamespace(), - streamId.finalName() + suffix); + streamId.finalName() + suffix.toUpperCase()); } @Override protected void teardownNamespace(final String namespace) throws SQLException { - database.execute("DROP SCHEMA IF EXISTS \"" + namespace + '"'); + database.execute("DROP SCHEMA IF EXISTS \"" + namespace.toUpperCase() + '"'); } @Override From 8a17f2679fe78adf8acb47443b72bd8f4b4d0472 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 11:25:13 -0700 Subject: [PATCH 05/19] progress on making tests pass --- .../BaseSqlGeneratorIntegrationTest.java | 8 +++++++- .../BaseTypingDedupingTest.java | 4 +++- .../snowflake/SnowflakeTestUtils.java | 20 ++++++++++++++++++- .../alltypes_expectedrecords_final.jsonl | 10 +++++----- ...crementaldedup_expectedrecords_final.jsonl | 4 ++-- ...mestampformats_expectedrecords_final.jsonl | 20 +++++++++---------- ...irdcolumnnames_expectedrecords_final.jsonl | 4 ++-- 7 files changed, 48 insertions(+), 22 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 6f5230161429b..f609d80a157c1 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -84,7 +84,8 @@ public abstract class BaseSqlGeneratorIntegrationTest { Stream.of("_ab_cdc_deleted_at")).toList(); } - protected static final RecordDiffer DIFFER = new RecordDiffer( + // TODO use columnID to get escaped names + protected RecordDiffer DIFFER = new RecordDiffer( Pair.of("id1", AirbyteProtocolType.INTEGER), Pair.of("id2", AirbyteProtocolType.INTEGER), Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); @@ -206,6 +207,11 @@ public void setup() throws Exception { final LinkedHashMap cdcColumns = new LinkedHashMap<>(COLUMNS); cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); + DIFFER = new RecordDiffer( + Pair.of(id1.name(""), AirbyteProtocolType.INTEGER), + Pair.of(id2.name(""), AirbyteProtocolType.INTEGER), + Pair.of(cursor.name(""), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); + namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5); // This is not a typical stream ID would look like, but SqlGenerator isn't allowed to make any // assumptions about StreamId structure. diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 48ca40f32f889..577c5a921c7c4 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -68,6 +68,7 @@ public abstract class BaseTypingDedupingTest { throw new RuntimeException(e); } } + // TODO use columnID to get escaped names private static final RecordDiffer DIFFER = new RecordDiffer( Pair.of("id1", AirbyteProtocolType.INTEGER), Pair.of("id2", AirbyteProtocolType.INTEGER), @@ -112,7 +113,8 @@ public abstract class BaseTypingDedupingTest { /** * For a given stream, return the records that exist in the destination's final table. Each record * must be in the format {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_meta": - * {...}, "field1": ..., "field2": ..., ...}. + * {...}, "field1": ..., "field2": ..., ...}. If the destination renames (e.g. upcases) the airbyte + * fields, this method must revert that naming to use the exact strings "_airbyte_raw_id", etc. *

* For JSON-valued columns, there is some nuance: a SQL null should be represented as a missing * entry, whereas a JSON null should be represented as a diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java index 8367fdabe0c2c..65c862d069c4b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java @@ -7,12 +7,14 @@ import static java.util.stream.Collectors.joining; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator; import java.sql.SQLException; import java.util.List; import java.util.Map; +import java.util.stream.Stream; import org.apache.commons.text.StringSubstitutor; public class SnowflakeTestUtils { @@ -81,7 +83,23 @@ public static List dumpTable(final List columns, final JdbcDat """ SELECT ${columns} FROM ${table} ORDER BY ${extracted_at} ASC """)), - new SnowflakeTestSourceOperations()::rowToJson); + new SnowflakeTestSourceOperations()::rowToJson) + .stream().peek(row -> { + // Downcase the airbyte_* fields so that our test framework can recognize them. + Stream.of( + JavaBaseConstants.COLUMN_NAME_AB_RAW_ID, + JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, + JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, + JavaBaseConstants.COLUMN_NAME_DATA, + JavaBaseConstants.COLUMN_NAME_AB_META + ).forEach(columnName -> { + final JsonNode value = row.get(columnName.toUpperCase()); + if (value != null) { + ((ObjectNode) row).set(columnName, value); + ((ObjectNode) row).remove(columnName.toUpperCase()); + } + }); + }).toList(); } private static String quote(final String name) { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl index c17a8134a49ed..0e33c0f494379 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/alltypes_expectedrecords_final.jsonl @@ -1,6 +1,6 @@ -{"id1": 1, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000000Z", "array": ["foo"], "struct": {"foo": "bar"}, "string": "foo", "number": 42.1, "integer": 42, "boolean": true, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000Z", "timestamp_without_timezone": "2023-01-23T12:34:56.000000000", "time_with_timezone": "12:34:56Z", "time_without_timezone": "12:34:56.000000000", "date": "2023-01-23", "unknown": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} -{"id1": 2, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000000Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} -{"id1": 3, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000000Z", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} -{"id1": 4, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000000Z", "unknown": null, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}, "string": "{}"} +{"ID1": 1, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "ARRAY": ["foo"], "STRUCT": {"foo": "bar"}, "STRING": "foo", "NUMBER": 42.1, "INTEGER": 42, "BOOLEAN": true, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000Z", "TIMESTAMP_WITHOUT_TIMEZONE": "2023-01-23T12:34:56.000000000", "TIME_WITH_TIMEZONE": "12:34:56Z", "TIME_WITHOUT_TIMEZONE": "12:34:56.000000000", "DATE": "2023-01-23", "UNKNOWN": {}, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} +{"ID1": 2, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} +{"ID1": 3, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} +{"ID1": 4, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "UNKNOWN": null, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": ["Problem with `struct`", "Problem with `array`", "Problem with `number`", "Problem with `integer`", "Problem with `boolean`", "Problem with `timestamp_with_timezone`", "Problem with `timestamp_without_timezone`", "Problem with `time_with_timezone`", "Problem with `time_without_timezone`", "Problem with `date`"]}, "STRING": "{}"} // Note: no loss of precision on these numbers. A naive float64 conversion would yield 67.17411800000001. -{"id1": 5, "id2": 100, "updated_at": "2023-01-01T01:00:00.000000000Z", "number": 67.174118, "struct": {"nested_number": 67.174118}, "array": [67.174118], "unknown": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} +{"ID1": 5, "ID2": 100, "UPDATED_AT": "2023-01-01T01:00:00.000000000Z", "NUMBER": 67.174118, "STRUCT": {"nested_number": 67.174118}, "ARRAY": [67.174118], "UNKNOWN": 67.174118, "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl index 293be295e24d4..badbe48249a67 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/incrementaldedup_expectedrecords_final.jsonl @@ -1,2 +1,2 @@ -{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000000Z", "string": "Alice", "struct": {"city": "San Diego", "state": "CA"}, "integer": 84} -{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": ["Problem with `integer`"]}, "id1": 2, "id2": 100, "updated_at": "2023-01-01T03:00:00.000000000Z", "string": "Bob"} +{"_airbyte_raw_id": "80c99b54-54b4-43bd-b51b-1f67dafa2c52", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "ID1": 1, "ID2": 100, "UPDATED_AT": "2023-01-01T02:00:00.000000000Z", "STRING": "Alice", "STRUCT": {"city": "San Diego", "state": "CA"}, "INTEGER": 84} +{"_airbyte_raw_id": "ad690bfb-c2c2-4172-bd73-a16c86ccbb67", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": ["Problem with `integer`"]}, "ID1": 2, "ID2": 100, "UPDATED_AT": "2023-01-01T03:00:00.000000000Z", "STRING": "Bob"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/timestampformats_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/timestampformats_expectedrecords_final.jsonl index 0064d5adf1b3a..fbd07a85632be 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/timestampformats_expectedrecords_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/timestampformats_expectedrecords_final.jsonl @@ -1,12 +1,12 @@ // snowflake/jdbc is returning 9 decimals for all timestamp/time types -{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000Z", "time_with_timezone": "12:34:56Z"} -{"_airbyte_raw_id": "05028c5f-7813-4e9c-bd4b-387d1f8ba435", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000-08:00", "time_with_timezone": "12:34:56-08:00"} -{"_airbyte_raw_id": "95dfb0c6-6a67-4ba0-9935-643bebc90437", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000-08:00", "time_with_timezone": "12:34:56-0800"} -{"_airbyte_raw_id": "f3d8abe2-bb0f-4caf-8ddc-0641df02f3a9", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000-08:00", "time_with_timezone": "12:34:56-08"} -{"_airbyte_raw_id": "a81ed40a-2a49-488d-9714-d53e8b052968", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000+08:00", "time_with_timezone": "12:34:56+08:00"} -{"_airbyte_raw_id": "c07763a0-89e6-4cb7-b7d0-7a34a7c9918a", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000+08:00", "time_with_timezone": "12:34:56+0800"} -{"_airbyte_raw_id": "358d3b52-50ab-4e06-9094-039386f9bf0d", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.000000000+08:00", "time_with_timezone": "12:34:56+08"} -{"_airbyte_raw_id": "db8200ac-b2b9-4b95-a053-8a0343042751", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_with_timezone": "2023-01-23T12:34:56.123000000Z", "time_with_timezone": "12:34:56.123Z"} +{"_airbyte_raw_id": "14ba7c7f-e398-4e69-ac22-28d578400dbc", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000Z", "TIME_WITH_TIMEZONE": "12:34:56Z"} +{"_airbyte_raw_id": "05028c5f-7813-4e9c-bd4b-387d1f8ba435", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000-08:00", "TIME_WITH_TIMEZONE": "12:34:56-08:00"} +{"_airbyte_raw_id": "95dfb0c6-6a67-4ba0-9935-643bebc90437", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000-08:00", "TIME_WITH_TIMEZONE": "12:34:56-0800"} +{"_airbyte_raw_id": "f3d8abe2-bb0f-4caf-8ddc-0641df02f3a9", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000-08:00", "TIME_WITH_TIMEZONE": "12:34:56-08"} +{"_airbyte_raw_id": "a81ed40a-2a49-488d-9714-d53e8b052968", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000+08:00", "TIME_WITH_TIMEZONE": "12:34:56+08:00"} +{"_airbyte_raw_id": "c07763a0-89e6-4cb7-b7d0-7a34a7c9918a", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000+08:00", "TIME_WITH_TIMEZONE": "12:34:56+0800"} +{"_airbyte_raw_id": "358d3b52-50ab-4e06-9094-039386f9bf0d", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.000000000+08:00", "TIME_WITH_TIMEZONE": "12:34:56+08"} +{"_airbyte_raw_id": "db8200ac-b2b9-4b95-a053-8a0343042751", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITH_TIMEZONE": "2023-01-23T12:34:56.123000000Z", "TIME_WITH_TIMEZONE": "12:34:56.123Z"} -{"_airbyte_raw_id": "10ce5d93-6923-4217-a46f-103833837038", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_without_timezone": "2023-01-23T12:34:56.000000000", "time_without_timezone": "12:34:56.000000000", "date": "2023-01-23"} -{"_airbyte_raw_id": "a7a6e176-7464-4a0b-b55c-b4f936e8d5a1", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "timestamp_without_timezone": "2023-01-23T12:34:56.123000000", "time_without_timezone": "12:34:56.123000000"} +{"_airbyte_raw_id": "10ce5d93-6923-4217-a46f-103833837038", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITHOUT_TIMEZONE": "2023-01-23T12:34:56.000000000", "TIME_WITHOUT_TIMEZONE": "12:34:56.000000000", "DATE": "2023-01-23"} +{"_airbyte_raw_id": "a7a6e176-7464-4a0b-b55c-b4f936e8d5a1", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "TIMESTAMP_WITHOUT_TIMEZONE": "2023-01-23T12:34:56.123000000", "TIME_WITHOUT_TIMEZONE": "12:34:56.123000000"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/weirdcolumnnames_expectedrecords_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/weirdcolumnnames_expectedrecords_final.jsonl index b61de2de5f581..a745d6e1dc2f6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/weirdcolumnnames_expectedrecords_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/sqlgenerator/weirdcolumnnames_expectedrecords_final.jsonl @@ -1,3 +1,3 @@ // columns with issues: -// * endswithbackslash\ -> written as null to the final table -{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "id1": 1, "id2": 100, "updated_at": "2023-01-01T02:00:00.000000000Z", "$starts_with_dollar_sign": "foo", "includes\"doublequote": "foo", "includes'singlequote": "foo", "includes`backtick": "foo", "includes.period": "foo", "includes$$doubledollar": "foo", "endswithbackslash\\": "foo"} +// * ENDSWITHBACKSLASH\ -> written as null to the final table +{"_airbyte_raw_id": "7e7330a1-42fb-41ec-a955-52f18bd61964", "_airbyte_extracted_at": "2023-01-01T00:00:00.000000000Z", "_airbyte_meta": {"errors": []}, "ID1": 1, "ID2": 100, "UPDATED_AT": "2023-01-01T02:00:00.000000000Z", "$STARTS_WITH_DOLLAR_SIGN": "foo", "INCLUDES\"DOUBLEQUOTE": "foo", "INCLUDES'SINGLEQUOTE": "foo", "INCLUDES`BACKTICK": "foo", "INCLUDES.PERIOD": "foo", "INCLUDES$$DOUBLEDOLLAR": "foo", "ENDSWITHBACKSLASH\\": "foo"} From c0e1e376ff11be8f780143bc65e93beb095356d4 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 11:28:52 -0700 Subject: [PATCH 06/19] more fixes --- .../SnowflakeSqlGeneratorIntegrationTest.java | 92 +++++++++---------- 1 file changed, 46 insertions(+), 46 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index cd724a104f65a..a2a842592d784 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -119,7 +119,7 @@ protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, final List records) throws Exception { final List columnNames = includeCdcDeletedAt ? FINAL_TABLE_COLUMN_NAMES_CDC : FINAL_TABLE_COLUMN_NAMES; - final String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_ab_cdc_deleted_at\"" : ""; + final String cdcDeletedAtName = includeCdcDeletedAt ? ",\"_AB_CDC_DELETED_AT\"" : ""; final String cdcDeletedAtExtract = includeCdcDeletedAt ? ",column19" : ""; final String recordsText = records.stream() // For each record, convert it to a string like "(rawId, extractedAt, loadedAt, data)" @@ -132,7 +132,7 @@ protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, database.execute(new StringSubstitutor( Map.of( - "final_table_id", streamId.finalTableId(SnowflakeSqlGenerator.QUOTE, suffix), + "final_table_id", streamId.finalTableId(SnowflakeSqlGenerator.QUOTE, suffix.toUpperCase()), "cdc_deleted_at_name", cdcDeletedAtName, "cdc_deleted_at_extract", cdcDeletedAtExtract, "records", recordsText), @@ -141,50 +141,50 @@ protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, // Similar to insertRawTableRecords, some of these columns are declared as string and wrapped in // parse_json(). """ - INSERT INTO #{final_table_id} ( - "_airbyte_raw_id", - "_airbyte_extracted_at", - "_airbyte_meta", - "id1", - "id2", - "updated_at", - "struct", - "array", - "string", - "number", - "integer", - "boolean", - "timestamp_with_timezone", - "timestamp_without_timezone", - "time_with_timezone", - "time_without_timezone", - "date", - "unknown" - #{cdc_deleted_at_name} - ) - SELECT - column1, - column2, - PARSE_JSON(column3), - column4, - column5, - column6, - PARSE_JSON(column7), - PARSE_JSON(column8), - column9, - column10, - column11, - column12, - column13, - column14, - column15, - column16, - column17, - PARSE_JSON(column18) - #{cdc_deleted_at_extract} - FROM VALUES - #{records} - """)); + INSERT INTO #{final_table_id} ( + "_AIRBYTE_RAW_ID", + "_AIRBYTE_EXTRACTED_AT", + "_AIRBYTE_META", + "ID1", + "ID2", + "UPDATED_AT", + "STRUCT", + "ARRAY", + "STRING", + "NUMBER", + "INTEGER", + "BOOLEAN", + "TIMESTAMP_WITH_TIMEZONE", + "TIMESTAMP_WITHOUT_TIMEZONE", + "TIME_WITH_TIMEZONE", + "TIME_WITHOUT_TIMEZONE", + "DATE", + "UNKNOWN" + #{cdc_deleted_at_name} + ) + SELECT + column1, + column2, + PARSE_JSON(column3), + column4, + column5, + column6, + PARSE_JSON(column7), + PARSE_JSON(column8), + column9, + column10, + column11, + column12, + column13, + column14, + column15, + column16, + column17, + PARSE_JSON(column18) + #{cdc_deleted_at_extract} + FROM VALUES + #{records} + """)); } private String dollarQuoteWrap(final JsonNode node) { From dbbfd2695a767c205deef9f80bd420f48b3dc1d3 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 11:39:52 -0700 Subject: [PATCH 07/19] fix createTable test --- .../SnowflakeSqlGeneratorIntegrationTest.java | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index a2a842592d784..0a4376479676e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -238,7 +238,8 @@ public void testCreateTableIncremental() throws Exception { final String sql = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(sql); - final Optional tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "users_final", namespace)) + // Note that USERS_FINAL is uppercased here. This is intentional, because snowflake upcases unquoted identifiers. + final Optional tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "USERS_FINAL", namespace.toUpperCase())) .stream().map(record -> record.get("kind").asText()) .findFirst(); final Map columns = database.queryJsons( @@ -251,8 +252,8 @@ public void testCreateTableIncremental() throws Exception { ORDER BY ordinal_position; """, databaseName, - namespace, - "users_final").stream() + namespace.toUpperCase(), + "USERS_FINAL").stream() .collect(toMap( record -> record.get("COLUMN_NAME").asText(), record -> { @@ -267,24 +268,24 @@ record -> { () -> assertEquals(Optional.of("TABLE"), tableKind, "Table should be permanent, not transient"), () -> assertEquals( ImmutableMap.builder() - .put("_airbyte_raw_id", "TEXT") - .put("_airbyte_extracted_at", "TIMESTAMP_TZ") - .put("_airbyte_meta", "VARIANT") - .put("id1", "NUMBER(38, 0)") - .put("id2", "NUMBER(38, 0)") - .put("updated_at", "TIMESTAMP_TZ") - .put("struct", "OBJECT") - .put("array", "ARRAY") - .put("string", "TEXT") - .put("number", "FLOAT") - .put("integer", "NUMBER(38, 0)") - .put("boolean", "BOOLEAN") - .put("timestamp_with_timezone", "TIMESTAMP_TZ") - .put("timestamp_without_timezone", "TIMESTAMP_NTZ") - .put("time_with_timezone", "TEXT") - .put("time_without_timezone", "TIME") - .put("date", "DATE") - .put("unknown", "VARIANT") + .put("_AIRBYTE_RAW_ID", "TEXT") + .put("_AIRBYTE_EXTRACTED_AT", "TIMESTAMP_TZ") + .put("_AIRBYTE_META", "VARIANT") + .put("ID1", "NUMBER(38, 0)") + .put("ID2", "NUMBER(38, 0)") + .put("UPDATED_AT", "TIMESTAMP_TZ") + .put("STRUCT", "OBJECT") + .put("ARRAY", "ARRAY") + .put("STRING", "TEXT") + .put("NUMBER", "FLOAT") + .put("INTEGER", "NUMBER(38, 0)") + .put("BOOLEAN", "BOOLEAN") + .put("TIMESTAMP_WITH_TIMEZONE", "TIMESTAMP_TZ") + .put("TIMESTAMP_WITHOUT_TIMEZONE", "TIMESTAMP_NTZ") + .put("TIME_WITH_TIMEZONE", "TEXT") + .put("TIME_WITHOUT_TIMEZONE", "TIME") + .put("DATE", "DATE") + .put("UNKNOWN", "VARIANT") .build(), columns)); } From c4c362cc4dfa75b2b51d7b599bf23952063c655b Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 11:48:20 -0700 Subject: [PATCH 08/19] more fix --- .../typing_deduping/BaseSqlGeneratorIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index f609d80a157c1..145702e496808 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -497,7 +497,7 @@ public void incrementalDedupNoCursor() throws Exception { actualFinalRecords); assertAll( () -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()), - () -> assertEquals("bar", actualFinalRecords.get(0).get("string").asText())); + () -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText())); } @Test From 75663db4eab8c390d84bea985eeb8860718f8f58 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 11:56:09 -0700 Subject: [PATCH 09/19] more fix --- .../typing_deduping/BaseSqlGeneratorIntegrationTest.java | 5 +---- .../typing_deduping/SnowflakeDestinationHandler.java | 8 ++++---- .../snowflake/typing_deduping/SnowflakeSqlGenerator.java | 8 ++++---- 3 files changed, 9 insertions(+), 12 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 145702e496808..a76e86bb89332 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -85,10 +85,7 @@ public abstract class BaseSqlGeneratorIntegrationTest { } // TODO use columnID to get escaped names - protected RecordDiffer DIFFER = new RecordDiffer( - Pair.of("id1", AirbyteProtocolType.INTEGER), - Pair.of("id2", AirbyteProtocolType.INTEGER), - Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); + protected RecordDiffer DIFFER; /** * Subclasses may use these four StreamConfigs in their tests. diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index b77f9f46d49f6..20539b4e12804 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -40,8 +40,8 @@ public Optional findExistingTable(final StreamId id) t ORDER BY ordinal_position; """, databaseName.toUpperCase(), - id.finalNamespace(), - id.finalName()).stream() + id.finalNamespace().toUpperCase(), + id.finalName().toUpperCase()).stream() .collect(LinkedHashMap::new, (map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()), LinkedHashMap::putAll); @@ -65,8 +65,8 @@ public boolean isFinalTableEmpty(final StreamId id) throws SQLException { AND table_name = ? """, databaseName.toUpperCase(), - id.finalNamespace(), - id.finalName()); + id.finalNamespace().toUpperCase(), + id.finalName().toUpperCase()); return rowCount == 0; } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 67bce89696a32..7722c26715e00 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -122,14 +122,14 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue())), LinkedHashMap::putAll); final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> !JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.contains(column.getKey())) + .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase).noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) .collect(LinkedHashMap::new, (map, column) -> map.put(column.getKey(), column.getValue()), LinkedHashMap::putAll); final boolean sameColumns = actualColumns.equals(intendedColumns) - && "TEXT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID)) - && "TIMESTAMP_TZ".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT)) - && "VARIANT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META)); + && "TEXT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID.toUpperCase())) + && "TIMESTAMP_TZ".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase())) + && "VARIANT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META.toUpperCase())); return sameColumns; } From 09ea37342387473d3dc3914e62d82dd0b1bd5247 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 13:02:27 -0700 Subject: [PATCH 10/19] more fix --- .../BaseSqlGeneratorIntegrationTest.java | 6 +-- .../BaseTypingDedupingTest.java | 17 ++++--- .../typing_deduping/RecordDiffer.java | 46 +++++++++++++------ .../AbstractSnowflakeTypingDedupingTest.java | 11 ++++- ...orchange_expectedrecords_dedup_final.jsonl | 6 +-- .../sync1_expectedrecords_dedup_final.jsonl | 8 ++-- ...sync1_expectedrecords_nondedup_final.jsonl | 8 ++-- ...ectedrecords_incremental_dedup_final.jsonl | 6 +-- ...ctedrecords_fullrefresh_append_final.jsonl | 14 +++--- ...drecords_fullrefresh_overwrite_final.jsonl | 6 +-- ...ectedrecords_incremental_dedup_final.jsonl | 4 +- 11 files changed, 82 insertions(+), 50 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index a76e86bb89332..067d0df830389 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -205,9 +205,9 @@ public void setup() throws Exception { cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE); DIFFER = new RecordDiffer( - Pair.of(id1.name(""), AirbyteProtocolType.INTEGER), - Pair.of(id2.name(""), AirbyteProtocolType.INTEGER), - Pair.of(cursor.name(""), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); + Pair.of(id1, AirbyteProtocolType.INTEGER), + Pair.of(id2, AirbyteProtocolType.INTEGER), + Pair.of(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE)); namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5); // This is not a typical stream ID would look like, but SqlGenerator isn't allowed to make any diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 577c5a921c7c4..a4976a838292c 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -68,12 +68,7 @@ public abstract class BaseTypingDedupingTest { throw new RuntimeException(e); } } - // TODO use columnID to get escaped names - private static final RecordDiffer DIFFER = new RecordDiffer( - Pair.of("id1", AirbyteProtocolType.INTEGER), - Pair.of("id2", AirbyteProtocolType.INTEGER), - Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE), - Pair.of("old_cursor", AirbyteProtocolType.INTEGER)); + private RecordDiffer DIFFER; private String randomSuffix; private JsonNode config; @@ -139,6 +134,8 @@ public abstract class BaseTypingDedupingTest { */ protected abstract void teardownStreamAndNamespace(String streamNamespace, String streamName) throws Exception; + protected abstract SqlGenerator getSqlGenerator(); + /** * Destinations which need to clean up resources after an entire test finishes should override this * method. For example, if you want to gracefully close a database connection, you should do that @@ -166,6 +163,14 @@ public void setup() throws Exception { streamNamespace = "typing_deduping_test" + getUniqueSuffix(); streamName = "test_stream" + getUniqueSuffix(); streamsToTearDown = new ArrayList<>(); + + final SqlGenerator generator = getSqlGenerator(); + DIFFER = new RecordDiffer( + Pair.of(generator.buildColumnId("id1"), AirbyteProtocolType.INTEGER), + Pair.of(generator.buildColumnId("id2"), AirbyteProtocolType.INTEGER), + Pair.of(generator.buildColumnId("updated_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE), + Pair.of(generator.buildColumnId("old_cursor"), AirbyteProtocolType.INTEGER)); + LOGGER.info("Using stream namespace {} and name {}", streamNamespace, streamName); } diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java index 058986346d295..a270ec5b4fa5a 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java @@ -37,19 +37,39 @@ */ public class RecordDiffer { - private final Comparator recordIdentityComparator; - private final Comparator recordSortComparator; - private final Function recordIdentityExtractor; + private final Comparator rawRecordIdentityComparator; + private final Comparator rawRecordSortComparator; + private final Function rawRecordIdentityExtractor; + + private final Comparator finalRecordIdentityComparator; + private final Comparator finalRecordSortComparator; + private final Function finalRecordIdentityExtractor; /** * @param identifyingColumns Which fields constitute a unique record (typically PK+cursor). Do _not_ * include extracted_at; it is handled automatically. */ @SafeVarargs - public RecordDiffer(final Pair... identifyingColumns) { - this.recordIdentityComparator = buildIdentityComparator(identifyingColumns); - this.recordSortComparator = recordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id"))); - this.recordIdentityExtractor = buildIdentityExtractor(identifyingColumns); + public RecordDiffer(final Pair... identifyingColumns) { + final Pair[] rawTableIdentifyingColumns = Arrays.stream(identifyingColumns) + .map(p -> Pair.of( + // Raw tables always retain the original column names + p.getLeft().originalName(), + p.getRight())) + .toArray(Pair[]::new); + this.rawRecordIdentityComparator = buildIdentityComparator(rawTableIdentifyingColumns); + this.rawRecordSortComparator = rawRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id"))); + this.rawRecordIdentityExtractor = buildIdentityExtractor(rawTableIdentifyingColumns); + + final Pair[] finalTableIdentifyingColumns = Arrays.stream(identifyingColumns) + .map(p -> Pair.of( + // Final tables may have modified the column names, so use the final name here. + p.getLeft().name(), + p.getRight())). + toArray(Pair[]::new); + this.finalRecordIdentityComparator = buildIdentityComparator(finalTableIdentifyingColumns); + this.finalRecordSortComparator = finalRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id"))); + this.finalRecordIdentityExtractor = buildIdentityExtractor(finalTableIdentifyingColumns); } /** @@ -70,9 +90,9 @@ public void diffRawTableRecords(final List expectedRecords, final List final String diff = diffRecords( expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()), - recordIdentityComparator, - recordSortComparator, - recordIdentityExtractor); + rawRecordIdentityComparator, + rawRecordSortComparator, + rawRecordIdentityExtractor); if (!diff.isEmpty()) { fail("Raw table was incorrect.\n" + diff); @@ -83,9 +103,9 @@ public void diffFinalTableRecords(final List expectedRecords, final Li final String diff = diffRecords( expectedRecords, actualRecords, - recordIdentityComparator, - recordSortComparator, - recordIdentityExtractor); + finalRecordIdentityComparator, + finalRecordSortComparator, + finalRecordIdentityExtractor); if (!diff.isEmpty()) { fail("Final table was incorrect.\n" + diff); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index 8fb2205f62dac..0a0042b6cbebf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -13,6 +13,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase; @@ -62,7 +63,7 @@ protected List dumpFinalTableRecords(String streamNamespace, final Str if (streamNamespace == null) { streamNamespace = getDefaultSchema(); } - return SnowflakeTestUtils.dumpFinalTable(database, databaseName, streamNamespace, streamName); + return SnowflakeTestUtils.dumpFinalTable(database, databaseName, streamNamespace.toUpperCase(), streamName.toUpperCase()); } @Override @@ -77,8 +78,9 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s DROP SCHEMA IF EXISTS "%s" CASCADE """, getRawSchema(), + // Raw table is still lowercase. StreamId.concatenateRawTableName(streamNamespace, streamName), - streamNamespace)); + streamNamespace.toUpperCase())); } @Override @@ -86,6 +88,11 @@ protected void globalTeardown() throws Exception { DataSourceFactory.close(dataSource); } + @Override + protected SqlGenerator getSqlGenerator() { + return new SnowflakeSqlGenerator(); + } + /** * Subclasses using a config with a nonstandard raw table schema should override this method. */ diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl index bf928d688997b..d4b74d4d1854a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl @@ -1,3 +1,3 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "old_cursor": 1, "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "old_cursor": 2, "name": "Bob", "address": {"city": "Boston", "state": "MA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "old_cursor": 3, "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "OLD_CURSOR": 1, "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 201, "OLD_CURSOR": 2, "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "OLD_CURSOR": 3, "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl index f30261d6154c0..5bedb75674ade 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_dedup_final.jsonl @@ -1,4 +1,4 @@ -// Keep the Alice record with more recent updated_at -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000000Z", "name": "Charlie"} +// Keep the Alice record with more recent UPDATED_AT +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl index a26cf1d5289dc..e9a97a6e9f84b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync1_expectedrecords_nondedup_final.jsonl @@ -1,5 +1,5 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} // Invalid columns are nulled out (i.e. SQL null, not JSON null) -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl index 7ac3264abcc4a..f690e7ff76dba 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl @@ -1,3 +1,3 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} -// Charlie wasn't reemitted with updated_at, so it still has a null cursor -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +// Charlie wasn't reemitted with UPDATED_AT, so it still has a null cursor +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "NAME": "Charlie"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl index fd96bc5161696..66d6eb2f4564a 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_append_final.jsonl @@ -1,8 +1,8 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:00:00.000000000Z", "name": "Alice", "address": {"city": "San Francisco", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-01T00:01:00.000000000Z", "name": "Alice", "address": {"city": "Los Angeles", "state": "CA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-01T00:02:00.000000000Z", "name": "Bob", "address": {"city": "Boston", "state": "MA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "San Francisco", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-01T00:01:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Los Angeles", "state": "CA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-01T00:02:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "Boston", "state": "MA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000000Z"} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl index da89677485f0e..fc4c3ae500bb6 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl @@ -1,3 +1,3 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Bob", "address": {"city": "New York", "state": "NY"}} -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 201, "updated_at": "2000-01-02T00:01:00.000000000Z", "_ab_cdc_deleted_at": "1970-01-01T00:00:00.000000000Z"} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Bob", "ADDRESS": {"city": "New York", "state": "NY"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 201, "UPDATED_AT": "2000-01-02T00:01:00.000000000Z", "_AB_CDC_DELETED_AT": "1970-01-01T00:00:00.000000000Z"} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl index 96442a4e7fcd3..c4db7356aed28 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/resources/dat/sync2_expectedrecords_incremental_dedup_final.jsonl @@ -1,3 +1,3 @@ -{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "id1": 1, "id2": 200, "updated_at": "2000-01-02T00:00:00.000000000Z", "name": "Alice", "address": {"city": "Seattle", "state": "WA"}} +{"_airbyte_extracted_at": "1970-01-01T00:00:02.000000000-08:00", "_airbyte_meta":{"errors":[]}, "ID1": 1, "ID2": 200, "UPDATED_AT": "2000-01-02T00:00:00.000000000Z", "NAME": "Alice", "ADDRESS": {"city": "Seattle", "state": "WA"}} // Delete Bob, keep Charlie -{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "id1": 2, "id2": 200, "updated_at": "2000-01-01T00:03:00.000000000Z", "name": "Charlie"} +{"_airbyte_extracted_at": "1970-01-01T00:00:01.000000000-08:00", "_airbyte_meta": {"errors":["Problem with `age`", "Problem with `registration_date`"]}, "ID1": 2, "ID2": 200, "UPDATED_AT": "2000-01-01T00:03:00.000000000Z", "NAME": "Charlie"} From b55d80da9e0c451a491965db35719fe8a284d94a Mon Sep 17 00:00:00 2001 From: edgao Date: Thu, 31 Aug 2023 20:07:46 +0000 Subject: [PATCH 11/19] Automated Commit - Formatting Changes --- .../BaseSqlGeneratorIntegrationTest.java | 4 +- .../typing_deduping/RecordDiffer.java | 4 +- .../SnowflakeSqlGenerator.java | 3 +- .../snowflake/SnowflakeTestUtils.java | 21 ++-- .../SnowflakeSqlGeneratorIntegrationTest.java | 98 ++++++++++--------- 5 files changed, 68 insertions(+), 62 deletions(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index 067d0df830389..e77200ae42545 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -117,8 +117,8 @@ public abstract class BaseSqlGeneratorIntegrationTest { protected abstract DestinationHandler getDestinationHandler(); /** - * Subclasses should override this method if they need to make changes to the stream ID. - * For example, you could upcase the final table name here. + * Subclasses should override this method if they need to make changes to the stream ID. For + * example, you could upcase the final table name here. */ protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) { return new StreamId(namespace, finalTableName, namespace, rawTableName, namespace, finalTableName); diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java index a270ec5b4fa5a..e461b5f38bc1a 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java @@ -65,8 +65,8 @@ public RecordDiffer(final Pair... identifyingColumns) { .map(p -> Pair.of( // Final tables may have modified the column names, so use the final name here. p.getLeft().name(), - p.getRight())). - toArray(Pair[]::new); + p.getRight())) + .toArray(Pair[]::new); this.finalRecordIdentityComparator = buildIdentityComparator(finalTableIdentifyingColumns); this.finalRecordSortComparator = finalRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id"))); this.finalRecordIdentityExtractor = buildIdentityExtractor(finalTableIdentifyingColumns); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 7722c26715e00..e2eb598608173 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -122,7 +122,8 @@ public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, fina (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue())), LinkedHashMap::putAll); final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase).noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) + .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) + .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) .collect(LinkedHashMap::new, (map, column) -> map.put(column.getKey(), column.getValue()), LinkedHashMap::putAll); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java index 65c862d069c4b..03c5b28a4079e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeTestUtils.java @@ -75,7 +75,11 @@ public static List dumpFinalTable(final JdbcDatabase database, final S * * @param tableIdentifier Table identifier (e.g. "schema.table"), with quotes if necessary. */ - public static List dumpTable(final List columns, final JdbcDatabase database, final String tableIdentifier, final boolean upcaseExtractedAt) throws SQLException { + public static List dumpTable(final List columns, + final JdbcDatabase database, + final String tableIdentifier, + final boolean upcaseExtractedAt) + throws SQLException { return database.bufferedResultSetQuery(connection -> connection.createStatement().executeQuery(new StringSubstitutor(Map.of( "columns", columns.stream().collect(joining(",")), "table", tableIdentifier, @@ -91,14 +95,13 @@ public static List dumpTable(final List columns, final JdbcDat JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT, JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, JavaBaseConstants.COLUMN_NAME_DATA, - JavaBaseConstants.COLUMN_NAME_AB_META - ).forEach(columnName -> { - final JsonNode value = row.get(columnName.toUpperCase()); - if (value != null) { - ((ObjectNode) row).set(columnName, value); - ((ObjectNode) row).remove(columnName.toUpperCase()); - } - }); + JavaBaseConstants.COLUMN_NAME_AB_META).forEach(columnName -> { + final JsonNode value = row.get(columnName.toUpperCase()); + if (value != null) { + ((ObjectNode) row).set(columnName, value); + ((ObjectNode) row).remove(columnName.toUpperCase()); + } + }); }).toList(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 0a4376479676e..cb8613914aa0e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -141,50 +141,50 @@ protected void insertFinalTableRecords(final boolean includeCdcDeletedAt, // Similar to insertRawTableRecords, some of these columns are declared as string and wrapped in // parse_json(). """ - INSERT INTO #{final_table_id} ( - "_AIRBYTE_RAW_ID", - "_AIRBYTE_EXTRACTED_AT", - "_AIRBYTE_META", - "ID1", - "ID2", - "UPDATED_AT", - "STRUCT", - "ARRAY", - "STRING", - "NUMBER", - "INTEGER", - "BOOLEAN", - "TIMESTAMP_WITH_TIMEZONE", - "TIMESTAMP_WITHOUT_TIMEZONE", - "TIME_WITH_TIMEZONE", - "TIME_WITHOUT_TIMEZONE", - "DATE", - "UNKNOWN" - #{cdc_deleted_at_name} - ) - SELECT - column1, - column2, - PARSE_JSON(column3), - column4, - column5, - column6, - PARSE_JSON(column7), - PARSE_JSON(column8), - column9, - column10, - column11, - column12, - column13, - column14, - column15, - column16, - column17, - PARSE_JSON(column18) - #{cdc_deleted_at_extract} - FROM VALUES - #{records} - """)); + INSERT INTO #{final_table_id} ( + "_AIRBYTE_RAW_ID", + "_AIRBYTE_EXTRACTED_AT", + "_AIRBYTE_META", + "ID1", + "ID2", + "UPDATED_AT", + "STRUCT", + "ARRAY", + "STRING", + "NUMBER", + "INTEGER", + "BOOLEAN", + "TIMESTAMP_WITH_TIMEZONE", + "TIMESTAMP_WITHOUT_TIMEZONE", + "TIME_WITH_TIMEZONE", + "TIME_WITHOUT_TIMEZONE", + "DATE", + "UNKNOWN" + #{cdc_deleted_at_name} + ) + SELECT + column1, + column2, + PARSE_JSON(column3), + column4, + column5, + column6, + PARSE_JSON(column7), + PARSE_JSON(column8), + column9, + column10, + column11, + column12, + column13, + column14, + column15, + column16, + column17, + PARSE_JSON(column18) + #{cdc_deleted_at_extract} + FROM VALUES + #{records} + """)); } private String dollarQuoteWrap(final JsonNode node) { @@ -238,10 +238,12 @@ public void testCreateTableIncremental() throws Exception { final String sql = generator.createTable(incrementalDedupStream, "", false); destinationHandler.execute(sql); - // Note that USERS_FINAL is uppercased here. This is intentional, because snowflake upcases unquoted identifiers. - final Optional tableKind = database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "USERS_FINAL", namespace.toUpperCase())) - .stream().map(record -> record.get("kind").asText()) - .findFirst(); + // Note that USERS_FINAL is uppercased here. This is intentional, because snowflake upcases unquoted + // identifiers. + final Optional tableKind = + database.queryJsons(String.format("SHOW TABLES LIKE '%s' IN SCHEMA \"%s\";", "USERS_FINAL", namespace.toUpperCase())) + .stream().map(record -> record.get("kind").asText()) + .findFirst(); final Map columns = database.queryJsons( """ SELECT column_name, data_type, numeric_precision, numeric_scale From 35911626644ed1a4e831229ffb009187079a18b3 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 13:22:30 -0700 Subject: [PATCH 12/19] make DATs pass --- .../destination/typing_deduping/BaseTypingDedupingTest.java | 2 +- .../typing_deduping/AbstractBigQueryTypingDedupingTest.java | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index a4976a838292c..b4b2d12103431 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -416,7 +416,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception { // The raw data is unaffected by the schema, but the final table should not have a `name` column. final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream() - .peek(record -> ((ObjectNode) record).remove("name")) + .peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name())) .toList(); verifySyncResult(expectedRawRecords2, expectedFinalRecords2); } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java index ffc5104f5cd7f..a7d5915f67b2e 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java @@ -13,6 +13,7 @@ import com.google.cloud.bigquery.TableResult; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest; +import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.bigquery.BigQueryDestination; import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils; @@ -79,6 +80,11 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents()); } + @Override + protected SqlGenerator getSqlGenerator() { + return new BigQuerySqlGenerator(null); + } + /** * Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON * _airbyte_data). Then run a sync using our current version. From befa893e9437fbb330552f77b939bc182005ed00 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 31 Aug 2023 13:25:38 -0700 Subject: [PATCH 13/19] resolve todo --- .../typing_deduping/BaseSqlGeneratorIntegrationTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java index e77200ae42545..6ec57150410e3 100644 --- a/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java @@ -84,7 +84,6 @@ public abstract class BaseSqlGeneratorIntegrationTest { Stream.of("_ab_cdc_deleted_at")).toList(); } - // TODO use columnID to get escaped names protected RecordDiffer DIFFER; /** From abde8faabaa8dec8acc4d7711bb01f799458b621 Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 1 Sep 2023 13:35:10 -0700 Subject: [PATCH 14/19] Destination snowflake: table casing migration (#30068) Co-authored-by: edgao --- .../typing_deduping/DefaultTyperDeduper.java | 10 +- ...Migrator.java => NoopV2TableMigrator.java} | 2 +- ...ableMigrator.java => V2TableMigrator.java} | 4 +- .../bigquery/BigQueryDestination.java | 4 +- ...ator.java => BigQueryV2TableMigrator.java} | 8 +- .../SnowflakeInternalStagingDestination.java | 6 +- .../SnowflakeV2TableMigrator.java | 118 ++++++++++++++++++ .../AbstractSnowflakeTypingDedupingTest.java | 77 ++++++++++++ 8 files changed, 213 insertions(+), 16 deletions(-) rename airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/{NoopV2RawTableMigrator.java => NoopV2TableMigrator.java} (67%) rename airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/{V2RawTableMigrator.java => V2TableMigrator.java} (70%) rename airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/{BigQueryV2RawTableMigrator.java => BigQueryV2TableMigrator.java} (95%) create mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java index 5c078e83c3c5f..f55081f20e28f 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java @@ -37,7 +37,7 @@ public class DefaultTyperDeduper implements TyperDeduper private final DestinationHandler destinationHandler; private final DestinationV1V2Migrator v1V2Migrator; - private final V2RawTableMigrator v2RawTableMigrator; + private final V2TableMigrator v2TableMigrator; private final ParsedCatalog parsedCatalog; private Set overwriteStreamsWithTmpTable; private final Set streamsWithSuccesfulSetup; @@ -46,12 +46,12 @@ public DefaultTyperDeduper(final SqlGenerator sqlGenerat final DestinationHandler destinationHandler, final ParsedCatalog parsedCatalog, final DestinationV1V2Migrator v1V2Migrator, - final V2RawTableMigrator v2RawTableMigrator) { + final V2TableMigrator v2TableMigrator) { this.sqlGenerator = sqlGenerator; this.destinationHandler = destinationHandler; this.parsedCatalog = parsedCatalog; this.v1V2Migrator = v1V2Migrator; - this.v2RawTableMigrator = v2RawTableMigrator; + this.v2TableMigrator = v2TableMigrator; this.streamsWithSuccesfulSetup = new HashSet<>(); } @@ -60,7 +60,7 @@ public DefaultTyperDeduper( final DestinationHandler destinationHandler, final ParsedCatalog parsedCatalog, final DestinationV1V2Migrator v1V2Migrator) { - this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>()); + this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>()); } /** @@ -82,7 +82,7 @@ public void prepareTables() throws Exception { for (final StreamConfig stream : parsedCatalog.streams()) { // Migrate the Raw Tables if this is the first v2 sync after a v1 sync v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream); - v2RawTableMigrator.migrateIfNecessary(stream); + v2TableMigrator.migrateIfNecessary(stream); final Optional existingTable = destinationHandler.findExistingTable(stream.id()); if (existingTable.isPresent()) { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2RawTableMigrator.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java similarity index 67% rename from airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2RawTableMigrator.java rename to airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java index 8535481d78470..5ba825b5a49dc 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2RawTableMigrator.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java @@ -4,7 +4,7 @@ package io.airbyte.integrations.base.destination.typing_deduping; -public class NoopV2RawTableMigrator implements V2RawTableMigrator { +public class NoopV2TableMigrator implements V2TableMigrator { @Override public void migrateIfNecessary(final StreamConfig streamConfig) { diff --git a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2RawTableMigrator.java b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java similarity index 70% rename from airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2RawTableMigrator.java rename to airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java index d17722ac1279d..19c04e81eb9a0 100644 --- a/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2RawTableMigrator.java +++ b/airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java @@ -4,8 +4,8 @@ package io.airbyte.integrations.base.destination.typing_deduping; -public interface V2RawTableMigrator { +public interface V2TableMigrator { - void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException; + void migrateIfNecessary(final StreamConfig streamConfig) throws Exception; } diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index a8d075e207c53..34e13a7e87603 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -36,7 +36,7 @@ import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler; import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator; import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator; -import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2RawTableMigrator; +import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator; import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader; import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory; import io.airbyte.integrations.destination.bigquery.uploader.UploaderType; @@ -242,7 +242,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config, final TyperDeduper typerDeduper; parsedCatalog = catalogParser.parseCatalog(catalog); final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver); - final BigQueryV2RawTableMigrator v2RawTableMigrator = new BigQueryV2RawTableMigrator(bigquery); + final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery); typerDeduper = new DefaultTyperDeduper<>( sqlGenerator, new BigQueryDestinationHandler(bigquery, datasetLocation), diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2RawTableMigrator.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java similarity index 95% rename from airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2RawTableMigrator.java rename to airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java index cbbf5e122f379..b235e465677ce 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2RawTableMigrator.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java @@ -15,19 +15,19 @@ import com.google.cloud.bigquery.TableId; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; -import io.airbyte.integrations.base.destination.typing_deduping.V2RawTableMigrator; +import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator; import java.util.Map; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BigQueryV2RawTableMigrator implements V2RawTableMigrator { +public class BigQueryV2TableMigrator implements V2TableMigrator { - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2RawTableMigrator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class); private final BigQuery bq; - public BigQueryV2RawTableMigrator(final BigQuery bq) { + public BigQueryV2TableMigrator(final BigQuery bq) { this.bq = bq; } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index b7b8493fff921..2779a934d684b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -22,6 +22,7 @@ import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator; import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator; +import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator; import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.v0.AirbyteConnectionStatus; import io.airbyte.protocol.models.v0.AirbyteMessage; @@ -39,7 +40,7 @@ public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class); - private static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema"; + public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema"; private final String airbyteEnvironment; public SnowflakeInternalStagingDestination(final String airbyteEnvironment) { @@ -143,7 +144,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN } parsedCatalog = catalogParser.parseCatalog(catalog); final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName); - typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator); + final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler); + typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); return new StagingConsumerFactory().createAsync( outputRecordCollector, diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java new file mode 100644 index 0000000000000..8ed6ebd9fcf43 --- /dev/null +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.snowflake.typing_deduping; + +import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; +import static io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingDestination.RAW_SCHEMA_OVERRIDE; + +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.TypingAndDedupingFlag; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; +import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import java.sql.SQLException; +import java.util.LinkedHashMap; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnowflakeV2TableMigrator implements V2TableMigrator { + + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeV2TableMigrator.class); + + private final JdbcDatabase database; + private final String rawNamespace; + private final String databaseName; + private final SnowflakeSqlGenerator generator; + private final SnowflakeDestinationHandler handler; + + public SnowflakeV2TableMigrator(final JdbcDatabase database, + final String databaseName, + final SnowflakeSqlGenerator generator, + final SnowflakeDestinationHandler handler) { + this.database = database; + this.databaseName = databaseName; + this.generator = generator; + this.handler = handler; + this.rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE); + } + + @Override + public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception { + final StreamId caseSensitiveStreamId = buildStreamId_caseSensitive( + streamConfig.id().originalNamespace(), + streamConfig.id().originalName(), + rawNamespace); + final boolean syncModeRequiresMigration = streamConfig.destinationSyncMode() != DestinationSyncMode.OVERWRITE; + final boolean existingTableCaseSensitiveExists = findExistingTable_caseSensitive(caseSensitiveStreamId).isPresent(); + final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent(); + LOGGER.info( + "Checking whether upcasing migration is necessary for {}.{}. Sync mode requires migration: {}; existing case-sensitive table exists: {}; existing uppercased table does not exist: {}", + streamConfig.id().originalNamespace(), + streamConfig.id().originalName(), + syncModeRequiresMigration, + existingTableCaseSensitiveExists, + existingTableUppercaseDoesNotExist); + if (syncModeRequiresMigration && existingTableCaseSensitiveExists && existingTableUppercaseDoesNotExist) { + LOGGER.info( + "Executing upcasing migration for {}.{}", + streamConfig.id().originalNamespace(), + streamConfig.id().originalName()); + handler.execute(generator.softReset(streamConfig)); + } + } + + // These methods were copied from + // https://github.com/airbytehq/airbyte/blob/d5fdb1b982d464f54941bf9a830b9684fb47d249/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java + // which is the highest version of destination-snowflake that still uses quoted+case-sensitive + // identifiers + private static StreamId buildStreamId_caseSensitive(final String namespace, final String name, final String rawNamespaceOverride) { + // No escaping needed, as far as I can tell. We quote all our identifier names. + return new StreamId( + escapeIdentifier_caseSensitive(namespace), + escapeIdentifier_caseSensitive(name), + escapeIdentifier_caseSensitive(rawNamespaceOverride), + escapeIdentifier_caseSensitive(StreamId.concatenateRawTableName(namespace, name)), + namespace, + name); + } + + private static String escapeIdentifier_caseSensitive(final String identifier) { + // Note that we don't need to escape backslashes here! + // The only special character in an identifier is the double-quote, which needs to be doubled. + return identifier.replace("\"", "\"\""); + } + + // And this was taken from + // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java + public Optional findExistingTable_caseSensitive(final StreamId id) throws SQLException { + // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates + // VARIANT as VARCHAR + final LinkedHashMap columns = database.queryJsons( + """ + SELECT column_name, data_type + FROM information_schema.columns + WHERE table_catalog = ? + AND table_schema = ? + AND table_name = ? + ORDER BY ordinal_position; + """, + databaseName.toUpperCase(), + id.finalNamespace(), + id.finalName()).stream() + .collect(LinkedHashMap::new, + (map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()), + LinkedHashMap::putAll); + // TODO query for indexes/partitioning/etc + + if (columns.isEmpty()) { + return Optional.empty(); + } else { + return Optional.of(new SnowflakeTableDefinition(columns)); + } + } + +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index 0a0042b6cbebf..5fab649a06b94 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -18,9 +18,16 @@ import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase; import io.airbyte.integrations.destination.snowflake.SnowflakeTestUtils; +import io.airbyte.protocol.models.v0.AirbyteMessage; +import io.airbyte.protocol.models.v0.AirbyteStream; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.v0.DestinationSyncMode; +import io.airbyte.protocol.models.v0.SyncMode; import java.nio.file.Path; import java.util.List; import javax.sql.DataSource; +import org.junit.jupiter.api.Test; public abstract class AbstractSnowflakeTypingDedupingTest extends BaseTypingDedupingTest { @@ -100,6 +107,76 @@ protected String getRawSchema() { return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE; } + /** + * Run a sync using 3.0.0 (which is the highest version that still creates v2 final tables with + * lowercased+quoted names). Then run a sync using our current version. + */ + @Test + public void testFinalTableUppercasingMigration_append() throws Exception { + try { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog, messages1, "airbyte/destination-snowflake:3.0.0"); + // We no longer have the code to dump a lowercased table, so just move on directly to the new sync + + // Second sync + final List messages2 = readMessages("dat/sync2_messages.jsonl"); + + runSync(catalog, messages2); + + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2); + } finally { + // manually drop the lowercased schema, since we no longer have the code to do it automatically + // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code + // handles it fine) + database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE"); + } + } + + @Test + public void testFinalTableUppercasingMigration_overwrite() throws Exception { + try { + final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.FULL_REFRESH) + .withDestinationSyncMode(DestinationSyncMode.OVERWRITE) + .withStream(new AirbyteStream() + .withNamespace(streamNamespace) + .withName(streamName) + .withJsonSchema(SCHEMA)))); + + // First sync + final List messages1 = readMessages("dat/sync1_messages.jsonl"); + runSync(catalog, messages1, "airbyte/destination-snowflake:3.0.0"); + // We no longer have the code to dump a lowercased table, so just move on directly to the new sync + + // Second sync + final List messages2 = readMessages("dat/sync2_messages.jsonl"); + + runSync(catalog, messages2); + + final List expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl"); + final List expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl"); + verifySyncResult(expectedRawRecords2, expectedFinalRecords2); + } finally { + // manually drop the lowercased schema, since we no longer have the code to do it automatically + // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code + // handles it fine) + database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE"); + } + } + private String getDefaultSchema() { return getConfig().get("schema").asText(); } From 6d71b7dc3e1dbd6ab5c22c84a77c84f53ab51c4a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 1 Sep 2023 13:37:12 -0700 Subject: [PATCH 15/19] logistics --- .../connectors/destination-snowflake/Dockerfile | 2 +- .../connectors/destination-snowflake/metadata.yaml | 2 +- docs/integrations/destinations/snowflake.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index b92bba992ea19..98123c9f84941 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1 ENV ENABLE_SENTRY true -LABEL io.airbyte.version=3.0.1 +LABEL io.airbyte.version=3.1.0 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index e24cca10c6932..9af9c726f3152 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.0.1 + dockerImageTag: 3.1.0 dockerRepository: airbyte/destination-snowflake githubIssueLabel: destination-snowflake icon: snowflake.svg diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 81c197de18d8f..191d64243b322 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -271,6 +271,7 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | | :-------------- | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 3.1.0 | 2023-09-01 | [\#30056](https://github.com/airbytehq/airbyte/pull/30056) | Upcase final table names to allow case-insensitive references | | 3.0.1 | 2023-08-27 | [\#30065](https://github.com/airbytehq/airbyte/pull/30065) | Clearer error thrown when records are missing a primary key | | 3.0.0 | 2023-08-27 | [\#29783](https://github.com/airbytehq/airbyte/pull/29783) | Destinations V2 | | 2.1.7 | 2023-08-29 | [\#29949](https://github.com/airbytehq/airbyte/pull/29949) | Destinations V2: Fix checking for empty table by ensuring upper-case DB names | From b31863d8aa562518063d0e3a4fca3289334c1c7a Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 1 Sep 2023 13:38:59 -0700 Subject: [PATCH 16/19] delete case-sensitivity from upgrade guide --- docs/release_notes/upgrading_to_destinations_v2.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/release_notes/upgrading_to_destinations_v2.md b/docs/release_notes/upgrading_to_destinations_v2.md index c80b6eaffa4f9..b9b0f9e1e6ca5 100644 --- a/docs/release_notes/upgrading_to_destinations_v2.md +++ b/docs/release_notes/upgrading_to_destinations_v2.md @@ -175,12 +175,6 @@ In addition to the changes which apply for all destinations described above, the 1. [Object and array properties](https://docs.airbyte.com/understanding-airbyte/supported-data-types/#the-types) are properly stored as JSON columns. Previously, we had used TEXT, which made querying sub-properties more difficult. - In certain cases, numbers within sub-properties with long decimal values will need to be converted to float representations due to a _quirk_ of Bigquery. Learn more [here](https://github.com/airbytehq/airbyte/issues/29594). -### Snowflake - -1. `destination-snowflake` is now case-sensitive, and was not previously. This means that if you have a source stream "users", `destination-snowflake` would have previously created a "USERS" table in your data warehouse. We now correctly create a "users" table. - - Note that to properly query case-sensitive tables and columns in Snowflake, you will need to quote your table and column names, e.g. `select "first_name" from "users";` - - If you are migrating from Destinations v1 to Destinations V2, we will leave your old "USERS" table, and create a new "users" table - please note the case sensitivity. - ## Updating Downstream Transformations _This section is targeted towards analysts updating downstream models after you've successfully upgraded to Destinations V2._ From 06277ddff00b28d3988002cb83fd921281e5e69d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Fri, 1 Sep 2023 14:49:32 -0700 Subject: [PATCH 17/19] also bigquery logistics --- airbyte-integrations/connectors/destination-bigquery/Dockerfile | 2 +- .../connectors/destination-bigquery/metadata.yaml | 2 +- docs/integrations/destinations/bigquery.md | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index c45781c4427cc..dfeb399a38954 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.1 +LABEL io.airbyte.version=2.0.2 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 6acf7af1af667..73d18f76c92d2 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.1 + dockerImageTag: 2.0.2 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg diff --git a/docs/integrations/destinations/bigquery.md b/docs/integrations/destinations/bigquery.md index 14a2b1895b9b4..4218f273bfc96 100644 --- a/docs/integrations/destinations/bigquery.md +++ b/docs/integrations/destinations/bigquery.md @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo | Version | Date | Pull Request | Subject | | :------ | :--------- | :--------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| 2.0.2 | 2023-09-01 | [30056](https://github.com/airbytehq/airbyte/pull/30056) | Internal refactor, no behavior change | | 2.0.1 | 2023-08-29 | [29972](https://github.com/airbytehq/airbyte/pull/29972) | Publish a new version to supersede old v2.0.0 | | 2.0.0 | 2023-08-27 | [29783](https://github.com/airbytehq/airbyte/pull/29783) | Destinations V2 | | 1.10.2 | 2023-08-24 | [\#29805](https://github.com/airbytehq/airbyte/pull/29805) | Destinations v2: Don't soft reset in migration | From 26e654f6416eb2b956ceb2ae258df810bbd1649d Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Tue, 5 Sep 2023 07:34:38 -0700 Subject: [PATCH 18/19] merge pls --- airbyte-integrations/connectors/destination-bigquery/Dockerfile | 2 +- .../connectors/destination-bigquery/metadata.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index dfeb399a38954..e4d1a36caa8da 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=2.0.2 +LABEL io.airbyte.version=2.0.3 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml index 73d18f76c92d2..145a4d09a14fd 100644 --- a/airbyte-integrations/connectors/destination-bigquery/metadata.yaml +++ b/airbyte-integrations/connectors/destination-bigquery/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133 - dockerImageTag: 2.0.2 + dockerImageTag: 2.0.3 dockerRepository: airbyte/destination-bigquery githubIssueLabel: destination-bigquery icon: bigquery.svg From c59f4eb22f744b03d20b3c8dfb3e8ff3a67f0fd8 Mon Sep 17 00:00:00 2001 From: edgao Date: Tue, 5 Sep 2023 14:50:09 +0000 Subject: [PATCH 19/19] Automated Commit - Formatting Changes --- .../source_hubplanner/schemas/projects.json | 6 +++--- .../source_hubplanner/schemas/resources.json | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/projects.json b/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/projects.json index 5b40142bed5eb..24e456bf861af 100644 --- a/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/projects.json +++ b/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/projects.json @@ -166,19 +166,19 @@ "categoryGroups": { "type": ["null", "array"], "items": { - "type": ["null","string"] + "type": ["null", "string"] } }, "customers": { "type": ["null", "array"], "items": { - "type": ["null","string"] + "type": ["null", "string"] } }, "budgetCategories": { "type": ["null", "array"], "items": { - "type": ["null","object"] + "type": ["null", "object"] } }, "private": { diff --git a/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/resources.json b/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/resources.json index 004fd9af9261b..fc646a589cb02 100644 --- a/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/resources.json +++ b/airbyte-integrations/connectors/source-hubplanner/source_hubplanner/schemas/resources.json @@ -45,7 +45,7 @@ "calendarIds": { "type": ["null", "array"], "items": { - "type": ["null","string"] + "type": ["null", "string"] } }, "isApprover": {