Skip to content

Destination snowflake: Add config option to enable time travel #35754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
}
}

final SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
final int retentionPeriodDays = SnowflakeSqlOperations.getRetentionPeriodDays(
config.get(SnowflakeSqlOperations.RETENTION_PERIOD_DAYS_CONFIG_KEY));

final SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(retentionPeriodDays);
final ParsedCatalog parsedCatalog;
final TyperDeduper typerDeduper;
final JdbcDatabase database = getDatabase(getDataSource(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.DestinationConfig;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination.jdbc.SqlOperations;
Expand All @@ -21,7 +22,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations {
public class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations {

public static final String RETENTION_PERIOD_DAYS_CONFIG_KEY = "retention_period_days";

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
private static final int MAX_FILES_IN_LOADING_QUERY_LIMIT = 1000;
Expand All @@ -46,20 +49,42 @@ public void createSchemaIfNotExists(final JdbcDatabase database, final String sc

/**
 * Builds the {@code CREATE TABLE IF NOT EXISTS} statement for {@code schemaName.tableName}.
 *
 * <p>The table has four columns named by {@link JavaBaseConstants}: the raw id (VARCHAR primary
 * key), the extracted-at timestamp (defaults to current_timestamp()), the loaded-at timestamp
 * (defaults to NULL) and the VARIANT data column. The statement also sets
 * {@code data_retention_time_in_days} on the table.
 *
 * <p>NOTE(review): this listing appears to interleave pre-change and post-change diff lines (the
 * two {@code data_retention_time_in_days} lines and the two {@code COLUMN_NAME_DATA} arguments);
 * confirm against the actual file before editing.
 *
 * @param database not referenced when building the statement
 * @param schemaName schema of the table to create
 * @param tableName name of the table to create
 * @return the formatted SQL statement
 */
@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
// The retention period comes from the config singleton rather than from an injected value.
int retentionPeriodDays = getRetentionPeriodDaysFromConfigSingleton();
return String.format(
"""
CREATE TABLE IF NOT EXISTS "%s"."%s" (
"%s" VARCHAR PRIMARY KEY,
"%s" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
"%s" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
"%s" VARIANT
) data_retention_time_in_days = 0;""",
) data_retention_time_in_days = %d;""",
schemaName,
tableName,
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
JavaBaseConstants.COLUMN_NAME_DATA,
retentionPeriodDays);
}

/**
 * Looks up the retention period through the global {@link DestinationConfig} singleton.
 *
 * <p>This is a workaround. SnowflakeSqlOperations is built inside the SnowflakeDestination
 * constructor, while the JsonNode config only becomes available once check/getSerializedConsumer
 * is invoked on the SnowflakeDestination, so the config cannot be injected in the usual way.
 *
 * @return the retention period in days, as resolved by {@link #getRetentionPeriodDays(JsonNode)}
 */
private static int getRetentionPeriodDaysFromConfigSingleton() {
  final JsonNode retentionNode = DestinationConfig.getInstance().getNodeValue(RETENTION_PERIOD_DAYS_CONFIG_KEY);
  return getRetentionPeriodDays(retentionNode);
}

/**
 * Resolves the Snowflake data retention period (in days) from a config node.
 *
 * @param node the configured value; may be {@code null}, a JSON null, or a missing node
 * @return 1 when no value is configured, otherwise the node's value coerced to an int
 * @throws IllegalArgumentException if the value cannot be read as a non-negative integer
 */
public static int getRetentionPeriodDays(final JsonNode node) {
  // An absent value falls back to one day of retention.
  if (node == null || node.isNull() || node.isMissingNode()) {
    return 1;
  }
  // The no-arg asInt() yields 0 for values it cannot coerce, which would silently switch
  // retention off. Use -1 as a sentinel so bad input is caught by the range check below.
  final int retentionPeriodDays = node.asInt(-1);
  if (retentionPeriodDays < 0) {
    // A negative number would otherwise be formatted straight into the CREATE TABLE statement.
    throw new IllegalArgumentException(
        "Snowflake retention period (days) must be a non-negative integer, but was: " + node);
  }
  return retentionPeriodDays;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ public class SnowflakeSqlGenerator implements SqlGenerator {
"LOCALTIME",
"LOCALTIMESTAMP");

private final int retentionPeriodDays;

/**
 * Creates a SQL generator that remembers the given retention period.
 *
 * @param retentionPeriodDays number of days stored in the {@code retentionPeriodDays} field
 */
public SnowflakeSqlGenerator(final int retentionPeriodDays) {
  this.retentionPeriodDays = retentionPeriodDays;
}

@Override
public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
return new StreamId(
Expand Down Expand Up @@ -119,14 +125,15 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo
return Sql.of(new StringSubstitutor(Map.of(
"final_table_id", stream.id().finalTableId(QUOTE, suffix.toUpperCase()),
"force_create_table", forceCreateTable,
"column_declarations", columnDeclarations)).replace(
"column_declarations", columnDeclarations,
"retention_period_days", retentionPeriodDays)).replace(
"""
CREATE ${force_create_table} TABLE ${final_table_id} (
"_AIRBYTE_RAW_ID" TEXT NOT NULL,
"_AIRBYTE_EXTRACTED_AT" TIMESTAMP_TZ NOT NULL,
"_AIRBYTE_META" VARIANT NOT NULL
${column_declarations}
);
) data_retention_time_in_days = ${retention_period_days};
"""));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@
"description": "When enabled your data will load into your final tables incrementally while your data is still being synced. When Disabled (the default), your data loads into your final tables once at the end of a sync. Note that this option only applies if you elect to create Final tables",
"title": "Enable Loading Data Incrementally to Final Tables",
"order": 12
},
"retention_period_days": {
"type": "integer",
"default": 1,
"description": "The number of days of Snowflake Time Travel to enable on the tables. See <a href=\"https://docs.snowflake.com/en/user-guide/data-time-travel#data-retention-period\">Snowflake's documentation</a> for more information. Setting a nonzero value will incur increased storage costs in your Snowflake instance.",
"title": "Data Retention Period (days)",
"order": 13
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
final String namespace,
final JsonNode streamSchema)
throws Exception {
final StreamId streamId = new SnowflakeSqlGenerator().buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
final StreamId streamId = new SnowflakeSqlGenerator(0).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
return retrieveRecordsFromTable(streamId.rawName(), streamId.rawNamespace())
.stream()
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ protected void globalTeardown() throws Exception {

/**
 * Returns a new {@link SnowflakeSqlGenerator}; the updated call passes 0 as the retention period.
 *
 * <p>NOTE(review): the two return lines look like the pre-change and post-change sides of a diff,
 * not two live statements; confirm against the actual file.
 */
@Override
protected SqlGenerator getSqlGenerator() {
return new SnowflakeSqlGenerator();
return new SnowflakeSqlGenerator(0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public static void teardownSnowflake() throws Exception {

/**
 * Returns a new {@link SnowflakeSqlGenerator}; the updated call passes 0 as the retention period.
 *
 * <p>NOTE(review): the two return lines look like the pre-change and post-change sides of a diff,
 * not two live statements; confirm against the actual file.
 */
@Override
protected SnowflakeSqlGenerator getSqlGenerator() {
return new SnowflakeSqlGenerator();
return new SnowflakeSqlGenerator(0);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

public class SnowflakeSqlGeneratorTest {

private final SnowflakeSqlGenerator generator = new SnowflakeSqlGenerator();
private final SnowflakeSqlGenerator generator = new SnowflakeSqlGenerator(0);

@Test
void columnNameSpecialCharacterHandling() {
Expand Down