Destination snowflake: table casing migration #30068
Merged: edgao merged 12 commits into edgao/dv2/snowflake/case_sensitivity from edgao/dv2/snowflake/case_sensitivity_migration on Sep 1, 2023. Changes shown are from the first 8 of 12 commits.

Commits (all by edgao):
- 7f71d1c rename
- ca7249d add migration code
- e11e693 unused method
- 1a5d600 Automated Commit - Formatting Changes
- cc4d394 properly cleanup
- 397827b Automated Commit - Formatting Changes
- be1a6e8 improve migration check
- 6b179dc Automated Commit - Formatting Changes
- 2f5b24a skip migration in overwrite mode
- 2ac4dba fix log message
- f5db3aa rename for clarity
- 546a590 Automated Commit - Formatting Changes
.../airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java (115 additions, 0 deletions)
/*
 * Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.snowflake.typing_deduping;

import static io.airbyte.integrations.base.JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
import static io.airbyte.integrations.destination.snowflake.SnowflakeInternalStagingDestination.RAW_SCHEMA_OVERRIDE;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.TypingAndDedupingFlag;
import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.integrations.base.destination.typing_deduping.V2TableMigrator;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SnowflakeV2TableMigrator implements V2TableMigrator<SnowflakeTableDefinition> {

  private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeV2TableMigrator.class);

  private final JdbcDatabase database;
  private final String rawNamespace;
  private final String databaseName;
  private final SnowflakeSqlGenerator generator;
  private final SnowflakeDestinationHandler handler;

  public SnowflakeV2TableMigrator(final JdbcDatabase database,
                                  final String databaseName,
                                  final SnowflakeSqlGenerator generator,
                                  final SnowflakeDestinationHandler handler) {
    this.database = database;
    this.databaseName = databaseName;
    this.generator = generator;
    this.handler = handler;
    this.rawNamespace = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
  }

  @Override
  public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception {
    final StreamId lowercasedStreamId = buildStreamId_lowercase(
        streamConfig.id().originalNamespace(),
        streamConfig.id().originalName(),
        rawNamespace);
    final boolean existingTableLowercaseExists = findExistingTable_lowercase(lowercasedStreamId).isPresent();
    final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent();
    LOGGER.info(
        "Checking whether upcasing migration is necessary for {}.{}. Existing lowercased table exists: {}; existing uppercased table does not exist: {}",
        streamConfig.id().originalNamespace(),
        streamConfig.id().originalName(),
        existingTableLowercaseExists,
        existingTableUppercaseDoesNotExist);
    if (existingTableLowercaseExists && existingTableUppercaseDoesNotExist) {
      LOGGER.info(
          "Executing upcasing migration for {}.{}",
          streamConfig.id().originalNamespace(),
          streamConfig.id().originalName());
      handler.execute(generator.softReset(streamConfig));
    }
  }

  // These methods were copied from
  // https://github.com/airbytehq/airbyte/blob/d5fdb1b982d464f54941bf9a830b9684fb47d249/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java
  // which is the highest version of destination-snowflake that still uses quoted+lowercased
  // identifiers
  private static StreamId buildStreamId_lowercase(final String namespace, final String name, final String rawNamespaceOverride) {
    // No escaping needed, as far as I can tell. We quote all our identifier names.
    return new StreamId(
        escapeIdentifier_lowercase(namespace),
        escapeIdentifier_lowercase(name),
        escapeIdentifier_lowercase(rawNamespaceOverride),
        escapeIdentifier_lowercase(StreamId.concatenateRawTableName(namespace, name)),
        namespace,
        name);
  }

  private static String escapeIdentifier_lowercase(final String identifier) {
    // Note that we don't need to escape backslashes here!
    // The only special character in an identifier is the double-quote, which needs to be doubled.
    return identifier.replace("\"", "\"\"");
  }

  // And this was taken from
  // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java
  public Optional<SnowflakeTableDefinition> findExistingTable_lowercase(final StreamId id) throws SQLException {
    // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates
    // VARIANT as VARCHAR
    final LinkedHashMap<String, String> columns = database.queryJsons(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_catalog = ?
          AND table_schema = ?
          AND table_name = ?
        ORDER BY ordinal_position;
        """,
        databaseName.toUpperCase(),
        id.finalNamespace(),
        id.finalName()).stream()
        .collect(LinkedHashMap::new,
            (map, row) -> map.put(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText()),
            LinkedHashMap::putAll);
    // TODO query for indexes/partitioning/etc

    if (columns.isEmpty()) {
      return Optional.empty();
    } else {
      return Optional.of(new SnowflakeTableDefinition(columns));
    }
  }

}
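As a standalone illustration of the escaping rule the copied helper implements (this sketch is not part of the PR; the class and method names are hypothetical), the only character that needs escaping inside a Snowflake quoted identifier is the double-quote, which is doubled:

```java
public class EscapeSketch {

  // Same rule as escapeIdentifier_lowercase above: double any embedded
  // double-quote; no other character in a quoted identifier needs escaping.
  static String escapeIdentifierLowercase(final String identifier) {
    return identifier.replace("\"", "\"\"");
  }

  public static void main(final String[] args) {
    System.out.println(escapeIdentifierLowercase("my\"table")); // my""table
    System.out.println(escapeIdentifierLowercase("users")); // users
  }

}
```

Because the legacy generator always quoted identifiers, this doubling is sufficient: backslashes and other characters pass through untouched.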
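The column lookup in findExistingTable_lowercase folds query rows into a LinkedHashMap so the ORDER BY ordinal_position ordering survives. A minimal sketch of that three-argument collect pattern, using hypothetical in-memory rows instead of a database:

```java
import java.util.LinkedHashMap;
import java.util.List;

public class ColumnCollectSketch {

  // Hypothetical stand-in for one information_schema.columns row.
  record Column(String name, String dataType) {}

  static LinkedHashMap<String, String> toOrderedMap(final List<Column> rows) {
    // Same supplier/accumulator/combiner collect as in the PR code:
    // LinkedHashMap preserves the encounter (ordinal_position) order.
    return rows.stream()
        .collect(LinkedHashMap::new,
            (map, row) -> map.put(row.name(), row.dataType()),
            LinkedHashMap::putAll);
  }

  public static void main(final String[] args) {
    final LinkedHashMap<String, String> columns = toOrderedMap(List.of(
        new Column("ID", "NUMBER"),
        new Column("NAME", "VARCHAR"),
        new Column("DATA", "VARIANT")));
    System.out.println(columns); // {ID=NUMBER, NAME=VARCHAR, DATA=VARIANT}
  }

}
```

A plain Collectors.toMap would not guarantee ordering (and HashMap would lose it), which is why the explicit LinkedHashMap supplier matters here.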