
Commit da79f6e

edgao and gisripa authored
Destination Snowflake: Write extracted_at in UTC (#35308)
Signed-off-by: Gireesh Sreepathi <[email protected]>
Co-authored-by: Gireesh Sreepathi <[email protected]>
1 parent 9ce9217 commit da79f6e

File tree

29 files changed, +1165 −115 lines


airbyte-integrations/connectors/destination-snowflake/build.gradle

Lines changed: 2 additions & 1 deletion
@@ -1,9 +1,10 @@
 plugins {
     id 'airbyte-java-connector'
+    id 'org.jetbrains.kotlin.jvm' version '1.9.22'
 }

 airbyteJavaConnector {
-    cdkVersionRequired = '0.23.2'
+    cdkVersionRequired = '0.23.11'
     features = ['db-destinations', 's3-destinations', 'typing-deduping']
     useLocalCdk = false
 }

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
-  dockerImageTag: 3.5.14
+  dockerImageTag: 3.6.0
   dockerRepository: airbyte/destination-snowflake
   documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
   githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java

Lines changed: 20 additions & 5 deletions
@@ -9,6 +9,7 @@
 import io.airbyte.cdk.db.jdbc.JdbcDatabase;
 import io.airbyte.cdk.db.jdbc.JdbcUtils;
 import io.airbyte.cdk.integrations.base.Destination;
+import io.airbyte.cdk.integrations.base.JavaBaseConstants;
 import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
 import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag;
 import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
@@ -23,15 +24,18 @@
 import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
 import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeOperationValve;
 import io.airbyte.integrations.base.destination.typing_deduping.TyperDeduper;
+import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeDestinationHandler;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeSqlGenerator;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV1V2Migrator;
 import io.airbyte.integrations.destination.snowflake.typing_deduping.SnowflakeV2TableMigrator;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
 import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
 import io.airbyte.protocol.models.v0.AirbyteMessage;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
@@ -131,7 +135,7 @@ protected JdbcSqlGenerator getSqlGenerator() {
   }

   @Override
-  protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
+  protected JdbcDestinationHandler<SnowflakeState> getDestinationHandler(String databaseName, JdbcDatabase database, String rawTableSchema) {
     throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface");
   }

@@ -151,22 +155,33 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
     final TyperDeduper typerDeduper;
     final JdbcDatabase database = getDatabase(getDataSource(config));
     final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
-    final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database);
+    final String rawTableSchemaName;
     final CatalogParser catalogParser;
     if (TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).isPresent()) {
-      catalogParser = new CatalogParser(sqlGenerator, TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get());
+      rawTableSchemaName = TypingAndDedupingFlag.getRawNamespaceOverride(RAW_SCHEMA_OVERRIDE).get();
+      catalogParser = new CatalogParser(sqlGenerator, rawTableSchemaName);
     } else {
+      rawTableSchemaName = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
       catalogParser = new CatalogParser(sqlGenerator);
     }
+    final SnowflakeDestinationHandler snowflakeDestinationHandler = new SnowflakeDestinationHandler(databaseName, database, rawTableSchemaName);
     parsedCatalog = catalogParser.parseCatalog(catalog);
     final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName);
     final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler);
     final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
+    final List<Migration<SnowflakeState>> migrations = List.of();
     if (disableTypeDedupe) {
-      typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
+      typerDeduper =
+          new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, migrations);
     } else {
       typerDeduper =
-          new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator);
+          new DefaultTyperDeduper<>(
+              sqlGenerator,
+              snowflakeDestinationHandler,
+              parsedCatalog,
+              migrator,
+              v2TableMigrator,
+              migrations);
     }

     return StagingConsumerFactory.builder(
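
Note: the net effect of this rewiring is that the raw-table schema name is now resolved once (override first, CDK default otherwise) and handed to both the CatalogParser and the SnowflakeDestinationHandler. A minimal, self-contained sketch of that resolution logic; the value "airbyte_internal" is hardcoded here as a stand-in for the CDK constant and is an assumption, not shown in this diff:

    import java.util.Optional;

    public class RawSchemaResolutionSketch {

      // Stand-in for JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE (assumed value).
      private static final String DEFAULT_AIRBYTE_INTERNAL_NAMESPACE = "airbyte_internal";

      // An explicit raw-namespace override wins; otherwise every stream's raw table
      // lands in the default internal schema.
      static String resolveRawTableSchema(final Optional<String> rawNamespaceOverride) {
        return rawNamespaceOverride.orElse(DEFAULT_AIRBYTE_INTERNAL_NAMESPACE);
      }

      public static void main(final String[] args) {
        System.out.println(resolveRawTableSchema(Optional.of("overridden_raw"))); // overridden_raw
        System.out.println(resolveRawTableSchema(Optional.empty()));              // airbyte_internal
      }
    }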

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java

Lines changed: 77 additions & 30 deletions
@@ -14,19 +14,22 @@
 import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition;
 import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
 import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
+import io.airbyte.commons.json.Jsons;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
 import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
 import io.airbyte.integrations.base.destination.typing_deduping.Array;
 import io.airbyte.integrations.base.destination.typing_deduping.ColumnId;
-import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
-import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl;
-import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState;
+import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStatus;
+import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableStatus;
 import io.airbyte.integrations.base.destination.typing_deduping.Sql;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig;
 import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
 import io.airbyte.integrations.base.destination.typing_deduping.Struct;
 import io.airbyte.integrations.base.destination.typing_deduping.Union;
 import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;
+import io.airbyte.integrations.destination.snowflake.typing_deduping.migrations.SnowflakeState;
+import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair;
+import io.airbyte.protocol.models.v0.DestinationSyncMode;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.time.Instant;
@@ -40,20 +43,23 @@
 import java.util.stream.Collectors;
 import net.snowflake.client.jdbc.SnowflakeSQLException;
 import org.apache.commons.text.StringSubstitutor;
+import org.jooq.SQLDialect;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-public class SnowflakeDestinationHandler extends JdbcDestinationHandler {
+public class SnowflakeDestinationHandler extends JdbcDestinationHandler<SnowflakeState> {

   private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class);
   public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement";

   private final String databaseName;
   private final JdbcDatabase database;

-  public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database) {
-    super(databaseName, database);
-    this.databaseName = databaseName;
+  public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase database, final String rawTableSchema) {
+    // Postgres is close enough to Snowflake SQL for our purposes.
+    super(databaseName, database, rawTableSchema, SQLDialect.POSTGRES);
+    // We don't quote the database name in any queries, so just upcase it.
+    this.databaseName = databaseName.toUpperCase();
     this.database = database;
   }
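
Note: the "Postgres is close enough" comment refers to jOOQ's SQL rendering. Under SQLDialect.POSTGRES, jOOQ emits double-quoted identifiers, which Snowflake also accepts. A quick sketch of that rendering, with illustrative schema and table names rather than ones the connector actually uses:

    import org.jooq.SQLDialect;
    import org.jooq.impl.DSL;

    public class DialectRenderSketch {

      public static void main(final String[] args) {
        final String sql = DSL.using(SQLDialect.POSTGRES)
            .select(DSL.field(DSL.quotedName("_airbyte_extracted_at")))
            .from(DSL.table(DSL.quotedName("airbyte_internal", "users_raw")))
            .getSQL();
        // Prints roughly: select "_airbyte_extracted_at" from "airbyte_internal"."users_raw"
        // Double-quoted identifiers are valid in both Postgres and Snowflake.
        System.out.println(sql);
      }
    }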

@@ -107,7 +113,7 @@ AND table_schema IN (%s)
         AND table_name IN (%s)
         """.formatted(paramHolder, paramHolder);
     final String[] bindValues = new String[streamIds.size() * 2 + 1];
-    bindValues[0] = databaseName.toUpperCase();
+    bindValues[0] = databaseName;
     System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length);
     System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length);
     final List<JsonNode> results = database.queryJsons(query, bindValues);
@@ -120,14 +126,18 @@ AND table_name IN (%s)
     return tableRowCounts;
   }

-  public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
+  private InitialRawTableStatus getInitialRawTableState(final StreamId id, final DestinationSyncMode destinationSyncMode) throws Exception {
+    // Short-circuit for overwrite, table will be truncated anyway
+    if (destinationSyncMode == DestinationSyncMode.OVERWRITE) {
+      return new InitialRawTableStatus(false, false, Optional.empty());
+    }
     final ResultSet tables = database.getMetaData().getTables(
         databaseName,
         id.rawNamespace(),
         id.rawName(),
         null);
     if (!tables.next()) {
-      return new InitialRawTableState(false, Optional.empty());
+      return new InitialRawTableStatus(false, false, Optional.empty());
     }
     // Snowflake timestamps have nanosecond precision, so decrement by 1ns
     // And use two explicit queries because COALESCE doesn't short-circuit.
@@ -136,33 +146,55 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex
         conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
             "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
             """
-            SELECT to_varchar(
-              TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")),
-              'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
-            ) AS MIN_TIMESTAMP
-            FROM ${raw_table}
-            WHERE "_airbyte_loaded_at" IS NULL
+            WITH MIN_TS AS (
+              SELECT TIMESTAMPADD(NANOSECOND, -1,
+                MIN(TIMESTAMPADD(
+                  HOUR,
+                  EXTRACT(timezone_hour from "_airbyte_extracted_at"),
+                  TIMESTAMPADD(
+                    MINUTE,
+                    EXTRACT(timezone_minute from "_airbyte_extracted_at"),
+                    CONVERT_TIMEZONE('UTC', "_airbyte_extracted_at")
+                  )
+                ))) AS MIN_TIMESTAMP
+              FROM ${raw_table}
+              WHERE "_airbyte_loaded_at" IS NULL
+            ) SELECT TO_VARCHAR(MIN_TIMESTAMP,'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MIN_TIMESTAMP_UTC from MIN_TS;
             """)),
         // The query will always return exactly one record, so use .get(0)
-        record -> record.getString("MIN_TIMESTAMP")).get(0));
+        record -> record.getString("MIN_TIMESTAMP_UTC")).get(0));
     if (minUnloadedTimestamp.isPresent()) {
-      return new InitialRawTableState(true, minUnloadedTimestamp.map(Instant::parse));
+      return new InitialRawTableStatus(true, true, minUnloadedTimestamp.map(Instant::parse));
     }

     // If there are no unloaded raw records, then we can safely skip all existing raw records.
     // This second query just finds the newest raw record.
+
+    // This is _technically_ wrong, because during the DST transition we might select
+    // the wrong max timestamp. We _should_ do the UTC conversion inside the CTE, but that's a lot
+    // of work for a very small edge case.
+    // We released the fix to write extracted_at in UTC before DST changed, so this is fine.
     final Optional<String> maxTimestamp = Optional.ofNullable(database.queryStrings(
         conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of(
             "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace(
             """
-            SELECT to_varchar(
-              MAX("_airbyte_extracted_at"),
-              'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM'
-            ) AS MIN_TIMESTAMP
-            FROM ${raw_table}
+            WITH MAX_TS AS (
+              SELECT MAX("_airbyte_extracted_at")
+              AS MAX_TIMESTAMP
+              FROM ${raw_table}
+            ) SELECT TO_VARCHAR(
+              TIMESTAMPADD(
+                HOUR,
+                EXTRACT(timezone_hour from MAX_TIMESTAMP),
+                TIMESTAMPADD(
+                  MINUTE,
+                  EXTRACT(timezone_minute from MAX_TIMESTAMP),
+                  CONVERT_TIMEZONE('UTC', MAX_TIMESTAMP)
+                )
+              ),'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM') as MAX_TIMESTAMP_UTC from MAX_TS;
             """)),
-        record -> record.getString("MIN_TIMESTAMP")).get(0));
-    return new InitialRawTableState(false, maxTimestamp.map(Instant::parse));
+        record -> record.getString("MAX_TIMESTAMP_UTC")).get(0));
+    return new InitialRawTableStatus(true, false, maxTimestamp.map(Instant::parse));
   }

   @Override
@@ -171,7 +203,7 @@ public void execute(final Sql sql) throws Exception {
     final UUID queryId = UUID.randomUUID();
     for (final String transaction : transactions) {
       final UUID transactionId = UUID.randomUUID();
-      LOGGER.debug("Executing sql {}-{}: {}", queryId, transactionId, transaction);
+      LOGGER.info("Executing sql {}-{}: {}", queryId, transactionId, transaction);
       final long startTime = System.currentTimeMillis();

       try {
@@ -190,7 +222,7 @@ public void execute(final Sql sql) throws Exception {
         throw new RuntimeException(trimmedMessage, e);
       }

-      LOGGER.debug("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
+      LOGGER.info("Sql {}-{} completed in {} ms", queryId, transactionId, System.currentTimeMillis() - startTime);
     }
   }

@@ -250,7 +282,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
   }

   @Override
-  public List<DestinationInitialState> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+  public List<DestinationInitialStatus<SnowflakeState>> gatherInitialState(List<StreamConfig> streamConfigs) throws Exception {
+    final Map<AirbyteStreamNameNamespacePair, SnowflakeState> destinationStates = super.getAllDestinationStates();
+
     List<StreamId> streamIds = streamConfigs.stream().map(StreamConfig::id).toList();
     final LinkedHashMap<String, LinkedHashMap<String, TableDefinition>> existingTables = findExistingTables(database, databaseName, streamIds);
     final LinkedHashMap<String, LinkedHashMap<String, Integer>> tableRowCounts = getFinalTableRowCount(streamIds);
@@ -267,8 +301,15 @@ public List<DestinationInitialState> gatherInitialState(List<StreamConfig> strea
           isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable);
           isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0;
         }
-        final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id());
-        return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty);
+        final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id(), streamConfig.destinationSyncMode());
+        final SnowflakeState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
+        return new DestinationInitialStatus<>(
+            streamConfig,
+            isFinalTablePresent,
+            initialRawTableState,
+            isSchemaMismatch,
+            isFinalTableEmpty,
+            destinationState);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -290,6 +331,12 @@ protected String toJdbcTypeName(AirbyteType airbyteType) {
     };
   }

+  @Override
+  protected SnowflakeState toDestinationState(JsonNode json) {
+    return new SnowflakeState(
+        json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
+  }
+
   private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
     return switch (airbyteProtocolType) {
       case STRING -> "TEXT";
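
Note: the per-stream state blob is plain JSON, so a missing or null needsSoftReset field falls back to false. A self-contained sketch of the same parsing logic; SnowflakeState is modeled here as a record with the single flag visible in this diff, while the real class lives in the connector's typing_deduping.migrations package:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class StateParseSketch {

      // Assumed shape; only the needsSoftReset flag appears in this commit.
      record SnowflakeState(boolean needsSoftReset) {}

      static SnowflakeState toDestinationState(final JsonNode json) {
        // Mirrors the override above: absent or null resolves to false.
        return new SnowflakeState(
            json.hasNonNull("needsSoftReset") && json.get("needsSoftReset").asBoolean());
      }

      public static void main(final String[] args) throws Exception {
        final ObjectMapper mapper = new ObjectMapper();
        System.out.println(toDestinationState(mapper.readTree("{\"needsSoftReset\": true}"))); // needsSoftReset=true
        System.out.println(toDestinationState(mapper.readTree("{}")));                         // needsSoftReset=false
      }
    }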
