Skip to content

Rename jdbc db methods that require manual closure #11300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ public abstract class SqlDatabase extends AbstractDatabase {

public abstract void execute(String sql) throws Exception;

public abstract Stream<JsonNode> query(String sql, String... params) throws Exception;
public abstract Stream<JsonNode> unsafeQuery(String sql, String... params) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public Stream<JsonNode> query(final String sql, final QueryParameterValue... par
}

@Override
public Stream<JsonNode> query(final String sql, final String... params) throws Exception {
public Stream<JsonNode> unsafeQuery(final String sql, final String... params) throws Exception {
final List<QueryParameterValue> parameterValueList;
if (params == null)
parameterValueList = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ public <T> List<T> bufferedResultSetQuery(final CheckedFunction<Connection, Resu
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
try (final Connection connection = dataSource.getConnection();
final Stream<T> results = toStream(query.apply(connection), recordTransform)) {
final Stream<T> results = toUnsafeStream(query.apply(connection), recordTransform)) {
return results.collect(Collectors.toList());
}
}

@Override
@MustBeClosed
public <T> Stream<T> resultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
public <T> Stream<T> unsafeResultSetQuery(final CheckedFunction<Connection, ResultSet, SQLException> query,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
final Connection connection = dataSource.getConnection();
return toStream(query.apply(connection), recordTransform)
return toUnsafeStream(query.apply(connection), recordTransform)
.onClose(() -> {
try {
connection.close();
Expand Down Expand Up @@ -96,11 +96,11 @@ public DatabaseMetaData getMetaData() throws SQLException {
*/
@Override
@MustBeClosed
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
final Connection connection = dataSource.getConnection();
return toStream(statementCreator.apply(connection).executeQuery(), recordTransform)
return toUnsafeStream(statementCreator.apply(connection).executeQuery(), recordTransform)
.onClose(() -> {
try {
LOGGER.info("closing connection");
Expand Down
34 changes: 20 additions & 14 deletions airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ public void executeWithinTransaction(final List<String> queries) throws SQLExcep
}

/**
* Map records returned in a result set.
* Map records returned in a result set. It is an "unsafe" stream because the stream must be
* manually closed. Otherwise, there will be a database connection leak.
*
* @param resultSet the result set
* @param mapper function to make each record of the result set
* @param <T> type that each record will be mapped to
* @return stream of records that the result set is mapped to.
*/
@MustBeClosed
protected static <T> Stream<T> toStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
protected static <T> Stream<T> toUnsafeStream(final ResultSet resultSet, final CheckedFunction<ResultSet, T, SQLException> mapper) {
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {

@Override
Expand Down Expand Up @@ -108,8 +109,8 @@ public abstract <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, R
* Use a connection to create a {@link ResultSet} and map it into a stream. You CANNOT assume that
* data will be returned from this method before the entire {@link ResultSet} is buffered in memory.
* Review the implementation of the database's JDBC driver or use the StreamingJdbcDriver if you
* need this guarantee. The caller should close the returned stream to release the database
* connection.
* need this guarantee. It is "unsafe" because the caller should close the returned stream to
* release the database connection. Otherwise, there will be a connection leak.
*
* @param query execute a query using a {@link Connection} to get a {@link ResultSet}.
* @param recordTransform transform each record of that result set into the desired type. do NOT
Expand All @@ -120,16 +121,17 @@ public abstract <T> List<T> bufferedResultSetQuery(CheckedFunction<Connection, R
* @throws SQLException SQL related exceptions.
*/
@MustBeClosed
public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
public abstract <T> Stream<T> unsafeResultSetQuery(CheckedFunction<Connection, ResultSet, SQLException> query,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

/**
* Use a connection to create a {@link PreparedStatement} and map it into a stream. You CANNOT
* assume that data will be returned from this method before the entire {@link ResultSet} is
* buffered in memory. Review the implementation of the database's JDBC driver or use the
* StreamingJdbcDriver if you need this guarantee. The caller should close the returned stream to
* release the database connection.
* StreamingJdbcDriver if you need this guarantee. It is "unsafe" because the caller should close
* the returned stream to release the database connection. Otherwise, there will be a connection
* leak.
*
* @param statementCreator create a {@link PreparedStatement} from a {@link Connection}.
* @param recordTransform transform each record of that result set into the desired type. do NOT
Expand All @@ -140,12 +142,12 @@ public abstract <T> Stream<T> resultSetQuery(CheckedFunction<Connection, ResultS
* @throws SQLException SQL related exceptions.
*/
@MustBeClosed
public abstract <T> Stream<T> query(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
public abstract <T> Stream<T> unsafeQuery(CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException;

public int queryInt(final String sql, final String... params) throws SQLException {
try (final Stream<Integer> q = query(c -> {
try (final Stream<Integer> q = unsafeQuery(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
Expand All @@ -159,10 +161,14 @@ public int queryInt(final String sql, final String... params) throws SQLExceptio
}
}

/**
* It is "unsafe" because the caller must manually close the returned stream. Otherwise, there will
* be a database connection leak.
*/
@MustBeClosed
@Override
public Stream<JsonNode> query(final String sql, final String... params) throws SQLException {
return query(connection -> {
public Stream<JsonNode> unsafeQuery(final String sql, final String... params) throws SQLException {
return unsafeQuery(connection -> {
final PreparedStatement statement = connection.prepareStatement(sql);
int i = 1;
for (final String param : params) {
Expand All @@ -174,7 +180,7 @@ public Stream<JsonNode> query(final String sql, final String... params) throws S
}

public ResultSetMetaData queryMetadata(final String sql, final String... params) throws SQLException {
try (final Stream<ResultSetMetaData> q = query(c -> {
try (final Stream<ResultSetMetaData> q = unsafeQuery(c -> {
PreparedStatement statement = c.prepareStatement(sql);
int i = 1;
for (String param : params) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ public StreamingJdbcDatabase(final DataSource dataSource,
*/
@Override
@MustBeClosed
public <T> Stream<T> query(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator,
final CheckedFunction<ResultSet, T, SQLException> recordTransform)
throws SQLException {
try {
final Connection connection = dataSource.getConnection();
final PreparedStatement ps = statementCreator.apply(connection);
// allow configuration of connection and prepared statement to make streaming possible.
jdbcStreamingQueryConfiguration.accept(connection, ps);
return toStream(ps.executeQuery(), recordTransform)
return toUnsafeStream(ps.executeQuery(), recordTransform)
.onClose(() -> {
try {
connection.setAutoCommit(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testBufferedResultQuery() throws SQLException {

@Test
void testResultSetQuery() throws SQLException {
final Stream<JsonNode> actual = database.resultSetQuery(
final Stream<JsonNode> actual = database.unsafeResultSetQuery(
connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);
final List<JsonNode> actualAsList = actual.collect(Collectors.toList());
Expand All @@ -92,7 +92,7 @@ void testResultSetQuery() throws SQLException {

@Test
void testQuery() throws SQLException {
final Stream<JsonNode> actual = database.query(
final Stream<JsonNode> actual = database.unsafeQuery(
connection -> connection.prepareStatement("SELECT * FROM id_and_name;"),
sourceOperations::rowToJson);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void testRowToJson() throws SQLException {
void testToStream() throws SQLException {
try (final Connection connection = dataSource.getConnection()) {
final ResultSet rs = connection.createStatement().executeQuery("SELECT * FROM id_and_name;");
final List<JsonNode> actual = JdbcDatabase.toStream(rs, sourceOperations::rowToJson).collect(Collectors.toList());
final List<JsonNode> actual = JdbcDatabase.toUnsafeStream(rs, sourceOperations::rowToJson).collect(Collectors.toList());
assertEquals(RECORDS_AS_JSON, actual);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ void testQuery() throws SQLException {
// invoked.
final AtomicReference<Connection> connection1 = new AtomicReference<>();
final AtomicReference<PreparedStatement> ps1 = new AtomicReference<>();
final Stream<JsonNode> actual = streamingJdbcDatabase.query(
final Stream<JsonNode> actual = streamingJdbcDatabase.unsafeQuery(
connection -> {
connection1.set(connection);
final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
final JdbcDatabase jdbcDB = getDatabase(getConfig());
return jdbcDB.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
final JdbcDatabase jdbcDB = getDatabase(getConfig());
return jdbcDB.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St
ClickhouseDestination.HOST_KEY,
ClickhouseDestination.PORT_KEY,
(CheckedFunction<JsonNode, List<JsonNode>, Exception>) mangledConfig -> getDatabase(mangledConfig)
.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.collect(Collectors.toList()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ VersionCompatibility isCompatibleVersion(final JdbcDatabase database) throws SQL
}

private Semver getVersion(final JdbcDatabase database) throws SQLException {
final List<String> value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SELECT version()"),
final List<String> value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SELECT version()"),
resultSet -> resultSet.getString("version()")).collect(Collectors.toList());
Matcher matcher = VERSION_PATTERN.matcher(value.get(0));
if (matcher.find()) {
Expand All @@ -123,7 +123,7 @@ void verifyLocalFileEnabled(final JdbcDatabase database) throws SQLException {

private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException {
final List<String> value =
database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"),
database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"),
resultSet -> resultSet.getString("Value")).collect(Collectors.toList());

return value.get(0).equalsIgnoreCase("on");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ protected List<JsonNode> retrieveRecords(TestDestinationEnv testEnv,

private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
JdbcDatabase database = getDatabase(getConfig());
return database.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
return database.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
.collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private void tryEnableLocalFile(final JdbcDatabase database) throws SQLException
}

private double getVersion(final JdbcDatabase database) throws SQLException {
final List<String> value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"),
final List<String> value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("select version()"),
resultSet -> resultSet.getString("version()")).collect(Collectors.toList());
return Double.parseDouble(value.get(0).substring(0, 3));
}
Expand All @@ -117,7 +117,7 @@ public boolean isSchemaRequired() {

private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException {
final List<String> value =
database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"),
database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"),
resultSet -> resultSet.getString("Value")).collect(Collectors.toList());

return value.get(0).equalsIgnoreCase("on");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void testEncryption() throws SQLException {

final String network_service_banner =
"select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)";
final List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());
final List<JsonNode> collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList());

assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(),
equals("Oracle Advanced Security: " + algorithm + " encryption"));
Expand All @@ -208,7 +208,7 @@ public void testCheckProtocol() throws SQLException {
+ algorithm + " )"));

final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual";
final List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());
final List<JsonNode> collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList());

assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void testEncryption() throws SQLException {

final String network_service_banner =
"select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)";
final List<JsonNode> collect = database.query(network_service_banner).toList();
final List<JsonNode> collect = database.unsafeQuery(network_service_banner).toList();

assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(),
equals("Oracle Advanced Security: " + algorithm + " encryption"));
Expand Down Expand Up @@ -74,7 +74,7 @@ public void testCheckProtocol() throws SQLException {
getAdditionalProperties(algorithm));

final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual";
final List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());
final List<JsonNode> collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList());

assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testNoneEncryption() throws SQLException {

final String network_service_banner =
"select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)";
final List<JsonNode> collect = database.query(network_service_banner).collect(Collectors.toList());
final List<JsonNode> collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList());

assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText()
.contains("Oracle Advanced Security: encryption"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private boolean checkStageObjectExists(final JdbcDatabase database, final String
final String query = getListQuery(stageName, stagingPath, filename);
LOGGER.debug("Executing query: {}", query);
final boolean result;
try (final Stream<JsonNode> stream = database.query(query)) {
try (final Stream<JsonNode> stream = database.unsafeQuery(query)) {
result = stream.findAny().isPresent();
}
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN

@Override
public boolean isSchemaExists(final JdbcDatabase database, final String outputSchema) throws Exception {
try (final Stream<JsonNode> results = database.query(SHOW_SCHEMAS)) {
try (final Stream<JsonNode> results = database.unsafeQuery(SHOW_SCHEMAS)) {
return results.map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void createTableQuery() {
@Test
void isSchemaExists() throws Exception {
snowflakeSqlOperations.isSchemaExists(db, SCHEMA_NAME);
verify(db, times(1)).query(anyString());
verify(db, times(1)).unsafeQuery(anyString());
}

@Test
Expand Down
Loading