Skip to content

Clean up jdbc databases #10371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airbyte-db/lib/src/main/java/io/airbyte/db/Databases.java
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,7 @@ public static JdbcDatabase createStreamingJdbcDatabase(final String username,
final BasicDataSource connectionPool =
createBasicDataSource(username, password, jdbcConnectionString, driverClassName, Optional.ofNullable(connectionProperties));

final JdbcDatabase defaultJdbcDatabase =
createJdbcDatabase(username, password, jdbcConnectionString, driverClassName, connectionProperties, sourceOperations);
return new StreamingJdbcDatabase(connectionPool, defaultJdbcDatabase, jdbcStreamingQuery);
return new StreamingJdbcDatabase(connectionPool, sourceOperations, jdbcStreamingQuery);
}

private static BasicDataSource createBasicDataSource(final String username,
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.io.Closeable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
Expand All @@ -27,24 +28,20 @@ public class DefaultJdbcDatabase extends JdbcDatabase {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultJdbcDatabase.class);

private final CloseableConnectionSupplier connectionSupplier;
protected final DataSource dataSource;

public DefaultJdbcDatabase(final DataSource dataSource) {
this(new DataSourceConnectionSupplier(dataSource), JdbcUtils.getDefaultSourceOperations());
this(dataSource, JdbcUtils.getDefaultSourceOperations());
}

public DefaultJdbcDatabase(final DataSource dataSource, final JdbcCompatibleSourceOperations<?> sourceOperations) {
this(new DataSourceConnectionSupplier(dataSource), sourceOperations);
}

public DefaultJdbcDatabase(final CloseableConnectionSupplier connectionSupplier, final JdbcCompatibleSourceOperations<?> sourceOperations) {
super(sourceOperations);
this.connectionSupplier = connectionSupplier;
this.dataSource = dataSource;
}

@Override
public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
try (final Connection connection = connectionSupplier.getConnection()) {
try (final Connection connection = dataSource.getConnection()) {
query.accept(connection);
}
}
Expand All @@ -53,7 +50,7 @@ public void execute(final CheckedConsumer<Connection, SQLException> query) throw
public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
try (final Connection connection = connectionSupplier.getConnection();
try (final Connection connection = dataSource.getConnection();
final Stream<T> results = toStream(query.apply(connection), recordTransform)) {
return results.collect(Collectors.toList());
}
Expand All @@ -64,7 +61,7 @@ public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, Resu
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
final Connection connection = connectionSupplier.getConnection();
final Connection connection = dataSource.getConnection();
return toStream(query.apply(connection), recordTransform)
.onClose(() -> {
try {
Expand All @@ -77,7 +74,7 @@ public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet,

@Override
public DatabaseMetaData getMetaData() throws SQLException {
final Connection conn = connectionSupplier.getConnection();
final Connection conn = dataSource.getConnection();
final DatabaseMetaData metaData = conn.getMetaData();
conn.close();
return metaData;
Expand All @@ -102,7 +99,7 @@ public DatabaseMetaData getMetaData() throws SQLException {
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
final Connection connection = connectionSupplier.getConnection();
final Connection connection = dataSource.getConnection();
return toStream(statementCreator.apply(connection).executeQuery(), recordTransform)
.onClose(() -> {
try {
Expand All @@ -116,7 +113,12 @@ public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement,

@Override
public void close() throws Exception {
connectionSupplier.close();
if (dataSource instanceof AutoCloseable autoCloseable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nitpick: it would be nice to keep this comment https://github.com/airbytehq/airbyte/pull/10342/files#diff-c7c618583584a27fb3b21cef812ca82b6d077d8bc779cf9a401bfc7658bb8dbaL145-L146

also I didn't know this instanceof T var syntax existed! it's pretty cool 🚛

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Will add it in a follow-up PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in #10444.

autoCloseable.close();
}
if (dataSource instanceof Closeable closeable) {
closeable.close();
}
}

}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.sql.PreparedStatement;
import java.sql.SQLException;

@FunctionalInterface
public interface JdbcStreamingQueryConfiguration extends CheckedBiConsumer<Connection, PreparedStatement, SQLException> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
package io.airbyte.db.jdbc;

import com.google.errorprone.annotations.MustBeClosed;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Stream;
import javax.sql.DataSource;

Expand All @@ -22,53 +19,17 @@
* allows the developer to specify the correct configuration in order for a
* {@link PreparedStatement} to execute as in a streaming / chunked manner.
*/
public class StreamingJdbcDatabase extends JdbcDatabase {
public class StreamingJdbcDatabase extends DefaultJdbcDatabase {

private final DataSource dataSource;
private final JdbcDatabase database;
private final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration;

public StreamingJdbcDatabase(final DataSource dataSource,
final JdbcDatabase database,
final JdbcCompatibleSourceOperations<?> sourceOperations,
final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration) {
this(dataSource, database, jdbcStreamingQueryConfiguration, database.sourceOperations);
}

public StreamingJdbcDatabase(final DataSource dataSource,
final JdbcDatabase database,
final JdbcStreamingQueryConfiguration jdbcStreamingQueryConfiguration,
final JdbcCompatibleSourceOperations<?> sourceOperations) {
super(sourceOperations);
this.dataSource = dataSource;
this.database = database;
super(dataSource, sourceOperations);
this.jdbcStreamingQueryConfiguration = jdbcStreamingQueryConfiguration;
}

@Override
public DatabaseMetaData getMetaData() throws SQLException {
return database.getMetaData();
}

@Override
public void execute(final CheckedConsumer<Connection, SQLException> query) throws SQLException {
database.execute(query);
}

@Override
public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
return database.bufferedResultSetQuery(query, recordTransform);
}

@Override
@MustBeClosed
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
return database.resultSetQuery(query, recordTransform);
}

/**
* Assuming that the {@link JdbcStreamingQueryConfiguration} is configured correctly for the JDBC
* driver being used, this method will return data in streaming / chunked fashion. Review the
Expand Down Expand Up @@ -109,9 +70,4 @@ public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement,
}
}

@Override
public void close() throws Exception {
database.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@
package io.airbyte.db.jdbc;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
Expand Down Expand Up @@ -78,7 +74,7 @@ void setup() throws Exception {
config.get("database").asText()));

defaultJdbcDatabase = spy(new DefaultJdbcDatabase(connectionPool));
streamingJdbcDatabase = new StreamingJdbcDatabase(connectionPool, defaultJdbcDatabase, jdbcStreamingQueryConfiguration);
streamingJdbcDatabase = new StreamingJdbcDatabase(connectionPool, JdbcUtils.getDefaultSourceOperations(), jdbcStreamingQueryConfiguration);

defaultJdbcDatabase.execute(connection -> {
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
Expand All @@ -91,45 +87,6 @@ static void cleanUp() {
PSQL_DB.close();
}

@SuppressWarnings("unchecked")
@Test
void testExecute() throws SQLException {
final CheckedConsumer<Connection, SQLException> queryExecutor = mock(CheckedConsumer.class);
doNothing().when(defaultJdbcDatabase).execute(queryExecutor);

streamingJdbcDatabase.execute(queryExecutor);

verify(defaultJdbcDatabase).execute(queryExecutor);
}

@SuppressWarnings("unchecked")
@Test
void testBufferedResultQuery() throws SQLException {
doReturn(RECORDS_AS_JSON).when(defaultJdbcDatabase).bufferedResultSetQuery(any(), any());

final List<JsonNode> actual = streamingJdbcDatabase.bufferedResultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);

assertEquals(RECORDS_AS_JSON, actual);
verify(defaultJdbcDatabase).bufferedResultSetQuery(any(), any());
}

@SuppressWarnings("unchecked")
@Test
void testResultSetQuery() throws SQLException {
doReturn(RECORDS_AS_JSON.stream()).when(defaultJdbcDatabase).resultSetQuery(any(), any());

final Stream<JsonNode> actual = streamingJdbcDatabase.resultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);
final List<JsonNode> actualAsList = actual.collect(Collectors.toList());
actual.close();

assertEquals(RECORDS_AS_JSON, actualAsList);
verify(defaultJdbcDatabase).resultSetQuery(any(), any());
}

@Test
void testQuery() throws SQLException {
// grab references to connection and prepared statement so we can verify the streaming config is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.DataSourceConnectionSupplier;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import java.time.Duration;
import java.util.Map;
import javax.sql.DataSource;
Expand Down Expand Up @@ -57,7 +55,7 @@ private static DataSource createDataSource(final JsonNode config) {

public static JdbcDatabase getDatabase(final JsonNode config) {
final DataSource dataSource = createDataSource(config);
return new DefaultJdbcDatabase(new DataSourceConnectionSupplier(dataSource), JdbcUtils.getDefaultSourceOperations());
return new DefaultJdbcDatabase(dataSource);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("date")
.fullSourceDataType("date not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand All @@ -263,7 +263,7 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("datetime")
.fullSourceDataType("datetime not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00 00:00:00'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand All @@ -281,7 +281,7 @@ protected void initTests() {
TestDataHolder.builder()
.sourceType("timestamp")
.fullSourceDataType("timestamp not null")
.airbyteType(JsonSchemaPrimitive.STRING)
.airbyteType(JsonSchemaType.STRING)
.addInsertValues("'0000-00-00 00:00:00.000000'")
.addExpectedValues("1970-01-01T00:00:00Z")
.build());
Expand Down