Skip to content

Commit 126ce36

Browse files
🐛 Fixed connection leak in StreamingJdbcDatabase (#20888)
* Fixed connection leak in StreamingJdbcDatabase * fixed checkstyle
1 parent 0548d5f commit 126ce36

File tree

2 files changed

+13
-4
lines changed

2 files changed

+13
-4
lines changed

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
public abstract class JdbcDatabase extends SqlDatabase {
3030

3131
protected final JdbcCompatibleSourceOperations<?> sourceOperations;
32+
protected Exception streamException;
33+
protected boolean isStreamFailed;
3234

3335
public JdbcDatabase(final JdbcCompatibleSourceOperations<?> sourceOperations) {
3436
this.sourceOperations = sourceOperations;

airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/StreamingJdbcDatabase.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
7171
try {
7272
connection.setAutoCommit(true);
7373
connection.close();
74+
if (isStreamFailed) {
75+
throw new RuntimeException(streamException);
76+
}
7477
} catch (final SQLException e) {
7578
throw new RuntimeException(e);
7679
}
@@ -84,9 +87,9 @@ public <T> Stream<T> unsafeQuery(final CheckedFunction<Connection, PreparedState
8487
* This method differs from {@link DefaultJdbcDatabase#toUnsafeStream} in that it takes a streaming
8588
* config that adjusts the fetch size dynamically according to sampled row size.
8689
*/
87-
protected static <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
88-
final CheckedFunction<ResultSet, T, SQLException> mapper,
89-
final JdbcStreamingQueryConfig streamingConfig) {
90+
protected <T> Stream<T> toUnsafeStream(final ResultSet resultSet,
91+
final CheckedFunction<ResultSet, T, SQLException> mapper,
92+
final JdbcStreamingQueryConfig streamingConfig) {
9093
return StreamSupport.stream(new Spliterators.AbstractSpliterator<>(Long.MAX_VALUE, Spliterator.ORDERED) {
9194

9295
@Override
@@ -102,7 +105,11 @@ public boolean tryAdvance(final Consumer<? super T> action) {
102105
return true;
103106
} catch (final SQLException e) {
104107
LOGGER.error("SQLState: {}, Message: {}", e.getSQLState(), e.getMessage());
105-
throw new RuntimeException(e);
108+
streamException = e;
109+
isStreamFailed = true;
110+
// throwing an exception in tryAdvance() method lead to the endless loop in Spliterator and stream
111+
// will never close
112+
return false;
106113
}
107114
}
108115

0 commit comments

Comments
 (0)