Skip to content

Commit 4fc7f1a

Browse files
authored
Destination snowflake: Add config option to enable time travel (#35754)
1 parent 1815e38 commit 4fc7f1a

File tree

11 files changed

+55
-12
lines changed

11 files changed

+55
-12
lines changed

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
8-
dockerImageTag: 3.6.6
8+
dockerImageTag: 3.7.0
99
dockerRepository: airbyte/destination-snowflake
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1111
githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,10 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
160160
}
161161
}
162162

163-
final SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator();
163+
final int retentionPeriodDays = SnowflakeSqlOperations.getRetentionPeriodDays(
164+
config.get(SnowflakeSqlOperations.RETENTION_PERIOD_DAYS_CONFIG_KEY));
165+
166+
final SnowflakeSqlGenerator sqlGenerator = new SnowflakeSqlGenerator(retentionPeriodDays);
164167
final ParsedCatalog parsedCatalog;
165168
final TyperDeduper typerDeduper;
166169
final JdbcDatabase database = getDatabase(getDataSource(config));

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.JsonNode;
88
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
9+
import io.airbyte.cdk.integrations.base.DestinationConfig;
910
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
1011
import io.airbyte.cdk.integrations.destination.async.partial_messages.PartialAirbyteMessage;
1112
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
@@ -21,7 +22,9 @@
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

24-
class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations {
25+
public class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations {
26+
27+
public static final String RETENTION_PERIOD_DAYS_CONFIG_KEY = "retention_period_days";
2528

2629
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
2730
private static final int MAX_FILES_IN_LOADING_QUERY_LIMIT = 1000;
@@ -46,20 +49,42 @@ public void createSchemaIfNotExists(final JdbcDatabase database, final String sc
4649

4750
@Override
4851
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
52+
int retentionPeriodDays = getRetentionPeriodDaysFromConfigSingleton();
4953
return String.format(
5054
"""
5155
CREATE TABLE IF NOT EXISTS "%s"."%s" (
5256
"%s" VARCHAR PRIMARY KEY,
5357
"%s" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
5458
"%s" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
5559
"%s" VARIANT
56-
) data_retention_time_in_days = 0;""",
60+
) data_retention_time_in_days = %d;""",
5761
schemaName,
5862
tableName,
5963
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
6064
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
6165
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
62-
JavaBaseConstants.COLUMN_NAME_DATA);
66+
JavaBaseConstants.COLUMN_NAME_DATA,
67+
retentionPeriodDays);
68+
}
69+
70+
/**
71+
* Sort of hacky. The problem is that SnowflakeSqlOperations is constructed in the
72+
* SnowflakeDestination constructor, but we don't have the JsonNode config until we try to call
73+
* check/getSerializedConsumer on the SnowflakeDestination. So we can't actually inject the config
74+
* normally. Instead, we just use the singleton object. :(
75+
*/
76+
private static int getRetentionPeriodDaysFromConfigSingleton() {
77+
return getRetentionPeriodDays(DestinationConfig.getInstance().getNodeValue(RETENTION_PERIOD_DAYS_CONFIG_KEY));
78+
}
79+
80+
public static int getRetentionPeriodDays(final JsonNode node) {
81+
int retentionPeriodDays;
82+
if (node == null || node.isNull()) {
83+
retentionPeriodDays = 1;
84+
} else {
85+
retentionPeriodDays = node.asInt();
86+
}
87+
return retentionPeriodDays;
6388
}
6489

6590
@Override

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ public class SnowflakeSqlGenerator implements SqlGenerator {
4949
"LOCALTIME",
5050
"LOCALTIMESTAMP");
5151

52+
private final int retentionPeriodDays;
53+
54+
public SnowflakeSqlGenerator(int retentionPeriodDays) {
55+
this.retentionPeriodDays = retentionPeriodDays;
56+
}
57+
5258
@Override
5359
public StreamId buildStreamId(final String namespace, final String name, final String rawNamespaceOverride) {
5460
return new StreamId(
@@ -119,14 +125,15 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo
119125
return Sql.of(new StringSubstitutor(Map.of(
120126
"final_table_id", stream.id().finalTableId(QUOTE, suffix.toUpperCase()),
121127
"force_create_table", forceCreateTable,
122-
"column_declarations", columnDeclarations)).replace(
128+
"column_declarations", columnDeclarations,
129+
"retention_period_days", retentionPeriodDays)).replace(
123130
"""
124131
CREATE ${force_create_table} TABLE ${final_table_id} (
125132
"_AIRBYTE_RAW_ID" TEXT NOT NULL,
126133
"_AIRBYTE_EXTRACTED_AT" TIMESTAMP_TZ NOT NULL,
127134
"_AIRBYTE_META" VARIANT NOT NULL
128135
${column_declarations}
129-
);
136+
) data_retention_time_in_days = ${retention_period_days};
130137
"""));
131138
}
132139

airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json

+7
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,13 @@
181181
"description": "When enabled your data will load into your final tables incrementally while your data is still being synced. When Disabled (the default), your data loads into your final tables once at the end of a sync. Note that this option only applies if you elect to create Final tables",
182182
"title": "Enable Loading Data Incrementally to Final Tables",
183183
"order": 12
184+
},
185+
"retention_period_days": {
186+
"type": "integer",
187+
"default": 1,
188+
"description": "The number of days of Snowflake Time Travel to enable on the tables. See <a href=\"https://docs.snowflake.com/en/user-guide/data-time-travel#data-retention-period\">Snowflake's documentation</a> for more information. Setting a nonzero value will incur increased storage costs in your Snowflake instance.",
189+
"title": "Data Retention Period (days)",
190+
"order": 13
184191
}
185192
}
186193
},

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestinationAcceptanceTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv env,
122122
final String namespace,
123123
final JsonNode streamSchema)
124124
throws Exception {
125-
final StreamId streamId = new SnowflakeSqlGenerator().buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
125+
final StreamId streamId = new SnowflakeSqlGenerator(0).buildStreamId(namespace, streamName, JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
126126
return retrieveRecordsFromTable(streamId.rawName(), streamId.rawNamespace())
127127
.stream()
128128
.map(r -> r.get(JavaBaseConstants.COLUMN_NAME_DATA))

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ protected void globalTeardown() throws Exception {
111111

112112
@Override
113113
protected SqlGenerator getSqlGenerator() {
114-
return new SnowflakeSqlGenerator();
114+
return new SnowflakeSqlGenerator(0);
115115
}
116116

117117
@Override

airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public static void teardownSnowflake() throws Exception {
6969

7070
@Override
7171
protected SnowflakeSqlGenerator getSqlGenerator() {
72-
return new SnowflakeSqlGenerator();
72+
return new SnowflakeSqlGenerator(0);
7373
}
7474

7575
@Override

airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ void createTableQuery() {
4444
"%s" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp(),
4545
"%s" TIMESTAMP WITH TIME ZONE DEFAULT NULL,
4646
"%s" VARIANT
47-
) data_retention_time_in_days = 0;""",
47+
) data_retention_time_in_days = 1;""",
4848
SCHEMA_NAME,
4949
TABLE_NAME,
5050
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,

airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
public class SnowflakeSqlGeneratorTest {
2626

27-
private final SnowflakeSqlGenerator generator = new SnowflakeSqlGenerator();
27+
private final SnowflakeSqlGenerator generator = new SnowflakeSqlGenerator(0);
2828

2929
@Test
3030
void columnNameSpecialCharacterHandling() {

docs/integrations/destinations/snowflake.md

+1
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ desired namespace.
276276

277277
| Version | Date | Pull Request | Subject |
278278
|:----------------|:-----------|:-------------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
279+
| 3.7.0 | 2024-04-08 | [\#35754](https://github.com/airbytehq/airbyte/pull/35754) | Allow configuring `data_retention_time_in_days`; apply to both raw and final tables. *Note*: Existing tables will not be affected; you must manually alter them.|
279280
| 3.6.6 | 2024-03-26 | [\#36466](https://github.com/airbytehq/airbyte/pull/36466) | Correctly hhandle instances with `QUOTED_IDENTIFIERS_IGNORE_CASE` enabled globally |
280281
| 3.6.5 | 2024-03-25 | [\#36461](https://github.com/airbytehq/airbyte/pull/36461) | Internal code change (use published CDK artifact instead of source dependency) |
281282
| 3.6.4 | 2024-03-25 | [\#36396](https://github.com/airbytehq/airbyte/pull/36396) | Handle instances with `QUOTED_IDENTIFIERS_IGNORE_CASE` enabled globally |

0 commit comments

Comments
 (0)