Commit abde8fa

Destination snowflake: table casing migration (#30068)
Co-authored-by: edgao <[email protected]>
1 parent befa893 commit abde8fa
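
Background: destination-snowflake 3.0.0 and earlier created v2 final tables with quoted, case-sensitive (lowercased) identifiers, while newer versions upcase them. In Snowflake those resolve to different tables, so this commit detects a leftover lowercased final table and soft-resets it into the uppercased one. A minimal sketch (not part of this commit) of the Snowflake identifier behavior in question, assuming an open java.sql.Connection conn and a hypothetical schema name:

import java.sql.Connection;
import java.sql.Statement;

class SnowflakeCasingDemo {

  // Quoted identifiers are case-sensitive; unquoted identifiers fold to uppercase.
  static void demo(final Connection conn) throws Exception {
    try (final Statement stmt = conn.createStatement()) {
      stmt.execute("CREATE SCHEMA IF NOT EXISTS \"demo\"");          // schema literally named demo
      stmt.execute("CREATE SCHEMA IF NOT EXISTS demo");              // folds to DEMO -- a different schema
      stmt.execute("CREATE TABLE \"demo\".\"users\" (ID INTEGER)");  // naming style of <=3.0.0 final tables
      stmt.execute("CREATE TABLE demo.users (ID INTEGER)");          // resolves to DEMO.USERS, coexists with the above
    }
  }
}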

8 files changed: +213 -16 lines changed

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

+5-5
@@ -37,7 +37,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
   private final DestinationHandler<DialectTableDefinition> destinationHandler;

   private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
-  private final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator;
+  private final V2TableMigrator<DialectTableDefinition> v2TableMigrator;
   private final ParsedCatalog parsedCatalog;
   private Set<StreamId> overwriteStreamsWithTmpTable;
   private final Set<StreamId> streamsWithSuccesfulSetup;
@@ -46,12 +46,12 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
                              final DestinationHandler<DialectTableDefinition> destinationHandler,
                              final ParsedCatalog parsedCatalog,
                              final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
-                             final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator) {
+                             final V2TableMigrator<DialectTableDefinition> v2TableMigrator) {
     this.sqlGenerator = sqlGenerator;
     this.destinationHandler = destinationHandler;
     this.parsedCatalog = parsedCatalog;
     this.v1V2Migrator = v1V2Migrator;
-    this.v2RawTableMigrator = v2RawTableMigrator;
+    this.v2TableMigrator = v2TableMigrator;
     this.streamsWithSuccesfulSetup = new HashSet<>();
   }

@@ -60,7 +60,7 @@ public DefaultTyperDeduper(
                              final DestinationHandler<DialectTableDefinition> destinationHandler,
                              final ParsedCatalog parsedCatalog,
                              final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
-    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>());
+    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>());
   }

   /**
@@ -82,7 +82,7 @@ public void prepareTables() throws Exception {
     for (final StreamConfig stream : parsedCatalog.streams()) {
       // Migrate the Raw Tables if this is the first v2 sync after a v1 sync
       v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
-      v2RawTableMigrator.migrateIfNecessary(stream);
+      v2TableMigrator.migrateIfNecessary(stream);

       final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
       if (existingTable.isPresent()) {

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java (renamed from NoopV2RawTableMigrator.java)

+1-1

@@ -4,7 +4,7 @@

 package io.airbyte.integrations.base.destination.typing_deduping;

-public class NoopV2RawTableMigrator<DialectTableDefinition> implements V2RawTableMigrator<DialectTableDefinition> {
+public class NoopV2TableMigrator<DialectTableDefinition> implements V2TableMigrator<DialectTableDefinition> {

   @Override
   public void migrateIfNecessary(final StreamConfig streamConfig) {

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java (renamed from V2RawTableMigrator.java)

+2-2

@@ -4,8 +4,8 @@

 package io.airbyte.integrations.base.destination.typing_deduping;

-public interface V2RawTableMigrator<DialectTableDefinition> {
+public interface V2TableMigrator<DialectTableDefinition> {

-  void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException;
+  void migrateIfNecessary(final StreamConfig streamConfig) throws Exception;

 }

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java

+2-2
@@ -36,7 +36,7 @@
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
-import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2RawTableMigrator;
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
 import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
 import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
@@ -242,7 +242,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
     final TyperDeduper typerDeduper;
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
-    final BigQueryV2RawTableMigrator v2RawTableMigrator = new BigQueryV2RawTableMigrator(bigquery);
+    final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
     typerDeduper = new DefaultTyperDeduper<>(
         sqlGenerator,
         new BigQueryDestinationHandler(bigquery, datasetLocation),

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java (renamed from BigQueryV2RawTableMigrator.java)

+4-4

@@ -15,19 +15,19 @@
 import com.google.cloud.bigquery.TableId;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
-import io.airbyte.integrations.base.destination.typing_deduping.V2RawTableMigrator;
+import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
 import java.util.Map;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class BigQueryV2RawTableMigrator implements V2RawTableMigrator<TableDefinition> {
+public class BigQueryV2TableMigrator implements V2TableMigrator<TableDefinition> {

-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2RawTableMigrator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class);

   private final BigQuery bq;

-  public BigQueryV2RawTableMigrator(final BigQuery bq) {
+  public BigQueryV2TableMigrator(final BigQuery bq) {
     this.bq = bq;
   }

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

+4-2
@@ -22,6 +22,7 @@
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
 import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
@@ -39,7 +40,7 @@
 public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

   private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
-  private static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
+  public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
   private final String airbyteEnvironment;

   public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
@@ -143,7 +144,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
     }
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
-    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator);
+    final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
+    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);

     return new StagingConsumerFactory().createAsync(
         outputRecordCollector,

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java (new file)

+118

@@ -0,0 +1,118 @@
+/*
+ * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.snowflake.typing_deduping;
+
+import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
+import static io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingDestination.RAW_SCHEMA_OVERRIDE;
+
+import io.airbyte.db.jdbc.JdbcDatabase;
+import io.airbyte.integrations.base.TypingAndDedupingFlag;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
+import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
+import io.airbyte.protocol.models.v0.DestinationSyncMode;
+import java.sql.SQLException;
+import java.util.LinkedHashMap;
+import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SnowflakeV2TableMigrator implements V2TableMigrator<SnowflakeTableDefinition> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeV2TableMigrator.class);
+
+  private final JdbcDatabase database;
+  private final String rawNamespace;
+  private final String databaseName;
+  private final SnowflakeSqlGenerator generator;
+  private final SnowflakeDestinationHandler handler;
+
+  public SnowflakeV2TableMigrator(final JdbcDatabase database,
+                                  final String databaseName,
+                                  final SnowflakeSqlGenerator generator,
+                                  final SnowflakeDestinationHandler handler) {
+    this.database = database;
+    this.databaseName = databaseName;
+    this.generator = generator;
+    this.handler = handler;
+    this.rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
+  }
+
+  @Override
+  public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception {
+    final StreamId caseSensitiveStreamId = buildStreamId_caseSensitive(
+        streamConfig.id().originalNamespace(),
+        streamConfig.id().originalName(),
+        rawNamespace);
+    final boolean syncModeRequiresMigration = streamConfig.destinationSyncMode() != DestinationSyncMode.OVERWRITE;
+    final boolean existingTableCaseSensitiveExists = findExistingTable_caseSensitive(caseSensitiveStreamId).isPresent();
+    final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent();
+    LOGGER.info(
+        "Checking whether upcasing migration is necessary for {}.{}. Sync mode requires migration: {}; existing case-sensitive table exists: {}; existing uppercased table does not exist: {}",
+        streamConfig.id().originalNamespace(),
+        streamConfig.id().originalName(),
+        syncModeRequiresMigration,
+        existingTableCaseSensitiveExists,
+        existingTableUppercaseDoesNotExist);
+    if (syncModeRequiresMigration && existingTableCaseSensitiveExists && existingTableUppercaseDoesNotExist) {
+      LOGGER.info(
+          "Executing upcasing migration for {}.{}",
+          streamConfig.id().originalNamespace(),
+          streamConfig.id().originalName());
+      handler.execute(generator.softReset(streamConfig));
+    }
+  }
+
+  // These methods were copied from
+  // https://github.com/airbytehq/airbyte/blob/d5fdb1b982d464f54941bf9a830b9684fb47d249/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java
+  // which is the highest version of destination-snowflake that still uses quoted+case-sensitive
+  // identifiers
+  private static StreamId buildStreamId_caseSensitive(final String namespace, final String name, final String rawNamespaceOverride) {
+    // No escaping needed, as far as I can tell. We quote all our identifier names.
+    return new StreamId(
+        escapeIdentifier_caseSensitive(namespace),
+        escapeIdentifier_caseSensitive(name),
+        escapeIdentifier_caseSensitive(rawNamespaceOverride),
+        escapeIdentifier_caseSensitive(StreamId.concatenateRawTableName(namespace, name)),
+        namespace,
+        name);
+  }
+
+  private static String escapeIdentifier_caseSensitive(final String identifier) {
+    // Note that we don't need to escape backslashes here!
+    // The only special character in an identifier is the double-quote, which needs to be doubled.
+    return identifier.replace("\"", "\"\"");
+  }
+
+  // And this was taken from
+  // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java
+  public Optional<SnowflakeTableDefinition> findExistingTable_caseSensitive(final StreamId id) throws SQLException {
+    // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates
+    // VARIANT as VARCHAR
+    final LinkedHashMap<String, String> columns = database.queryJsons(
+        """
+        SELECT column_name, data_type
+        FROM information_schema.columns
+        WHERE table_catalog = ?
+          AND table_schema = ?
+          AND table_name = ?
+        ORDER BY ordinal_position;
+        """,
+        databaseName.toUpperCase(),
+        id.finalNamespace(),
+        id.finalName()).stream()
+        .collect(LinkedHashMap::new,
+            (map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()),
+            LinkedHashMap::putAll);
+    // TODO query for indexes/partitioning/etc
+
+    if (columns.isEmpty()) {
+      return Optional.empty();
+    } else {
+      return Optional.of(new SnowflakeTableDefinition(columns));
+    }
+  }
+
+}
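
The condition in migrateIfNecessary above reduces to a three-part gate. A condensed restatement (a sketch, not code from this commit; names are hypothetical):

class UpcasingMigrationGate {

  // Mirrors the three checks in SnowflakeV2TableMigrator.migrateIfNecessary: migrate only
  // when the sync mode preserves existing data, a legacy quoted+lowercase final table
  // exists, and the new uppercased final table has not been created yet.
  static boolean shouldMigrate(final boolean isOverwriteSync,
                               final boolean caseSensitiveTableExists,
                               final boolean uppercasedTableExists) {
    return !isOverwriteSync             // OVERWRITE recreates the final table anyway
        && caseSensitiveTableExists     // there is a legacy table to carry forward
        && !uppercasedTableExists;      // and the migration has not already run
  }
}

Note that the migration itself is just generator.softReset(streamConfig): the final table is rebuilt from the raw table (which, per the test comments below, kept its lowercased airbyte_internal location), so nothing is copied out of the legacy final table directly.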

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java

+77
@@ -18,9 +18,16 @@
 import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts;
 import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase;
 import io.airbyte.integrations.destination.snowflake.SnowflakeTestUtils;
+import io.airbyte.protocol.models.v0.AirbyteMessage;
+import io.airbyte.protocol.models.v0.AirbyteStream;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.v0.DestinationSyncMode;
+import io.airbyte.protocol.models.v0.SyncMode;
 import java.nio.file.Path;
 import java.util.List;
 import javax.sql.DataSource;
+import org.junit.jupiter.api.Test;

 public abstract class AbstractSnowflakeTypingDedupingTest extends BaseTypingDedupingTest {

@@ -100,6 +107,76 @@ protected String getRawSchema() {
     return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
   }

+  /**
+   * Run a sync using 3.0.0 (which is the highest version that still creates v2 final tables with
+   * lowercased+quoted names). Then run a sync using our current version.
+   */
+  @Test
+  public void testFinalTableUppercasingMigration_append() throws Exception {
+    try {
+      final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
+          new ConfiguredAirbyteStream()
+              .withSyncMode(SyncMode.FULL_REFRESH)
+              .withDestinationSyncMode(DestinationSyncMode.APPEND)
+              .withStream(new AirbyteStream()
+                  .withNamespace(streamNamespace)
+                  .withName(streamName)
+                  .withJsonSchema(SCHEMA))));
+
+      // First sync
+      final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
+      runSync(catalog, messages1, "airbyte/destination-snowflake:3.0.0");
+      // We no longer have the code to dump a lowercased table, so just move on directly to the new sync
+
+      // Second sync
+      final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
+
+      runSync(catalog, messages2);
+
+      final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
+      final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
+      verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
+    } finally {
+      // manually drop the lowercased schema, since we no longer have the code to do it automatically
+      // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code
+      // handles it fine)
+      database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE");
+    }
+  }
+
+  @Test
+  public void testFinalTableUppercasingMigration_overwrite() throws Exception {
+    try {
+      final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
+          new ConfiguredAirbyteStream()
+              .withSyncMode(SyncMode.FULL_REFRESH)
+              .withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
+              .withStream(new AirbyteStream()
+                  .withNamespace(streamNamespace)
+                  .withName(streamName)
+                  .withJsonSchema(SCHEMA))));
+
+      // First sync
+      final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
+      runSync(catalog, messages1, "airbyte/destination-snowflake:3.0.0");
+      // We no longer have the code to dump a lowercased table, so just move on directly to the new sync
+
+      // Second sync
+      final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
+
+      runSync(catalog, messages2);
+
+      final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl");
+      final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl");
+      verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
+    } finally {
+      // manually drop the lowercased schema, since we no longer have the code to do it automatically
+      // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code
+      // handles it fine)
+      database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE");
+    }
+  }
+
   private String getDefaultSchema() {
     return getConfig().get("schema").asText();
   }
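
One detail in the cleanup above: the schema name is quoted in the DROP statement because an unquoted identifier would fold to uppercase and miss the legacy lowercased schema. A tiny sketch with a hypothetical value:

class CleanupQuotingDemo {

  public static void main(final String[] args) {
    final String streamNamespace = "some_namespace"; // hypothetical example value
    // Quoting preserves the lowercase name; unquoted, Snowflake would resolve the
    // identifier as SOME_NAMESPACE and the legacy schema would survive the cleanup.
    final String sql = "DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE";
    System.out.println(sql); // DROP SCHEMA IF EXISTS "some_namespace" CASCADE
  }
}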
