Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[incubator-kie-issues#1625] Connection leak in persistence jdbc common #3784

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
Expand Down Expand Up @@ -160,74 +160,33 @@ Optional<Record> findByBusinessKey(String processId, String processVersion, Stri
}
}

private static class CloseableWrapper implements Runnable {

private Deque<AutoCloseable> wrapped = new ArrayDeque<>();

public <T extends AutoCloseable> T nest(T c) {
wrapped.addFirst(c);
return c;
}

@Override
public void run() {
try {
close();
} catch (Exception ex) {
throw new RuntimeException("Error closing resources", ex);
}
}

public void close() throws Exception {
Exception exception = null;
for (AutoCloseable wrap : wrapped) {
try {
wrap.close();
} catch (Exception ex) {
if (exception != null) {
ex.addSuppressed(exception);
}
exception = ex;
}
}
if (exception != null) {
throw exception;
}
}
}

@Override
Stream<Record> findAllInternal(String processId, String processVersion) {
CloseableWrapper close = new CloseableWrapper();
try {
Connection connection = close.nest(dataSource.getConnection());
PreparedStatement statement = close.nest(connection.prepareStatement(sqlIncludingVersion(FIND_ALL, processVersion)));
try (Connection connection = dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(sqlIncludingVersion(FIND_ALL, processVersion))) {
statement.setString(1, processId);
if (processVersion != null) {
statement.setString(2, processVersion);
}
ResultSet resultSet = close.nest(statement.executeQuery());
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE, Spliterator.ORDERED) {
List<Record> record = new ArrayList<>();
Copy link
Contributor

@fjtirado fjtirado Dec 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, the point of returning a stream was to not go through the whole list returned by the JDBC driver, avoiding a performance issue if the list of process instances is huge (notice that in the implementation being replaced you do not have to build the whole list of records )
Cant we fix the leak in closeablewrapper (whatever is it, maybe onclose is not invoked always?) without changing this part? I think we are going back to former implementaiton that provokes performance issues when the list of process instances grow.

try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
record.add(from(resultSet));
}
}
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(Long.MAX_VALUE, Spliterator.ORDERED) {
int idx = 0;

@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
boolean hasNext = resultSet.next();
if (hasNext) {
action.accept(from(resultSet));
}
return hasNext;
} catch (SQLException e) {
throw uncheckedException(e, "Error finding all process instances, for processId %s", processId);
if (idx < record.size()) {
action.accept(record.get(idx++));
return true;
}
return false;
}
}, false).onClose(close);
}, false);
} catch (SQLException e) {
try {
close.close();
} catch (Exception ex) {
e.addSuppressed(ex);
}
throw uncheckedException(e, "Error finding all process instances, for processId %s", processId);
}
}
Expand Down
Loading