Skip to content

Commit 2cc42df

Browse files
committed
code cleanup
1 parent 246072e commit 2cc42df

File tree

3 files changed

+54
-20
lines changed

3 files changed

+54
-20
lines changed

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSail.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,13 +229,15 @@ public void shutDown() throws SailException {
229229

230230
if (con instanceof AbstractSailConnection) {
231231
AbstractSailConnection sailCon = (AbstractSailConnection) con;
232-
if (sailCon.getOwner() != Thread.currentThread()) {
233-
sailCon.getOwner().interrupt();
234-
sailCon.getOwner().join(1000);
235-
if (sailCon.getOwner().isAlive()) {
232+
Thread owner = sailCon.getOwner();
233+
if (owner != Thread.currentThread()) {
234+
owner.interrupt();
235+
// wait up to 1 second for the owner thread to die
236+
owner.join(1000);
237+
if (owner.isAlive()) {
236238
logger.error(
237239
"Closing active connection due to shut down and interrupted the owning thread of the connection {} but thread is still alive after 1000 ms!",
238-
sailCon.getOwner());
240+
owner);
239241
}
240242
}
241243
}

core/sail/api/src/main/java/org/eclipse/rdf4j/sail/helpers/AbstractSailConnection.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,12 @@ public final void close() throws SailException {
265265
if (sumDone == sumBlocking) {
266266
break;
267267
} else {
268-
LockSupport.parkNanos(Duration.ofMillis(10).toNanos());
268+
if (Thread.currentThread().isInterrupted()) {
269+
throw new SailException(
270+
"Connection was interrupted while waiting on active operations before it could be closed.");
271+
} else {
272+
LockSupport.parkNanos(Duration.ofMillis(10).toNanos());
273+
}
269274
}
270275
}
271276

@@ -317,26 +322,35 @@ private Thread startDeadlockPreventionThread() {
317322
Thread deadlockPreventionThread = null;
318323

319324
if (Thread.currentThread() != owner) {
325+
320326
if (logger.isInfoEnabled()) {
327+
// use info level for this because FedX prevalently closes connections from different threads
321328
logger.info(
322-
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by "
323-
+ owner + " closed by " + Thread.currentThread(),
324-
new Throwable("Throwable used for stacktrace"));
329+
"Closing connection from a different thread than the one that opened it. Connections should not be shared between threads. Opened by {} closed by {}",
330+
owner, Thread.currentThread(), new Throwable("Throwable used for stacktrace"));
325331
}
332+
326333
deadlockPreventionThread = new Thread(() -> {
327334
try {
335+
// This thread should sleep for a while so that the callee has a chance to finish.
336+
// The callee will interrupt this thread when it is finished, which means that there were no
337+
// deadlocks and we can exit.
328338
Thread.sleep(sailBase.connectionTimeOut / 2);
329339

330340
owner.interrupt();
341+
// wait for up to 1 second for the owner thread to die
331342
owner.join(1000);
332343
if (owner.isAlive()) {
333344
logger.error("Interrupted thread {} but thread is still alive after 1000 ms!", owner);
334345
}
335346

336347
} catch (InterruptedException ignored) {
348+
// this thread is interrupted as a signal that there were no deadlocks, so the exception can be
349+
// ignored and we can simply exit
337350
}
338351

339352
});
353+
340354
deadlockPreventionThread.setDaemon(true);
341355
deadlockPreventionThread.start();
342356

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,15 @@
1212

1313
import java.io.File;
1414
import java.io.IOException;
15-
import java.time.Duration;
1615
import java.util.ArrayList;
1716
import java.util.List;
1817
import java.util.Map;
1918
import java.util.Map.Entry;
2019
import java.util.Objects;
2120
import java.util.concurrent.ExecutorService;
2221
import java.util.concurrent.Executors;
22+
import java.util.concurrent.TimeUnit;
2323
import java.util.concurrent.atomic.AtomicBoolean;
24-
import java.util.concurrent.locks.LockSupport;
2524
import java.util.concurrent.locks.ReentrantLock;
2625

2726
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
@@ -235,13 +234,21 @@ public void close() throws SailException {
235234
}
236235
} finally {
237236
if (tripleStore != null) {
238-
running.set(false);
239-
tripleStoreExecutor.shutdown();
240-
while (!tripleStoreExecutor.isTerminated()) {
241-
tripleStoreExecutor.shutdownNow();
242-
LockSupport.parkNanos(Duration.ofMillis(100).toNanos());
237+
try {
238+
running.set(false);
239+
tripleStoreExecutor.shutdown();
240+
try {
241+
while (!tripleStoreExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
242+
logger.warn("Waiting for triple store executor to terminate");
243+
}
244+
} catch (InterruptedException e) {
245+
Thread.currentThread().interrupt();
246+
throw new SailException(e);
247+
}
248+
} finally {
249+
tripleStore.close();
243250
}
244-
tripleStore.close();
251+
245252
}
246253
}
247254
}
@@ -564,17 +571,28 @@ private void startTransaction(boolean preferThreading) throws SailException {
564571
op.execute();
565572
}
566573
} else {
567-
if (Thread.interrupted()) {
574+
if (!running.get()) {
575+
logger.warn(
576+
"LmdbSailStore was closed while active transaction was waiting for the next operation. Forcing a rollback!");
577+
opQueue.add(ROLLBACK_TRANSACTION);
578+
} else if (Thread.interrupted()) {
568579
throw new InterruptedException();
580+
} else {
581+
Thread.yield();
569582
}
570-
Thread.yield();
571583
}
572584
}
573585

574586
// keep thread running for at least 2ms to lock-free wait for the next
575587
// transaction
576-
long start = System.currentTimeMillis();
588+
long start = 0;
577589
while (running.get() && !nextTransactionAsync) {
590+
if (start == 0) {
591+
// System.currentTimeMillis() is expensive, so only call it when we
592+
// are sure we need to wait
593+
start = System.currentTimeMillis();
594+
}
595+
578596
if (System.currentTimeMillis() - start > 2) {
579597
synchronized (storeTxnStarted) {
580598
if (!nextTransactionAsync) {

0 commit comments

Comments
 (0)