Commit 9231c04

🐛 Destination snowflake: Create final tables with uppercase naming (#30056)
Co-authored-by: edgao <[email protected]>
1 parent dee066b

36 files changed, +470 −203 lines changed


airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseSqlGeneratorIntegrationTest.java

+18-13
@@ -84,10 +84,7 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
         Stream.of("_ab_cdc_deleted_at")).toList();
   }
 
-  protected static final RecordDiffer DIFFER = new RecordDiffer(
-      Pair.of("id1", AirbyteProtocolType.INTEGER),
-      Pair.of("id2", AirbyteProtocolType.INTEGER),
-      Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
+  protected RecordDiffer DIFFER;
 
   /**
    * Subclasses may use these four StreamConfigs in their tests.

@@ -118,6 +115,14 @@ public abstract class BaseSqlGeneratorIntegrationTest<DialectTableDefinition> {
 
   protected abstract DestinationHandler<DialectTableDefinition> getDestinationHandler();
 
+  /**
+   * Subclasses should override this method if they need to make changes to the stream ID. For
+   * example, you could upcase the final table name here.
+   */
+  protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) {
+    return new StreamId(namespace, finalTableName, namespace, rawTableName, namespace, finalTableName);
+  }
+
   /**
    * Do any setup work to create a namespace for this test run. For example, this might create a
    * BigQuery dataset, or a Snowflake schema.

@@ -198,12 +203,17 @@ public void setup() throws Exception {
     final LinkedHashMap<ColumnId, AirbyteType> cdcColumns = new LinkedHashMap<>(COLUMNS);
     cdcColumns.put(generator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);
 
+    DIFFER = new RecordDiffer(
+        Pair.of(id1, AirbyteProtocolType.INTEGER),
+        Pair.of(id2, AirbyteProtocolType.INTEGER),
+        Pair.of(cursor, AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE));
+
     namespace = Strings.addRandomSuffix("sql_generator_test", "_", 5);
     // This is not a typical stream ID would look like, but SqlGenerator isn't allowed to make any
     // assumptions about StreamId structure.
     // In practice, the final table would be testDataset.users, and the raw table would be
     // airbyte_internal.testDataset_raw__stream_users.
-    streamId = new StreamId(namespace, "users_final", namespace, "users_raw", namespace, "users_final");
+    streamId = buildStreamId(namespace, "users_final", "users_raw");
 
     incrementalDedupStream = new StreamConfig(
         streamId,

@@ -483,7 +493,7 @@ public void incrementalDedupNoCursor() throws Exception {
         actualFinalRecords);
     assertAll(
         () -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()),
-        () -> assertEquals("bar", actualFinalRecords.get(0).get("string").asText()));
+        () -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()));
   }
 
   @Test

@@ -791,15 +801,10 @@ public void weirdColumnNames() throws Exception {
   public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {
     final String str = namespace + "_" + specialChars;
     final StreamId originalStreamId = generator.buildStreamId(str, str, "unused");
-    final StreamId modifiedStreamId = new StreamId(
+    final StreamId modifiedStreamId = buildStreamId(
         originalStreamId.finalNamespace(),
         originalStreamId.finalName(),
-        // hack for testing simplicity: put the raw tables in the final namespace. This makes cleanup
-        // easier.
-        originalStreamId.finalNamespace(),
-        "raw_table",
-        null,
-        null);
+        "raw_table");
     final ColumnId columnId = generator.buildColumnId(str);
     try {
       createNamespace(modifiedStreamId.finalNamespace());
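The new buildStreamId hook above is the extension point this commit uses to exercise destination-specific naming. As a rough illustration (not code from this commit), a Snowflake-style test subclass might upcase only the final-table coordinates, assuming StreamId's positional constructor (final namespace, final name, raw namespace, raw name, original namespace, original name) as used in the base implementation:

  // Hypothetical subclass override; the casing rule is illustrative.
  @Override
  protected StreamId buildStreamId(final String namespace, final String finalTableName, final String rawTableName) {
    return new StreamId(
        namespace.toUpperCase(),      // final namespace, upcased
        finalTableName.toUpperCase(), // final table name, upcased
        namespace,                    // raw namespace keeps the original casing
        rawTableName,                 // raw table name keeps the original casing
        namespace,                    // original namespace
        finalTableName);              // original (un-upcased) name
  }

The raw coordinates stay untouched because only final tables pick up the new uppercase naming.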

airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java

+14-7
@@ -68,11 +68,7 @@ public abstract class BaseTypingDedupingTest {
       throw new RuntimeException(e);
     }
   }
-  private static final RecordDiffer DIFFER = new RecordDiffer(
-      Pair.of("id1", AirbyteProtocolType.INTEGER),
-      Pair.of("id2", AirbyteProtocolType.INTEGER),
-      Pair.of("updated_at", AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
-      Pair.of("old_cursor", AirbyteProtocolType.INTEGER));
+  private RecordDiffer DIFFER;
 
   private String randomSuffix;
   private JsonNode config;

@@ -112,7 +108,8 @@ public abstract class BaseTypingDedupingTest {
   /**
    * For a given stream, return the records that exist in the destination's final table. Each record
    * must be in the format {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_meta":
-   * {...}, "field1": ..., "field2": ..., ...}.
+   * {...}, "field1": ..., "field2": ..., ...}. If the destination renames (e.g. upcases) the airbyte
+   * fields, this method must revert that naming to use the exact strings "_airbyte_raw_id", etc.
    * <p>
    * For JSON-valued columns, there is some nuance: a SQL null should be represented as a missing
    * entry, whereas a JSON null should be represented as a

@@ -137,6 +134,8 @@ public abstract class BaseTypingDedupingTest {
    */
   protected abstract void teardownStreamAndNamespace(String streamNamespace, String streamName) throws Exception;
 
+  protected abstract SqlGenerator<?> getSqlGenerator();
+
   /**
    * Destinations which need to clean up resources after an entire test finishes should override this
    * method. For example, if you want to gracefully close a database connection, you should do that

@@ -164,6 +163,14 @@ public void setup() throws Exception {
     streamNamespace = "typing_deduping_test" + getUniqueSuffix();
     streamName = "test_stream" + getUniqueSuffix();
     streamsToTearDown = new ArrayList<>();
+
+    final SqlGenerator<?> generator = getSqlGenerator();
+    DIFFER = new RecordDiffer(
+        Pair.of(generator.buildColumnId("id1"), AirbyteProtocolType.INTEGER),
+        Pair.of(generator.buildColumnId("id2"), AirbyteProtocolType.INTEGER),
+        Pair.of(generator.buildColumnId("updated_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE),
+        Pair.of(generator.buildColumnId("old_cursor"), AirbyteProtocolType.INTEGER));
+
     LOGGER.info("Using stream namespace {} and name {}", streamNamespace, streamName);
   }

@@ -409,7 +416,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
     // The raw data is unaffected by the schema, but the final table should not have a `name` column.
     final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
     final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
-        .peek(record -> ((ObjectNode) record).remove("name"))
+        .peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
        .toList();
     verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
   }
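Per the updated javadoc, destinations that upcase the airbyte columns must map them back to the exact lowercase names when dumping final-table records. A hedged sketch of one way a destination's test implementation might do that; the helper name and field list are illustrative assumptions, not part of this commit:

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.node.ObjectNode;
  import java.util.List;

  // Illustrative helper: move upcased airbyte fields back to their canonical lowercase keys.
  private static ObjectNode downcaseAirbyteFields(final ObjectNode record) {
    for (final String field : List.of("_airbyte_raw_id", "_airbyte_extracted_at", "_airbyte_meta")) {
      final JsonNode value = record.remove(field.toUpperCase()); // e.g. "_AIRBYTE_RAW_ID"
      if (value != null) {
        record.set(field, value); // restore the exact lowercase name the framework expects
      }
    }
    return record;
  }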

airbyte-integrations/bases/base-typing-deduping-test/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.java

+33-13
@@ -37,19 +37,39 @@
  */
 public class RecordDiffer {
 
-  private final Comparator<JsonNode> recordIdentityComparator;
-  private final Comparator<JsonNode> recordSortComparator;
-  private final Function<JsonNode, String> recordIdentityExtractor;
+  private final Comparator<JsonNode> rawRecordIdentityComparator;
+  private final Comparator<JsonNode> rawRecordSortComparator;
+  private final Function<JsonNode, String> rawRecordIdentityExtractor;
+
+  private final Comparator<JsonNode> finalRecordIdentityComparator;
+  private final Comparator<JsonNode> finalRecordSortComparator;
+  private final Function<JsonNode, String> finalRecordIdentityExtractor;
 
   /**
    * @param identifyingColumns Which fields constitute a unique record (typically PK+cursor). Do _not_
    *        include extracted_at; it is handled automatically.
    */
   @SafeVarargs
-  public RecordDiffer(final Pair<String, AirbyteType>... identifyingColumns) {
-    this.recordIdentityComparator = buildIdentityComparator(identifyingColumns);
-    this.recordSortComparator = recordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
-    this.recordIdentityExtractor = buildIdentityExtractor(identifyingColumns);
+  public RecordDiffer(final Pair<ColumnId, AirbyteType>... identifyingColumns) {
+    final Pair<String, AirbyteType>[] rawTableIdentifyingColumns = Arrays.stream(identifyingColumns)
+        .map(p -> Pair.of(
+            // Raw tables always retain the original column names
+            p.getLeft().originalName(),
+            p.getRight()))
+        .toArray(Pair[]::new);
+    this.rawRecordIdentityComparator = buildIdentityComparator(rawTableIdentifyingColumns);
+    this.rawRecordSortComparator = rawRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
+    this.rawRecordIdentityExtractor = buildIdentityExtractor(rawTableIdentifyingColumns);
+
+    final Pair<String, AirbyteType>[] finalTableIdentifyingColumns = Arrays.stream(identifyingColumns)
+        .map(p -> Pair.of(
+            // Final tables may have modified the column names, so use the final name here.
+            p.getLeft().name(),
+            p.getRight()))
+        .toArray(Pair[]::new);
+    this.finalRecordIdentityComparator = buildIdentityComparator(finalTableIdentifyingColumns);
+    this.finalRecordSortComparator = finalRecordIdentityComparator.thenComparing(record -> asString(record.get("_airbyte_raw_id")));
+    this.finalRecordIdentityExtractor = buildIdentityExtractor(finalTableIdentifyingColumns);
   }
 
   /**

@@ -70,9 +90,9 @@ public void diffRawTableRecords(final List<JsonNode> expectedRecords, final List
     final String diff = diffRecords(
         expectedRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
         actualRecords.stream().map(RecordDiffer::copyWithLiftedData).collect(toList()),
-        recordIdentityComparator,
-        recordSortComparator,
-        recordIdentityExtractor);
+        rawRecordIdentityComparator,
+        rawRecordSortComparator,
+        rawRecordIdentityExtractor);
 
     if (!diff.isEmpty()) {
       fail("Raw table was incorrect.\n" + diff);

@@ -83,9 +103,9 @@ public void diffFinalTableRecords(final List<JsonNode> expectedRecords, final Li
     final String diff = diffRecords(
         expectedRecords,
         actualRecords,
-        recordIdentityComparator,
-        recordSortComparator,
-        recordIdentityExtractor);
+        finalRecordIdentityComparator,
+        finalRecordSortComparator,
+        finalRecordIdentityExtractor);
 
     if (!diff.isEmpty()) {
       fail("Final table was incorrect.\n" + diff);

airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.java

+5-5
@@ -37,7 +37,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
   private final DestinationHandler<DialectTableDefinition> destinationHandler;
 
   private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
-  private final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator;
+  private final V2TableMigrator<DialectTableDefinition> v2TableMigrator;
   private final ParsedCatalog parsedCatalog;
   private Set<StreamId> overwriteStreamsWithTmpTable;
   private final Set<StreamId> streamsWithSuccesfulSetup;

@@ -46,12 +46,12 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
                             final DestinationHandler<DialectTableDefinition> destinationHandler,
                             final ParsedCatalog parsedCatalog,
                             final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
-                            final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator) {
+                            final V2TableMigrator<DialectTableDefinition> v2TableMigrator) {
     this.sqlGenerator = sqlGenerator;
     this.destinationHandler = destinationHandler;
     this.parsedCatalog = parsedCatalog;
     this.v1V2Migrator = v1V2Migrator;
-    this.v2RawTableMigrator = v2RawTableMigrator;
+    this.v2TableMigrator = v2TableMigrator;
     this.streamsWithSuccesfulSetup = new HashSet<>();
   }

@@ -60,7 +60,7 @@ public DefaultTyperDeduper(
                             final DestinationHandler<DialectTableDefinition> destinationHandler,
                             final ParsedCatalog parsedCatalog,
                             final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
-    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>());
+    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>());
   }
 
   /**

@@ -82,7 +82,7 @@ public void prepareTables() throws Exception {
     for (final StreamConfig stream : parsedCatalog.streams()) {
       // Migrate the Raw Tables if this is the first v2 sync after a v1 sync
       v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
-      v2RawTableMigrator.migrateIfNecessary(stream);
+      v2TableMigrator.migrateIfNecessary(stream);
 
       final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
       if (existingTable.isPresent()) {
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/NoopV2TableMigrator.java (renamed from NoopV2RawTableMigrator.java)

@@ -4,7 +4,7 @@
 
 package io.airbyte.integrations.base.destination.typing_deduping;
 
-public class NoopV2RawTableMigrator<DialectTableDefinition> implements V2RawTableMigrator<DialectTableDefinition> {
+public class NoopV2TableMigrator<DialectTableDefinition> implements V2TableMigrator<DialectTableDefinition> {
 
   @Override
   public void migrateIfNecessary(final StreamConfig streamConfig) {
airbyte-integrations/bases/base-typing-deduping/src/main/java/io/airbyte/integrations/base/destination/typing_deduping/V2TableMigrator.java (renamed from V2RawTableMigrator.java)

@@ -4,8 +4,8 @@
 
 package io.airbyte.integrations.base.destination.typing_deduping;
 
-public interface V2RawTableMigrator<DialectTableDefinition> {
+public interface V2TableMigrator<DialectTableDefinition> {
 
-  void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException;
+  void migrateIfNecessary(final StreamConfig streamConfig) throws Exception;
 
 }
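The method signature also widens from throws InterruptedException to throws Exception, so implementations can run real database work. A hedged sketch of an implementor against the renamed interface; the class name and body are illustrative, and the actual SnowflakeV2TableMigrator internals are not shown on this page:

  // Illustrative implementor only; the real migrators live in each connector.
  public class ExampleV2TableMigrator<DialectTableDefinition> implements V2TableMigrator<DialectTableDefinition> {

    @Override
    public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception {
      // With `throws Exception`, an implementation can issue SQL here, e.g. detect a
      // final table created under the old lowercase naming and migrate it to the
      // new uppercase-named table before the sync's typing/deduping runs.
    }

  }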

airbyte-integrations/connectors/destination-bigquery/Dockerfile

+1-1
@@ -25,5 +25,5 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=2.0.2
+LABEL io.airbyte.version=2.0.3
 LABEL io.airbyte.name=airbyte/destination-bigquery

airbyte-integrations/connectors/destination-bigquery/metadata.yaml

+1-1
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
-  dockerImageTag: 2.0.2
+  dockerImageTag: 2.0.3
   dockerRepository: airbyte/destination-bigquery
   githubIssueLabel: destination-bigquery
   icon: bigquery.svg

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java

+2-2
@@ -36,7 +36,7 @@
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
-import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2RawTableMigrator;
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
 import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
 import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;

@@ -242,7 +242,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
     final TyperDeduper typerDeduper;
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
-    final BigQueryV2RawTableMigrator v2RawTableMigrator = new BigQueryV2RawTableMigrator(bigquery);
+    final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
     typerDeduper = new DefaultTyperDeduper<>(
         sqlGenerator,
         new BigQueryDestinationHandler(bigquery, datasetLocation),
airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryV2TableMigrator.java (renamed from BigQueryV2RawTableMigrator.java)

@@ -15,19 +15,19 @@
 import com.google.cloud.bigquery.TableId;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
-import io.airbyte.integrations.base.destination.typing_deduping.V2RawTableMigrator;
+import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
 import java.util.Map;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BigQueryV2RawTableMigrator implements V2RawTableMigrator<TableDefinition> {
+public class BigQueryV2TableMigrator implements V2TableMigrator<TableDefinition> {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2RawTableMigrator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class);
 
   private final BigQuery bq;
 
-  public BigQueryV2RawTableMigrator(final BigQuery bq) {
+  public BigQueryV2TableMigrator(final BigQuery bq) {
     this.bq = bq;
   }

airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/typing_deduping/AbstractBigQueryTypingDedupingTest.java

+6
@@ -13,6 +13,7 @@
 import com.google.cloud.bigquery.TableResult;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.BaseTypingDedupingTest;
+import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestination;
 import io.airbyte.integrations.destination.bigquery.BigQueryDestinationTestUtils;

@@ -79,6 +80,11 @@ protected void teardownStreamAndNamespace(String streamNamespace, final String s
     bq.delete(DatasetId.of(streamNamespace), BigQuery.DatasetDeleteOption.deleteContents());
   }
 
+  @Override
+  protected SqlGenerator<?> getSqlGenerator() {
+    return new BigQuerySqlGenerator(null);
+  }
+
   /**
    * Run a sync using 1.9.0 (which is the highest version that still creates v2 raw tables with JSON
    * _airbyte_data). Then run a sync using our current version.

airbyte-integrations/connectors/destination-snowflake/Dockerfile

+1-1
@@ -29,5 +29,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1
 
 ENV ENABLE_SENTRY true
 
-LABEL io.airbyte.version=3.0.2
+LABEL io.airbyte.version=3.1.0
 LABEL io.airbyte.name=airbyte/destination-snowflake

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

+1-1
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.0.2
+  dockerImageTag: 3.1.0
   dockerRepository: airbyte/destination-snowflake
   githubIssueLabel: destination-snowflake
   icon: snowflake.svg

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

+4-2
@@ -22,6 +22,7 @@
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
 import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.v0.AirbyteMessage;

@@ -39,7 +40,7 @@
 public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
-  private static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
+  public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
   private final String airbyteEnvironment;
 
   public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {

@@ -143,7 +144,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
     }
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
-    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator);
+    final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
+    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
 
     return new StagingConsumerFactory().createAsync(
         outputRecordCollector,
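Condensed view of the new wiring (a sketch; the surrounding setup code from the hunk above is elided):

  // Both migrators now ride along on every Snowflake sync.
  final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
  final SnowflakeV2TableMigrator v2TableMigrator =
      new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
  typerDeduper = new DefaultTyperDeduper<>(
      sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
  // DefaultTyperDeduper.prepareTables() then calls v2TableMigrator.migrateIfNecessary(stream)
  // for each configured stream (see the prepareTables hunk above) before table setup.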
