diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/SqlDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/SqlDatabase.java index 3c1d15e560954..9265ffeda0262 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/SqlDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/SqlDatabase.java @@ -11,6 +11,6 @@ public abstract class SqlDatabase extends AbstractDatabase { public abstract void execute(String sql) throws Exception; - public abstract Stream query(String sql, String... params) throws Exception; + public abstract Stream unsafeQuery(String sql, String... params) throws Exception; } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java index a6179564b4003..2726458489b3b 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java @@ -91,7 +91,7 @@ public Stream query(final String sql, final QueryParameterValue... par } @Override - public Stream query(final String sql, final String... params) throws Exception { + public Stream unsafeQuery(final String sql, final String... params) throws Exception { final List parameterValueList; if (params == null) parameterValueList = Collections.emptyList(); diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index b1280568d5f46..8ccf589717700 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -51,18 +51,18 @@ public List bufferedResultSetQuery(final CheckedFunction recordTransform) throws SQLException { try (final Connection connection = dataSource.getConnection(); - final Stream results = toStream(query.apply(connection), recordTransform)) { + final Stream results = toUnsafeStream(query.apply(connection), recordTransform)) { return results.collect(Collectors.toList()); } } @Override @MustBeClosed - public Stream resultSetQuery(final CheckedFunction query, - final CheckedFunction recordTransform) + public Stream unsafeResultSetQuery(final CheckedFunction query, + final CheckedFunction recordTransform) throws SQLException { final Connection connection = dataSource.getConnection(); - return toStream(query.apply(connection), recordTransform) + return toUnsafeStream(query.apply(connection), recordTransform) .onClose(() -> { try { connection.close(); @@ -96,11 +96,11 @@ public DatabaseMetaData getMetaData() throws SQLException { */ @Override @MustBeClosed - public Stream query(final CheckedFunction statementCreator, - final CheckedFunction recordTransform) + public Stream unsafeQuery(final CheckedFunction statementCreator, + final CheckedFunction recordTransform) throws SQLException { final Connection connection = dataSource.getConnection(); - return toStream(statementCreator.apply(connection).executeQuery(), recordTransform) + return toUnsafeStream(statementCreator.apply(connection).executeQuery(), recordTransform) .onClose(() -> { try { LOGGER.info("closing connection"); diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java index 0dd4f183c0b6b..000023122d2e9 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java @@ -59,7 +59,8 @@ public void executeWithinTransaction(final List queries) throws SQLExcep } /** - * Map records returned in a result set. + * Map records returned in a result set. It is an "unsafe" stream because the stream must be + * manually closed. Otherwise, there will be a database connection leak. * * @param resultSet the result set * @param mapper function to make each record of the result set @@ -67,7 +68,7 @@ public void executeWithinTransaction(final List queries) throws SQLExcep * @return stream of records that the result set is mapped to. */ @MustBeClosed - protected static Stream toStream(final ResultSet resultSet, final CheckedFunction mapper) { + protected static Stream toUnsafeStream(final ResultSet resultSet, final CheckedFunction mapper) { return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) { @Override @@ -108,8 +109,8 @@ public abstract List bufferedResultSetQuery(CheckedFunction List bufferedResultSetQuery(CheckedFunction Stream resultSetQuery(CheckedFunction query, - CheckedFunction recordTransform) + public abstract Stream unsafeResultSetQuery(CheckedFunction query, + CheckedFunction recordTransform) throws SQLException; /** * Use a connection to create a {@link PreparedStatement} and map it into a stream. You CANNOT * assume that data will be returned from this method before the entire {@link ResultSet} is * buffered in memory. Review the implementation of the database's JDBC driver or use the - * StreamingJdbcDriver if you need this guarantee. The caller should close the returned stream to - * release the database connection. + * StreamingJdbcDriver if you need this guarantee. It is "unsafe" because the caller should close + * the returned stream to release the database connection. Otherwise, there will be a connection + * leak. * * @param statementCreator create a {@link PreparedStatement} from a {@link Connection}. * @param recordTransform transform each record of that result set into the desired type. do NOT @@ -140,12 +142,12 @@ public abstract Stream resultSetQuery(CheckedFunction Stream query(CheckedFunction statementCreator, - CheckedFunction recordTransform) + public abstract Stream unsafeQuery(CheckedFunction statementCreator, + CheckedFunction recordTransform) throws SQLException; public int queryInt(final String sql, final String... params) throws SQLException { - try (final Stream q = query(c -> { + try (final Stream q = unsafeQuery(c -> { PreparedStatement statement = c.prepareStatement(sql); int i = 1; for (String param : params) { @@ -159,10 +161,14 @@ public int queryInt(final String sql, final String... params) throws SQLExceptio } } + /** + * It is "unsafe" because the caller must manually close the returned stream. Otherwise, there will + * be a database connection leak. + */ @MustBeClosed @Override - public Stream query(final String sql, final String... params) throws SQLException { - return query(connection -> { + public Stream unsafeQuery(final String sql, final String... params) throws SQLException { + return unsafeQuery(connection -> { final PreparedStatement statement = connection.prepareStatement(sql); int i = 1; for (final String param : params) { @@ -174,7 +180,7 @@ public Stream query(final String sql, final String... params) throws S } public ResultSetMetaData queryMetadata(final String sql, final String... params) throws SQLException { - try (final Stream q = query(c -> { + try (final Stream q = unsafeQuery(c -> { PreparedStatement statement = c.prepareStatement(sql); int i = 1; for (String param : params) { diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java index 90df64f6e716f..338fec362d830 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java @@ -48,15 +48,15 @@ public StreamingJdbcDatabase(final DataSource dataSource, */ @Override @MustBeClosed - public Stream query(final CheckedFunction statementCreator, - final CheckedFunction recordTransform) + public Stream unsafeQuery(final CheckedFunction statementCreator, + final CheckedFunction recordTransform) throws SQLException { try { final Connection connection = dataSource.getConnection(); final PreparedStatement ps = statementCreator.apply(connection); // allow configuration of connection and prepared statement to make streaming possible. jdbcStreamingQueryConfiguration.accept(connection, ps); - return toStream(ps.executeQuery(), recordTransform) + return toUnsafeStream(ps.executeQuery(), recordTransform) .onClose(() -> { try { connection.setAutoCommit(true); diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java index d6c318a11b45b..43448e3d44d04 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java @@ -81,7 +81,7 @@ void testBufferedResultQuery() throws SQLException { @Test void testResultSetQuery() throws SQLException { - final Stream actual = database.resultSetQuery( + final Stream actual = database.unsafeResultSetQuery( connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"), sourceOperations::rowToJson); final List actualAsList = actual.collect(Collectors.toList()); @@ -92,7 +92,7 @@ void testResultSetQuery() throws SQLException { @Test void testQuery() throws SQLException { - final Stream actual = database.query( + final Stream actual = database.unsafeQuery( connection -> connection.prepareStatement("SELECT * FROM id_and_name;"), sourceOperations::rowToJson); diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java index 932636c196517..001bfa1d3a969 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java @@ -106,7 +106,7 @@ void testRowToJson() throws SQLException { void testToStream() throws SQLException { try (final Connection connection = dataSource.getConnection()) { final ResultSet rs = connection.createStatement().executeQuery("SELECT * FROM id_and_name;"); - final List actual = JdbcDatabase.toStream(rs, sourceOperations::rowToJson).collect(Collectors.toList()); + final List actual = JdbcDatabase.toUnsafeStream(rs, sourceOperations::rowToJson).collect(Collectors.toList()); assertEquals(RECORDS_AS_JSON, actual); } } diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java index ab0f507655776..f1b60896081bd 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java @@ -93,7 +93,7 @@ void testQuery() throws SQLException { // invoked. final AtomicReference connection1 = new AtomicReference<>(); final AtomicReference ps1 = new AtomicReference<>(); - final Stream actual = streamingJdbcDatabase.query( + final Stream actual = streamingJdbcDatabase.unsafeQuery( connection -> { connection1.set(connection); final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java index f59bfdcaaf5bf..d423eb2f9a6db 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java @@ -110,7 +110,7 @@ protected List retrieveRecords(TestDestinationEnv testEnv, private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - return jdbcDB.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java index cf25f5211d54c..d897950a23edd 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java @@ -101,7 +101,7 @@ protected List retrieveRecords(TestDestinationEnv testEnv, private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - return jdbcDB.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java index ed50b11027f19..dd6b640fdfb86 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java @@ -103,7 +103,7 @@ private List retrieveRecordsFromTable(final String tableName, final St ClickhouseDestination.HOST_KEY, ClickhouseDestination.PORT_KEY, (CheckedFunction, Exception>) mangledConfig -> getDatabase(mangledConfig) - .query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, + .unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .collect(Collectors.toList())); } diff --git a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java index 0e181b2d118a2..4460ca17e0851 100644 --- a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java +++ b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java @@ -103,7 +103,7 @@ VersionCompatibility isCompatibleVersion(final JdbcDatabase database) throws SQL } private Semver getVersion(final JdbcDatabase database) throws SQLException { - final List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SELECT version()"), + final List value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SELECT version()"), resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); Matcher matcher = VERSION_PATTERN.matcher(value.get(0)); if (matcher.find()) { @@ -123,7 +123,7 @@ void verifyLocalFileEnabled(final JdbcDatabase database) throws SQLException { private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException { final List value = - database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); return value.get(0).equalsIgnoreCase("on"); diff --git a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java index cdf4a94498fb3..6b37b1e5e93ab 100644 --- a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java @@ -82,7 +82,7 @@ protected List retrieveRecords(TestDestinationEnv testEnv, private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { JdbcDatabase database = getDatabase(getConfig()); - return database.query(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, + return database.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java index 250ad94c33f09..181fb8d8bf942 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java @@ -100,7 +100,7 @@ private void tryEnableLocalFile(final JdbcDatabase database) throws SQLException } private double getVersion(final JdbcDatabase database) throws SQLException { - final List value = database.resultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), + final List value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); return Double.parseDouble(value.get(0).substring(0, 3)); } @@ -117,7 +117,7 @@ public boolean isSchemaRequired() { private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException { final List value = - database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); return value.get(0).equalsIgnoreCase("on"); diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java index d8cdc1a31b716..b360964abcc4f 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java @@ -183,7 +183,7 @@ public void testEncryption() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(), equals("Oracle Advanced Security: " + algorithm + " encryption")); @@ -208,7 +208,7 @@ public void testCheckProtocol() throws SQLException { + algorithm + " )")); final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java index 4b8f108868b8c..8a65723e2a0c0 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java @@ -42,7 +42,7 @@ public void testEncryption() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).toList(); + final List collect = database.unsafeQuery(network_service_banner).toList(); assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(), equals("Oracle Advanced Security: " + algorithm + " encryption")); @@ -74,7 +74,7 @@ public void testCheckProtocol() throws SQLException { getAdditionalProperties(algorithm)); final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java index f072506204abe..f97bcaaec706d 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java @@ -175,7 +175,7 @@ public void testNoneEncryption() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText() .contains("Oracle Advanced Security: encryption")); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java index c8c144cceb921..fbb548025bdc9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java @@ -112,7 +112,7 @@ private boolean checkStageObjectExists(final JdbcDatabase database, final String final String query = getListQuery(stageName, stagingPath, filename); LOGGER.debug("Executing query: {}", query); final boolean result; - try (final Stream stream = database.query(query)) { + try (final Stream stream = database.unsafeQuery(query)) { result = stream.findAny().isPresent(); } return result; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index 9af2534e1d80f..ef4be90e93df1 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -36,7 +36,7 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN @Override public boolean isSchemaExists(final JdbcDatabase database, final String outputSchema) throws Exception { - try (final Stream results = database.query(SHOW_SCHEMAS)) { + try (final Stream results = database.unsafeQuery(SHOW_SCHEMAS)) { return results.map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java index b4de44c0aa0e9..bd0674bdbddf5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java @@ -44,7 +44,7 @@ void createTableQuery() { @Test void isSchemaExists() throws Exception { snowflakeSqlOperations.isSchemaExists(db, SCHEMA_NAME); - verify(db, times(1)).query(anyString()); + verify(db, times(1)).unsafeQuery(anyString()); } @Test diff --git a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java index 5cd626a81ec9b..aa255ccace20d 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java @@ -50,7 +50,7 @@ protected Map> discoverPrimaryKeys(final JdbcDatabase datab .getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), tableInfo -> { try { - return database.resultSetQuery(connection -> { + return database.unsafeResultSetQuery(connection -> { final String sql = "SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1"; final PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(1, tableInfo.getNameSpace()); diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index a7e8c9b33247a..7e6d63de5878a 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -101,7 +101,7 @@ public AutoCloseableIterator read(final JsonNode config, @Override public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { return database - .query(getPrivileges(database), sourceOperations::rowToJson) + .unsafeQuery(getPrivileges(database), sourceOperations::rowToJson) .map(this::getPrivilegeDto) .collect(Collectors.toSet()); } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java index 0aa9572cb5a25..e3037d0b48979 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcDatabase.java @@ -54,21 +54,21 @@ public List bufferedResultSetQuery(final CheckedFunction Stream resultSetQuery(final CheckedFunction query, - final CheckedFunction recordTransform) + public Stream unsafeResultSetQuery(final CheckedFunction query, + final CheckedFunction recordTransform) throws SQLException { - return database.resultSetQuery(query, recordTransform); + return database.unsafeResultSetQuery(query, recordTransform); } @Override - public Stream query(final CheckedFunction statementCreator, - final CheckedFunction recordTransform) + public Stream unsafeQuery(final CheckedFunction statementCreator, + final CheckedFunction recordTransform) throws SQLException { - return database.query(statementCreator, recordTransform); + return database.unsafeQuery(statementCreator, recordTransform); } @Override - public Stream query(final String sql, final String... params) throws SQLException { + public Stream unsafeQuery(final String sql, final String... params) throws SQLException { return bufferedResultSetQuery(connection -> { final PreparedStatement statement = connection.prepareStatement(sql); int i = 1; diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java index c20433a3d1c99..8ec8eab83d0f5 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java @@ -90,7 +90,7 @@ public Set getExcludedInternalNameSpaces() { @Override public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { return database - .query(getPrivileges(), sourceOperations::rowToJson) + .unsafeQuery(getPrivileges(), sourceOperations::rowToJson) .map(this::getPrivilegeDto) .collect(Collectors.toSet()); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index c5f515e55cd4d..f85b4eebc0c55 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -262,7 +262,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { - final Stream stream = database.query( + final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index cbc62a3c2fe66..4fdd684a34890 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -94,7 +94,7 @@ public AutoCloseableIterator queryTableIncremental(JdbcDatabase databa LOGGER.info("Queueing query for table: {}", tableName); return AutoCloseableIterators.lazyIterator(() -> { try { - final Stream stream = database.query( + final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); @@ -244,7 +244,7 @@ public List> getCheckOperations(final J protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.query(connection -> { + final List queryResponse = database.unsafeQuery(connection -> { final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); @@ -267,7 +267,7 @@ protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase da protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.query(connection -> { + final List queryResponse = database.unsafeQuery(connection -> { final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; final PreparedStatement ps = connection.prepareStatement(sql); @@ -286,7 +286,7 @@ protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabas // todo: ensure this works for Azure managed SQL (since it uses different sql server agent) protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws SQLException { try { - final List queryResponse = database.query(connection -> { + final List queryResponse = database.unsafeQuery(connection -> { final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'"; final PreparedStatement ps = connection.prepareStatement(sql); LOGGER.info(String @@ -312,7 +312,7 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.query(connection -> { + final List queryResponse = database.unsafeQuery(connection -> { final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java index e4986b575dcf7..de02f827e0e6c 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java @@ -47,7 +47,7 @@ public String toString() { public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database) { try { - final List masterStatus = database.resultSetQuery( + final List masterStatus = database.unsafeResultSetQuery( connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), resultSet -> { final String file = resultSet.getString("File"); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java index e20c2fc5f7902..b2df3e9122591 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -75,7 +75,7 @@ public static List> getCheckOperations( private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) { try { - List binlogs = database.resultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"), + List binlogs = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"), resultSet -> resultSet.getString("Log_name")).collect(Collectors.toList()); return !binlog.isEmpty() && binlogs.stream().anyMatch(e -> e.equals(binlog)); @@ -97,7 +97,7 @@ private static Optional getBinlog(JsonNode offset) { private static CheckedConsumer getCheckOperation(String name, String value) { return database -> { - final List result = database.resultSetQuery(connection -> { + final List result = database.unsafeResultSetQuery(connection -> { final String sql = """ show variables where Variable_name = '%s'""".formatted(name); diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java index 0fb73078987d9..0705682671001 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java @@ -45,7 +45,7 @@ public void testEncrytion() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText() .contains(algorithm + " Encryption")); @@ -74,7 +74,7 @@ public void testCheckProtocol() throws SQLException { + algorithm + " )")); final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java index 851558d5cd491..e67716e455044 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java @@ -45,7 +45,7 @@ public void testEncrytion() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText() .contains(algorithm + " Encryption")); @@ -64,7 +64,7 @@ public void testNoneEncrytion() throws SQLException { final String network_service_banner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText() .contains("Encryption service")); @@ -93,7 +93,7 @@ public void testCheckProtocol() throws SQLException { + algorithm + " )")); final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.query(network_service_banner).collect(Collectors.toList()); + final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 32e979794a5f8..71826a0237e99 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -155,7 +155,7 @@ public List> getCheckOperations(final J if (isCdc(config)) { checkOperations.add(database -> { - final List matchingSlots = database.query(connection -> { + final List matchingSlots = database.unsafeQuery(connection -> { final String sql = "SELECT * FROM pg_replication_slots WHERE slot_name = ? AND plugin = ? AND database = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("replication_method").get("replication_slot").asText()); @@ -177,7 +177,7 @@ public List> getCheckOperations(final J }); checkOperations.add(database -> { - final List matchingPublications = database.query(connection -> { + final List matchingPublications = database.unsafeQuery(connection -> { final PreparedStatement ps = connection .prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); ps.setString(1, config.get("replication_method").get("publication").asText()); @@ -274,7 +274,7 @@ private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stre public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { - return database.query(connection -> { + return database.unsafeQuery(connection -> { final PreparedStatement ps = connection.prepareStatement( """ SELECT DISTINCT table_catalog, diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java index 9edab761d41b6..22c5d79f4de77 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractRelationalDbSource.java @@ -58,7 +58,7 @@ protected String getFullTableName(final String nameSpace, final String tableName protected AutoCloseableIterator queryTable(final Database database, final String sqlQuery) { return AutoCloseableIterators.lazyIterator(() -> { try { - final Stream stream = database.query(sqlQuery); + final Stream stream = database.unsafeQuery(sqlQuery); return AutoCloseableIterators.fromStream(stream); } catch (final Exception e) { throw new RuntimeException(e);