Skip to content

Commit 16ed6bf

Browse files
revert contention-reducing change in destination-snowflake (#38052)
as part of the move of destination-snowflake to the kotlin CDK, we tried improve concurrency by only `DELETE`ing from `_airbyte_destination_state` if it has some data to delete (by issuing an `IF EXISTS` in the same transaction. Looks like it might be causing some stuck syncs, so we're reverting that "improvement"
1 parent b37fd9a commit 16ed6bf

File tree

5 files changed

+5
-29
lines changed

5 files changed

+5
-29
lines changed

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/JdbcDatabase.kt

+2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ abstract class JdbcDatabase(protected val sourceOperations: JdbcCompatibleSource
4343
execute { connection: Connection ->
4444
connection.autoCommit = false
4545
for (s in queries) {
46+
LOGGER.info("executing query within transaction: $s")
4647
connection.createStatement().execute(s)
48+
LOGGER.info("done executing query within transaction: $s")
4749
}
4850
connection.commit()
4951
connection.autoCommit = true
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
testExecutionConcurrency=-1
1+
testExecutionConcurrency=4
22
JunitMethodExecutionTimeout=30 m

airbyte-integrations/connectors/destination-snowflake/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ data:
55
connectorSubtype: database
66
connectorType: destination
77
definitionId: 424892c4-daac-4491-b35d-c6688ba547ba
8-
dockerImageTag: 3.7.3
8+
dockerImageTag: 3.7.4
99
dockerRepository: airbyte/destination-snowflake
1010
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
1111
githubIssueLabel: destination-snowflake

airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java

-27
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.UUID;
4343
import java.util.stream.Collectors;
4444
import net.snowflake.client.jdbc.SnowflakeSQLException;
45-
import org.apache.commons.lang3.StringUtils;
4645
import org.apache.commons.text.StringSubstitutor;
4746
import org.jooq.SQLDialect;
4847
import org.slf4j.Logger;
@@ -362,30 +361,4 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
362361
};
363362
}
364363

365-
protected String getDeleteStatesSql(Map<StreamId, ? extends SnowflakeState> destinationStates) {
366-
// only doing the DELETE where there's rows to delete allows us to avoid taking a lock on the table
367-
// when there's nothing to delete
368-
// This is particularly relevant in the context of tests, where many instance of the snowflake
369-
// destination could be run in parallel
370-
String deleteStatesSql = super.getDeleteStatesSql(destinationStates);
371-
StringBuilder sql = new StringBuilder();
372-
// sql.append("BEGIN\n");
373-
sql.append(" IF (EXISTS (").append(deleteStatesSql.replace("delete from", "SELECT 1 FROM ")).append(")) THEN\n");
374-
sql.append(" ").append(deleteStatesSql).append(";\n");
375-
sql.append(" END IF\n");
376-
// sql.append("END;\n");
377-
return sql.toString();
378-
}
379-
380-
protected void executeWithinTransaction(List<String> statements) throws SQLException {
381-
StringBuilder sb = new StringBuilder();
382-
sb.append("BEGIN\n");
383-
sb.append(" BEGIN TRANSACTION;\n ");
384-
sb.append(StringUtils.join(statements, ";\n "));
385-
sb.append(";\n COMMIT;\n");
386-
sb.append("END;");
387-
LOGGER.info("executing SQL:" + sb);
388-
getJdbcDatabase().execute(sb.toString());
389-
}
390-
391364
}

docs/integrations/destinations/snowflake.md

+1
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ desired namespace.
276276

277277
| Version | Date | Pull Request | Subject |
278278
| :-------------- | :--------- | :--------------------------------------------------------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------- |
279+
| 3.7.4 | 2024-05-07 | [\#38052](https://github.com/airbytehq/airbyte/pull/38052) | Revert problematic optimization |
279280
| 3.7.3 | 2024-05-07 | [\#34612](https://github.com/airbytehq/airbyte/pull/34612) | Adopt CDK 0.33.2 |
280281
| 3.7.2 | 2024-05-06 | [\#37857](https://github.com/airbytehq/airbyte/pull/37857) | Use safe executeMetadata call |
281282
| 3.7.1 | 2024-04-30 | [\#36910](https://github.com/airbytehq/airbyte/pull/36910) | Bump CDK version |

0 commit comments

Comments
 (0)