Destination snowflake: table casing migration #30068

Merged

DefaultTyperDeduper.java

@@ -37,7 +37,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
   private final DestinationHandler<DialectTableDefinition> destinationHandler;

   private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
-  private final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator;
+  private final V2TableMigrator<DialectTableDefinition> v2TableMigrator;
   private final ParsedCatalog parsedCatalog;
   private Set<StreamId> overwriteStreamsWithTmpTable;
   private final Set<StreamId> streamsWithSuccesfulSetup;
@@ -46,12 +46,12 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerator,
                             final DestinationHandler<DialectTableDefinition> destinationHandler,
                             final ParsedCatalog parsedCatalog,
                             final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
-                            final V2RawTableMigrator<DialectTableDefinition> v2RawTableMigrator) {
+                            final V2TableMigrator<DialectTableDefinition> v2TableMigrator) {
     this.sqlGenerator = sqlGenerator;
     this.destinationHandler = destinationHandler;
     this.parsedCatalog = parsedCatalog;
     this.v1V2Migrator = v1V2Migrator;
-    this.v2RawTableMigrator = v2RawTableMigrator;
+    this.v2TableMigrator = v2TableMigrator;
     this.streamsWithSuccesfulSetup = new HashSet<>();
   }

@@ -60,7 +60,7 @@ public DefaultTyperDeduper(
                             final DestinationHandler<DialectTableDefinition> destinationHandler,
                             final ParsedCatalog parsedCatalog,
                             final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator) {
-    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2RawTableMigrator<>());
+    this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>());
   }

/**
@@ -82,7 +82,7 @@ public void prepareTables() throws Exception {
     for (final StreamConfig stream : parsedCatalog.streams()) {
       // Migrate the Raw Tables if this is the first v2 sync after a v1 sync
       v1V2Migrator.migrateIfNecessary(sqlGenerator, destinationHandler, stream);
-      v2RawTableMigrator.migrateIfNecessary(stream);
+      v2TableMigrator.migrateIfNecessary(stream);

       final Optional<DialectTableDefinition> existingTable = destinationHandler.findExistingTable(stream.id());
       if (existingTable.isPresent()) {

NoopV2TableMigrator.java (renamed from NoopV2RawTableMigrator.java)

@@ -4,7 +4,7 @@

 package io.airbyte.integrations.base.destination.typing_deduping;

-public class NoopV2RawTableMigrator<DialectTableDefinition> implements V2RawTableMigrator<DialectTableDefinition> {
+public class NoopV2TableMigrator<DialectTableDefinition> implements V2TableMigrator<DialectTableDefinition> {

   @Override
   public void migrateIfNecessary(final StreamConfig streamConfig) {

V2TableMigrator.java (renamed from V2RawTableMigrator.java)

@@ -4,8 +4,8 @@

 package io.airbyte.integrations.base.destination.typing_deduping;

-public interface V2RawTableMigrator<DialectTableDefinition> {
+public interface V2TableMigrator<DialectTableDefinition> {

-  void migrateIfNecessary(final StreamConfig streamConfig) throws InterruptedException;
+  void migrateIfNecessary(final StreamConfig streamConfig) throws Exception;

}

BigQueryDestination.java

@@ -36,7 +36,7 @@
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryDestinationHandler;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQuerySqlGenerator;
 import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV1V2Migrator;
-import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2RawTableMigrator;
+import io.airbyte.integrations.destination.bigquery.typing_deduping.BigQueryV2TableMigrator;
 import io.airbyte.integrations.destination.bigquery.uploader.AbstractBigQueryUploader;
 import io.airbyte.integrations.destination.bigquery.uploader.BigQueryUploaderFactory;
 import io.airbyte.integrations.destination.bigquery.uploader.UploaderType;
@@ -242,7 +242,7 @@ public AirbyteMessageConsumer getConsumer(final JsonNode config,
     final TyperDeduper typerDeduper;
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final BigQueryV1V2Migrator migrator = new BigQueryV1V2Migrator(bigquery, namingResolver);
-    final BigQueryV2RawTableMigrator v2RawTableMigrator = new BigQueryV2RawTableMigrator(bigquery);
+    final BigQueryV2TableMigrator v2RawTableMigrator = new BigQueryV2TableMigrator(bigquery);
     typerDeduper = new DefaultTyperDeduper<>(
         sqlGenerator,
         new BigQueryDestinationHandler(bigquery, datasetLocation),

BigQueryV2TableMigrator.java (renamed from BigQueryV2RawTableMigrator.java)

@@ -15,19 +15,19 @@
 import com.google.cloud.bigquery.TableId;
 import io.airbyte.integrations.base.JavaBaseConstants;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
-import io.airbyte.integrations.base.destination.typing_deduping.V2RawTableMigrator;
+import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
 import java.util.Map;
 import org.apache.commons.text.StringSubstitutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class BigQueryV2RawTableMigrator implements V2RawTableMigrator<TableDefinition> {
+public class BigQueryV2TableMigrator implements V2TableMigrator<TableDefinition> {

-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2RawTableMigrator.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryV2TableMigrator.class);

   private final BigQuery bq;

-  public BigQueryV2RawTableMigrator(final BigQuery bq) {
+  public BigQueryV2TableMigrator(final BigQuery bq) {
     this.bq = bq;
   }


SnowflakeInternalStagingDestination.java

@@ -22,6 +22,7 @@
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
 import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
@@ -39,7 +40,7 @@
 public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

   private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
-  private static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
+  public static final String RAW_SCHEMA_OVERRIDE = "raw_data_schema";
   private final String airbyteEnvironment;

   public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
@@ -143,7 +144,8 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config,
     }
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
-    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator);
+    final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
+    typerDeduper = new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);

     return new StagingConsumerFactory().createAsync(
         outputRecordCollector,

SnowflakeV2TableMigrator.java (new file)

@@ -0,0 +1,115 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import static io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingDestination.RAW_SCHEMA_OVERRIDE;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeV2TableMigrator implements V2TableMigrator<SnowflakeTableDefinition> {

  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeV2TableMigrator.class);

  private final JdbcDatabase database;
  private final String rawNamespace;
  private final String databaseName;
  private final SnowflakeSqlGenerator generator;
  private final SnowflakeDestinationHandler handler;

  public SnowflakeV2TableMigrator(final JdbcDatabase database,
                                  final String databaseName,
                                  final SnowflakeSqlGenerator generator,
                                  final SnowflakeDestinationHandler handler) {
    this.database = database;
    this.databaseName = databaseName;
    this.generator = generator;
    this.handler = handler;
    this.rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
  }

  @Override
  public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception {
    final StreamId lowercasedStreamId = buildStreamId_lowercase(
        streamConfig.id().originalNamespace(),
        streamConfig.id().originalName(),
        rawNamespace);
    final boolean existingTableLowercaseExists = findExistingTable_lowercase(lowercasedStreamId).isPresent();
    final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent();
    LOGGER.info(
        "Checking whether upcasing migration is necessary for {}.{}. Existing lowercased table exists: {}; existing uppercased table does not exist: {}",
        streamConfig.id().originalNamespace(),
        streamConfig.id().originalName(),
        existingTableLowercaseExists,
        existingTableUppercaseDoesNotExist);
    if (existingTableLowercaseExists && existingTableUppercaseDoesNotExist) {
      LOGGER.info(
          "Executing upcasing migration for {}.{}",
          streamConfig.id().originalNamespace(),
          streamConfig.id().originalName());
      handler.execute(generator.softReset(streamConfig));
    }
  }

  // These methods were copied from
  // https://github.com/airbytehq/airbyte/blob/d5fdb1b982d464f54941bf9a830b9684fb47d249/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java
  // which is the highest version of destination-snowflake that still uses quoted+lowercased
  // identifiers
  private static StreamId buildStreamId_lowercase(final String namespace, final String name, final String rawNamespaceOverride) {
    // No escaping needed, as far as I can tell. We quote all our identifier names.
    return new StreamId(
        escapeIdentifier_lowercase(namespace),
        escapeIdentifier_lowercase(name),
        escapeIdentifier_lowercase(rawNamespaceOverride),
        escapeIdentifier_lowercase(StreamId.concatenateRawTableName(namespace, name)),
        namespace,
        name);
  }

  private static String escapeIdentifier_lowercase(final String identifier) {
    // Note that we don't need to escape backslashes here!
    // The only special character in an identifier is the double-quote, which needs to be doubled.
    return identifier.replace("\"", "\"\"");
  }

  // And this was taken from
  // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java
  public Optional<SnowflakeTableDefinition> findExistingTable_lowercase(final StreamId id) throws SQLException {
    // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates
    // VARIANT as VARCHAR
    final LinkedHashMap<String, String> columns = database.queryJsons(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_catalog = ?
          AND table_schema = ?
          AND table_name = ?
        ORDER BY ordinal_position;
        """,
        databaseName.toUpperCase(),
        id.finalNamespace(),
        id.finalName()).stream()
        .collect(LinkedHashMap::new,
            (map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()),
            LinkedHashMap::putAll);
    // TODO query for indexes/partitioning/etc

    if (columns.isEmpty()) {
      return Optional.empty();
    } else {
      return Optional.of(new SnowflakeTableDefinition(columns));
    }
  }

}
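
For context on the check in migrateIfNecessary above: Snowflake folds unquoted identifiers to uppercase, while quoted identifiers keep their exact case, so the quoted-lowercase final tables written by destination-snowflake 3.0.0 and earlier are distinct objects from the uppercased tables the current SQL generator expects. A minimal SQL sketch of that behavior, with made-up schema and table names:

-- Unquoted identifiers fold to uppercase: both statements resolve to the same table.
CREATE TABLE my_schema.users (id NUMBER);
SELECT COUNT(*) FROM MY_SCHEMA.USERS;

-- Quoted identifiers keep their case: this is a different table from the one above,
-- and is the kind of table the older connector versions produced.
CREATE TABLE "my_schema"."users" (id NUMBER);
SELECT COUNT(*) FROM "my_schema"."users";

When the lowercased table exists and the uppercased one does not, the migrator reuses the existing soft-reset path to rebuild the final table under the new casing rather than introducing a bespoke rename.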

AbstractSnowflakeTypingDedupingTest.java

@@ -18,9 +18,16 @@
 import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts;
 import io.airbyte.integrations.destination.snowflake.SnowflakeDatabase;
 import io.airbyte.integrations.destination.snowflake.SnowflakeTestUtils;
+import io.airbyte.protocol.models.v0.AirbyteMessage;
+import io.airbyte.protocol.models.v0.AirbyteStream;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
+import io.airbyte.protocol.models.v0.DestinationSyncMode;
+import io.airbyte.protocol.models.v0.SyncMode;
 import java.nio.file.Path;
 import java.util.List;
 import javax.sql.DataSource;
+import org.junit.jupiter.api.Test;

public abstract class AbstractSnowflakeTypingDedupingTest extends BaseTypingDedupingTest {

@@ -100,6 +107,43 @@ protected String getRawSchema() {
     return JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
   }

+  /**
+   * Run a sync using 3.0.0 (which is the highest version that still creates v2 final tables with
+   * lowercased+quoted names). Then run a sync using our current version.
+   */
+  @Test
+  public void testFinalTableUppercasingMigration() throws Exception {
+    try {
+      final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
+          new ConfiguredAirbyteStream()
+              .withSyncMode(SyncMode.FULL_REFRESH)
+              .withDestinationSyncMode(DestinationSyncMode.APPEND)
+              .withStream(new AirbyteStream()
+                  .withNamespace(streamNamespace)
+                  .withName(streamName)
+                  .withJsonSchema(SCHEMA))));
+
+      // First sync
+      final List<AirbyteMessage> messages1 = readMessages("dat/sync1_messages.jsonl");
+      runSync(catalog, messages1, "airbyte/destination-snowflake:3.0.0");
+      // We no longer have the code to dump a lowercased table, so just move on directly to the new sync
+
+      // Second sync
+      final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
+
+      runSync(catalog, messages2);
+
+      final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
+      final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
+      verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
+    } finally {
+      // manually drop the lowercased schema, since we no longer have the code to do it automatically
+      // (the raw table is still in lowercase "airbyte_internal"."whatever", so the auto-cleanup code
+      // handles it fine)
+      database.execute("DROP SCHEMA IF EXISTS \"" + streamNamespace + "\" CASCADE");
+    }
+  }

   private String getDefaultSchema() {
     return getConfig().get("schema").asText();
   }
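
As a manual spot-check after the migration test above, the same information_schema pattern used by findExistingTable_lowercase can confirm which casing a final table ended up with. A hedged sketch, with MY_DB, my_schema, and users_final as placeholder names:

-- The migrated final table should be visible under its uppercased identifiers.
SELECT table_schema, table_name
FROM MY_DB.information_schema.tables
WHERE table_schema = 'MY_SCHEMA'
  AND table_name = 'USERS_FINAL';

-- Any leftover quoted-lowercase final table keeps its exact original spelling;
-- the test above drops its schema manually for that reason.
SELECT table_schema, table_name
FROM MY_DB.information_schema.tables
WHERE table_schema = 'my_schema'
  AND table_name = 'users_final';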