diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java index 659baf8b987cc..e25bd240e2a05 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java @@ -186,9 +186,7 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username, final BasicDataSource connectionPool = createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties)); - final JdbcDatabase defaultJdbcDatabase = - createJdbcDatabase(username, password, jdbcConnectionString, driverClassName, connectionProperties, sourceOperations); - return new StreamingJdbcDatabase(connectionPool, defaultJdbcDatabase, jdbcStreamingQuery); + return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery); } private static BasicDataSource createBasicDataSource(final String username, diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/CloseableConnectionSupplier.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/CloseableConnectionSupplier.java deleted file mode 100644 index baeb68ef1569f..0000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/CloseableConnectionSupplier.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.jdbc; - -import java.sql.Connection; -import java.sql.SQLException; - -public interface CloseableConnectionSupplier extends AutoCloseable { - - Connection getConnection() throws SQLException; - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DataSourceConnectionSupplier.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DataSourceConnectionSupplier.java deleted file mode 100644 index f1f0b92db320d..0000000000000 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DataSourceConnectionSupplier.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright (c) 2021 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.db.jdbc; - -import java.io.Closeable; -import java.sql.Connection; -import java.sql.SQLException; -import javax.sql.DataSource; - -public class DataSourceConnectionSupplier implements CloseableConnectionSupplier { - - private final DataSource dataSource; - - public DataSourceConnectionSupplier(final DataSource dataSource) { - this.dataSource = dataSource; - } - - @Override - public Connection getConnection() throws SQLException { - return dataSource.getConnection(); - } - - @Override - public void close() throws Exception { - // Just a safety in case we are using a datasource implementation that requires closing. - // BasicDataSource from apache does since it also provides a pooling mechanism to reuse connections. - - if (dataSource instanceof AutoCloseable) { - ((AutoCloseable) dataSource).close(); - } - if (dataSource instanceof Closeable) { - ((Closeable) dataSource).close(); - } - } - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index 613679cb350a1..1ef8aebca0489 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -8,6 +8,7 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.db.JdbcCompatibleSourceOperations; +import java.io.Closeable; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; @@ -27,24 +28,20 @@ public class DefaultJdbcDatabase extends JdbcDatabase { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJdbcDatabase.class); - private final CloseableConnectionSupplier connectionSupplier; + protected final DataSource dataSource; public DefaultJdbcDatabase(final DataSource dataSource) { - this(new DataSourceConnectionSupplier(dataSource), JdbcUtils.getDefaultSourceOperations()); + this(dataSource, JdbcUtils.getDefaultSourceOperations()); } public DefaultJdbcDatabase(final DataSource dataSource, final JdbcCompatibleSourceOperations sourceOperations) { - this(new DataSourceConnectionSupplier(dataSource), sourceOperations); - } - - public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier, final JdbcCompatibleSourceOperations sourceOperations) { super(sourceOperations); - this.connectionSupplier = connectionSupplier; + this.dataSource = dataSource; } @Override public void execute(final CheckedConsumer query) throws SQLException { - try (final Connection connection = connectionSupplier.getConnection()) { + try (final Connection connection = dataSource.getConnection()) { query.accept(connection); } } @@ -53,7 +50,7 @@ public void execute(final CheckedConsumer query) throw public List bufferedResultSetQuery(final CheckedFunction query, final CheckedFunction recordTransform) throws SQLException { - try (final Connection connection = connectionSupplier.getConnection(); + try (final Connection connection = dataSource.getConnection(); final Stream results = toStream(query.apply(connection), recordTransform)) { return results.collect(Collectors.toList()); } @@ -64,7 +61,7 @@ public List bufferedResultSetQuery(final CheckedFunction Stream resultSetQuery(final CheckedFunction query, final CheckedFunction recordTransform) throws SQLException { - final Connection connection = connectionSupplier.getConnection(); + final Connection connection = dataSource.getConnection(); return toStream(query.apply(connection), recordTransform) .onClose(() -> { try { @@ -77,7 +74,7 @@ public Stream resultSetQuery(final CheckedFunction Stream query(final CheckedFunction statementCreator, final CheckedFunction recordTransform) throws SQLException { - final Connection connection = connectionSupplier.getConnection(); + final Connection connection = dataSource.getConnection(); return toStream(statementCreator.apply(connection).executeQuery(), recordTransform) .onClose(() -> { try { @@ -116,7 +113,12 @@ public Stream query(final CheckedFunction { - - T query(Connection connection) throws SQLException; - -} diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java index 095e6eaace39b..d9486fe33f160 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcStreamingQueryConfiguration.java @@ -9,6 +9,7 @@ import java.sql.PreparedStatement; import java.sql.SQLException; +@FunctionalInterface public interface JdbcStreamingQueryConfiguration extends CheckedBiConsumer { } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java index 6bfa56887cef9..90df64f6e716f 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java @@ -5,15 +5,12 @@ package io.airbyte.db.jdbc; import com.google.errorprone.annotations.MustBeClosed; -import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.db.JdbcCompatibleSourceOperations; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; import java.util.stream.Stream; import javax.sql.DataSource; @@ -22,53 +19,17 @@ * allows the developer to specify the correct configuration in order for a * {@link PreparedStatement} to execute as in a streaming / chunked manner. */ -public class StreamingJdbcDatabase extends JdbcDatabase { +public class StreamingJdbcDatabase extends DefaultJdbcDatabase { - private final DataSource dataSource; - private final JdbcDatabase database; private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration; public StreamingJdbcDatabase(final DataSource dataSource, - final JdbcDatabase database, + final JdbcCompatibleSourceOperations sourceOperations, final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) { - this(dataSource, database, jdbcStreamingQueryConfiguration, database.sourceOperations); - } - - public StreamingJdbcDatabase(final DataSource dataSource, - final JdbcDatabase database, - final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration, - final JdbcCompatibleSourceOperations sourceOperations) { - super(sourceOperations); - this.dataSource = dataSource; - this.database = database; + super(dataSource, sourceOperations); this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration; } - @Override - public DatabaseMetaData getMetaData() throws SQLException { - return database.getMetaData(); - } - - @Override - public void execute(final CheckedConsumer query) throws SQLException { - database.execute(query); - } - - @Override - public List bufferedResultSetQuery(final CheckedFunction query, - final CheckedFunction recordTransform) - throws SQLException { - return database.bufferedResultSetQuery(query, recordTransform); - } - - @Override - @MustBeClosed - public Stream resultSetQuery(final CheckedFunction query, - final CheckedFunction recordTransform) - throws SQLException { - return database.resultSetQuery(query, recordTransform); - } - /** * Assuming that the {@link JdbcStreamingQueryConfiguration} is configured correctly for the JDBC * driver being used, this method will return data in streaming / chunked fashion. Review the @@ -109,9 +70,4 @@ public Stream query(final CheckedFunction { connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); @@ -91,45 +87,6 @@ static void cleanUp() { PSQL_DB.close(); } - @SuppressWarnings("unchecked") - @Test - void testExecute() throws SQLException { - final CheckedConsumer queryExecutor = mock(CheckedConsumer.class); - doNothing().when(defaultJdbcDatabase).execute(queryExecutor); - - streamingJdbcDatabase.execute(queryExecutor); - - verify(defaultJdbcDatabase).execute(queryExecutor); - } - - @SuppressWarnings("unchecked") - @Test - void testBufferedResultQuery() throws SQLException { - doReturn(RECORDS_AS_JSON).when(defaultJdbcDatabase).bufferedResultSetQuery(any(), any()); - - final List actual = streamingJdbcDatabase.bufferedResultSetQuery( - connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"), - sourceOperations::rowToJson); - - assertEquals(RECORDS_AS_JSON, actual); - verify(defaultJdbcDatabase).bufferedResultSetQuery(any(), any()); - } - - @SuppressWarnings("unchecked") - @Test - void testResultSetQuery() throws SQLException { - doReturn(RECORDS_AS_JSON.stream()).when(defaultJdbcDatabase).resultSetQuery(any(), any()); - - final Stream actual = streamingJdbcDatabase.resultSetQuery( - connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"), - sourceOperations::rowToJson); - final List actualAsList = actual.collect(Collectors.toList()); - actual.close(); - - assertEquals(RECORDS_AS_JSON, actualAsList); - verify(defaultJdbcDatabase).resultSetQuery(any(), any()); - } - @Test void testQuery() throws SQLException { // grab references to connection and prepared statement so we can verify the streaming config is diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index ee6b274a11675..359b843a2a1d7 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -7,10 +7,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.db.Databases; -import io.airbyte.db.jdbc.DataSourceConnectionSupplier; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.db.jdbc.JdbcUtils; import java.time.Duration; import java.util.Map; import javax.sql.DataSource; @@ -57,7 +55,7 @@ private static DataSource createDataSource(final JsonNode config) { public static JdbcDatabase getDatabase(final JsonNode config) { final DataSource dataSource = createDataSource(config); - return new DefaultJdbcDatabase(new DataSourceConnectionSupplier(dataSource), JdbcUtils.getDefaultSourceOperations()); + return new DefaultJdbcDatabase(dataSource); } } diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java index f0d9a84b67261..231e6a03b8dc4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/CdcMySqlSourceDatatypeTest.java @@ -245,7 +245,7 @@ protected void initTests() { TestDataHolder.builder() .sourceType("date") .fullSourceDataType("date not null") - .airbyteType(JsonSchemaPrimitive.STRING) + .airbyteType(JsonSchemaType.STRING) .addInsertValues("'0000-00-00'") .addExpectedValues("1970-01-01T00:00:00Z") .build()); @@ -263,7 +263,7 @@ protected void initTests() { TestDataHolder.builder() .sourceType("datetime") .fullSourceDataType("datetime not null") - .airbyteType(JsonSchemaPrimitive.STRING) + .airbyteType(JsonSchemaType.STRING) .addInsertValues("'0000-00-00 00:00:00'") .addExpectedValues("1970-01-01T00:00:00Z") .build()); @@ -281,7 +281,7 @@ protected void initTests() { TestDataHolder.builder() .sourceType("timestamp") .fullSourceDataType("timestamp not null") - .airbyteType(JsonSchemaPrimitive.STRING) + .airbyteType(JsonSchemaType.STRING) .addInsertValues("'0000-00-00 00:00:00.000000'") .addExpectedValues("1970-01-01T00:00:00Z") .build());