Skip to content

Commit c27e2a0

Browse files
authored
🐞 Snowflake destination: use pooled connections (#10342)
* Use data source conn supplier for snowflake database * Format code * Reuse the same database in integration tests * Close query stream * Refactor snowflake staging sql operations * Close result set * Add annotations * Bump version * Bump version in seed
1 parent 7c93ef1 commit c27e2a0

File tree

17 files changed

+182
-210
lines changed

17 files changed

+182
-210
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@
185185
- name: Snowflake
186186
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
187187
dockerRepository: airbyte/destination-snowflake
188-
dockerImageTag: 0.4.11
188+
dockerImageTag: 0.4.12
189189
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
190190
icon: snowflake.svg
191191
- name: MariaDB ColumnStore

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -3817,7 +3817,7 @@
38173817
supported_destination_sync_modes:
38183818
- "overwrite"
38193819
- "append"
3820-
- dockerImage: "airbyte/destination-snowflake:0.4.11"
3820+
- dockerImage: "airbyte/destination-snowflake:0.4.12"
38213821
spec:
38223822
documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake"
38233823
connectionSpecification:

airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java

+21
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import io.airbyte.db.jdbc.StreamingJdbcDatabase;
1515
import io.airbyte.db.mongodb.MongoDatabase;
1616
import java.io.IOException;
17+
import java.util.Map;
1718
import java.util.Optional;
1819
import java.util.function.Function;
1920
import lombok.val;
@@ -198,6 +199,10 @@ private static BasicDataSource createBasicDataSource(final String username,
198199
Optional.empty());
199200
}
200201

202+
/**
203+
* Prefer to use the method that takes in the connection properties as a map.
204+
*/
205+
@Deprecated
201206
private static BasicDataSource createBasicDataSource(final String username,
202207
final String password,
203208
final String jdbcConnectionString,
@@ -214,6 +219,22 @@ private static BasicDataSource createBasicDataSource(final String username,
214219
return connectionPool;
215220
}
216221

222+
public static BasicDataSource createBasicDataSource(final String username,
223+
final String password,
224+
final String jdbcConnectionString,
225+
final String driverClassName,
226+
final Map<String, String> connectionProperties) {
227+
final BasicDataSource connectionPool = new BasicDataSource();
228+
connectionPool.setDriverClassName(driverClassName);
229+
connectionPool.setUsername(username);
230+
connectionPool.setPassword(password);
231+
connectionPool.setInitialSize(0);
232+
connectionPool.setMaxTotal(5);
233+
connectionPool.setUrl(jdbcConnectionString);
234+
connectionProperties.forEach(connectionPool::addConnectionProperty);
235+
return connectionPool;
236+
}
237+
217238
public static BigQueryDatabase createBigQueryDatabase(final String projectId, final String jsonCreds) {
218239
return new BigQueryDatabase(projectId, jsonCreds);
219240
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.jdbc;
6+
7+
import java.sql.Connection;
8+
import java.sql.SQLException;
9+
10+
public interface CloseableConnectionSupplier extends AutoCloseable {
11+
12+
Connection getConnection() throws SQLException;
13+
14+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.db.jdbc;
6+
7+
import java.io.Closeable;
8+
import java.sql.Connection;
9+
import java.sql.SQLException;
10+
import javax.sql.DataSource;
11+
12+
public class DataSourceConnectionSupplier implements CloseableConnectionSupplier {
13+
14+
private final DataSource dataSource;
15+
16+
public DataSourceConnectionSupplier(final DataSource dataSource) {
17+
this.dataSource = dataSource;
18+
}
19+
20+
@Override
21+
public Connection getConnection() throws SQLException {
22+
return dataSource.getConnection();
23+
}
24+
25+
@Override
26+
public void close() throws Exception {
27+
// Just a safety in case we are using a datasource implementation that requires closing.
28+
// BasicDataSource from apache does since it also provides a pooling mechanism to reuse connections.
29+
30+
if (dataSource instanceof AutoCloseable) {
31+
((AutoCloseable) dataSource).close();
32+
}
33+
if (dataSource instanceof Closeable) {
34+
((Closeable) dataSource).close();
35+
}
36+
}
37+
38+
}

airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java

+6-42
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44

55
package io.airbyte.db.jdbc;
66

7+
import com.google.errorprone.annotations.MustBeClosed;
78
import io.airbyte.commons.functional.CheckedConsumer;
89
import io.airbyte.commons.functional.CheckedFunction;
910
import io.airbyte.db.JdbcCompatibleSourceOperations;
10-
import java.io.Closeable;
1111
import java.sql.Connection;
1212
import java.sql.DatabaseMetaData;
1313
import java.sql.PreparedStatement;
@@ -42,11 +42,6 @@ public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier,
4242
this.connectionSupplier = connectionSupplier;
4343
}
4444

45-
public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier) {
46-
super(JdbcUtils.getDefaultSourceOperations());
47-
this.connectionSupplier = connectionSupplier;
48-
}
49-
5045
@Override
5146
public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
5247
try (final Connection connection = connectionSupplier.getConnection()) {
@@ -58,12 +53,14 @@ public void execute(final CheckedConsumer<Connection, SQLException> query) throw
5853
public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
5954
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
6055
throws SQLException {
61-
try (final Connection connection = connectionSupplier.getConnection()) {
62-
return toStream(query.apply(connection), recordTransform).collect(Collectors.toList());
56+
try (final Connection connection = connectionSupplier.getConnection();
57+
final Stream<T> results = toStream(query.apply(connection), recordTransform)) {
58+
return results.collect(Collectors.toList());
6359
}
6460
}
6561

6662
@Override
63+
@MustBeClosed
6764
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
6865
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
6966
throws SQLException {
@@ -101,6 +98,7 @@ public DatabaseMetaData getMetaData() throws SQLException {
10198
* @throws SQLException SQL related exceptions.
10299
*/
103100
@Override
101+
@MustBeClosed
104102
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
105103
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
106104
throws SQLException {
@@ -121,38 +119,4 @@ public void close() throws Exception {
121119
connectionSupplier.close();
122120
}
123121

124-
public interface CloseableConnectionSupplier extends AutoCloseable {
125-
126-
Connection getConnection() throws SQLException;
127-
128-
}
129-
130-
public static final class DataSourceConnectionSupplier implements CloseableConnectionSupplier {
131-
132-
private final DataSource dataSource;
133-
134-
public DataSourceConnectionSupplier(final DataSource dataSource) {
135-
this.dataSource = dataSource;
136-
}
137-
138-
@Override
139-
public Connection getConnection() throws SQLException {
140-
return dataSource.getConnection();
141-
}
142-
143-
@Override
144-
public void close() throws Exception {
145-
// Just a safety in case we are using a datasource implementation that requires closing.
146-
// BasicDataSource from apache does since it also provides a pooling mechanism to reuse connections.
147-
148-
if (dataSource instanceof AutoCloseable) {
149-
((AutoCloseable) dataSource).close();
150-
}
151-
if (dataSource instanceof Closeable) {
152-
((Closeable) dataSource).close();
153-
}
154-
}
155-
156-
}
157-
158122
}

airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.airbyte.db.jdbc;
66

77
import com.fasterxml.jackson.databind.JsonNode;
8+
import com.google.errorprone.annotations.MustBeClosed;
89
import io.airbyte.commons.functional.CheckedConsumer;
910
import io.airbyte.commons.functional.CheckedFunction;
1011
import io.airbyte.db.JdbcCompatibleSourceOperations;
@@ -65,13 +66,15 @@ public void executeWithinTransaction(final List<String> queries) throws SQLExcep
6566
* @param <T> type that each record will be mapped to
6667
* @return stream of records that the result set is mapped to.
6768
*/
68-
public static <T> Stream<T> toStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
69+
@MustBeClosed
70+
protected static <T> Stream<T> toStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
6971
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
7072

7173
@Override
7274
public boolean tryAdvance(final Consumer<? super T> action) {
7375
try {
7476
if (!resultSet.next()) {
77+
resultSet.close();
7578
return false;
7679
}
7780
action.accept(mapper.apply(resultSet));
@@ -116,6 +119,7 @@ public abstract <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, R
116119
* @return Result of the query mapped to a stream.
117120
* @throws SQLException SQL related exceptions.
118121
*/
122+
@MustBeClosed
119123
public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
120124
CheckedFunction<ResultSet, T, SQLException> recordTransform)
121125
throws SQLException;
@@ -135,6 +139,7 @@ public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultS
135139
* @return Result of the query mapped to a stream.void execute(String sql)
136140
* @throws SQLException SQL related exceptions.
137141
*/
142+
@MustBeClosed
138143
public abstract <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
139144
CheckedFunction<ResultSet, T, SQLException> recordTransform)
140145
throws SQLException;
@@ -154,6 +159,7 @@ public int queryInt(final String sql, final String... params) throws SQLExceptio
154159
}
155160
}
156161

162+
@MustBeClosed
157163
@Override
158164
public Stream<JsonNode> query(final String sql, final String... params) throws SQLException {
159165
return query(connection -> {

airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.db.jdbc;
66

7+
import com.google.errorprone.annotations.MustBeClosed;
78
import io.airbyte.commons.functional.CheckedConsumer;
89
import io.airbyte.commons.functional.CheckedFunction;
910
import io.airbyte.db.JdbcCompatibleSourceOperations;
@@ -61,6 +62,7 @@ public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, Resu
6162
}
6263

6364
@Override
65+
@MustBeClosed
6466
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
6567
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
6668
throws SQLException {
@@ -84,6 +86,7 @@ public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet,
8486
* @throws SQLException SQL related exceptions.
8587
*/
8688
@Override
89+
@MustBeClosed
8790
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
8891
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
8992
throws SQLException {

airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.stream.Collectors;
2020
import java.util.stream.Stream;
2121
import org.junit.jupiter.api.AfterAll;
22+
import org.junit.jupiter.api.AfterEach;
2223
import org.junit.jupiter.api.BeforeAll;
2324
import org.junit.jupiter.api.BeforeEach;
2425
import org.junit.jupiter.api.Test;
@@ -34,31 +35,33 @@ public class TestDefaultJdbcDatabase {
3435

3536
private static PostgreSQLContainer<?> PSQL_DB;
3637

37-
private JsonNode config;
38+
private JdbcDatabase database;
3839
private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations();
3940

4041
@BeforeAll
4142
static void init() {
4243
PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine");
4344
PSQL_DB.start();
44-
4545
}
4646

4747
@BeforeEach
4848
void setup() throws Exception {
4949
final String dbName = Strings.addRandomSuffix("db", "_", 10);
5050

51-
config = getConfig(PSQL_DB, dbName);
52-
51+
final JsonNode config = getConfig(PSQL_DB, dbName);
5352
final String initScriptName = "init_" + dbName.concat(".sql");
5453
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");
5554
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB);
5655

57-
final JdbcDatabase database = getDatabaseFromConfig(config);
56+
database = getDatabaseFromConfig(config);
5857
database.execute(connection -> {
5958
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
6059
connection.createStatement().execute("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
6160
});
61+
}
62+
63+
@AfterEach
64+
void close() throws Exception {
6265
database.close();
6366
}
6467

@@ -69,7 +72,7 @@ static void cleanUp() {
6972

7073
@Test
7174
void testBufferedResultQuery() throws SQLException {
72-
final List<JsonNode> actual = getDatabaseFromConfig(config).bufferedResultSetQuery(
75+
final List<JsonNode> actual = database.bufferedResultSetQuery(
7376
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
7477
sourceOperations::rowToJson);
7578

@@ -78,7 +81,7 @@ void testBufferedResultQuery() throws SQLException {
7881

7982
@Test
8083
void testResultSetQuery() throws SQLException {
81-
final Stream<JsonNode> actual = getDatabaseFromConfig(config).resultSetQuery(
84+
final Stream<JsonNode> actual = database.resultSetQuery(
8285
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
8386
sourceOperations::rowToJson);
8487
final List<JsonNode> actualAsList = actual.collect(Collectors.toList());
@@ -89,7 +92,7 @@ void testResultSetQuery() throws SQLException {
8992

9093
@Test
9194
void testQuery() throws SQLException {
92-
final Stream<JsonNode> actual = getDatabaseFromConfig(config).query(
95+
final Stream<JsonNode> actual = database.query(
9396
connection -> connection.prepareStatement("SELECT * FROM id_and_name;"),
9497
sourceOperations::rowToJson);
9598

airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/JdbcSqlOperations.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,11 @@ protected JdbcSqlOperations(final DataAdapter dataAdapter) {
4545
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
4646
if (!isSchemaExists(database, schemaName)) {
4747
AirbyteSentry.executeWithTracing("CreateSchema",
48-
() -> database.execute(createSchemaQuery(schemaName)),
48+
() -> database.execute(String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName)),
4949
Map.of("schema", schemaName));
5050
}
5151
}
5252

53-
private String createSchemaQuery(final String schemaName) {
54-
return String.format("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaName);
55-
}
56-
5753
@Override
5854
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException {
5955
AirbyteSentry.executeWithTracing("CreateTableIfNotExists",

airbyte-integrations/connectors/destination-snowflake/Dockerfile

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
1818

1919
RUN tar xf ${APPLICATION}.tar --strip-components=1
2020

21-
ENV APPLICATION_VERSION 0.4.11
21+
ENV APPLICATION_VERSION 0.4.12
2222
ENV ENABLE_SENTRY true
2323

24-
LABEL io.airbyte.version=0.4.11
24+
LABEL io.airbyte.version=0.4.12
2525
LABEL io.airbyte.name=airbyte/destination-snowflake

0 commit comments

Comments
 (0)