Skip to content

Commit 475aa5d

Browse files
committed
code cleanup
1 parent 2cc42df commit 475aa5d

File tree

2 files changed

+139
-18
lines changed

2 files changed

+139
-18
lines changed

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ private void startTransaction(boolean preferThreading) throws SailException {
574574
if (!running.get()) {
575575
logger.warn(
576576
"LmdbSailStore was closed while active transaction was waiting for the next operation. Forcing a rollback!");
577-
opQueue.add(ROLLBACK_TRANSACTION);
577+
rollback();
578578
} else if (Thread.interrupted()) {
579579
throw new InterruptedException();
580580
} else {

testsuites/sail/src/main/java/org/eclipse/rdf4j/testsuite/sail/SailConcurrencyTest.java

Lines changed: 138 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
*******************************************************************************/
1111
package org.eclipse.rdf4j.testsuite.sail;
1212

13+
import static org.assertj.core.api.Assertions.assertThat;
1314
import static org.junit.jupiter.api.Assertions.assertEquals;
1415
import static org.junit.jupiter.api.Assertions.assertNotNull;
1516

@@ -323,15 +324,15 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException {
323324
}
324325
}
325326

326-
CountDownLatch countDownLatch = new CountDownLatch(1);
327-
Thread thread = new Thread(() -> {
327+
CountDownLatch countDownLatch1 = new CountDownLatch(1);
328+
Thread thread1 = new Thread(() -> {
328329
SailConnection connection = store.getConnection();
329-
countDownLatch.countDown();
330+
countDownLatch1.countDown();
330331
connection.begin(IsolationLevels.NONE);
331332
connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY);
332333
});
333-
thread.setName("Thread 1");
334-
thread.start();
334+
thread1.setName("Thread 1");
335+
thread1.start();
335336

336337
CountDownLatch countDownLatch2 = new CountDownLatch(1);
337338
Thread thread2 = new Thread(() -> {
@@ -344,15 +345,68 @@ public void testConcurrentConnectionsShutdown() throws InterruptedException {
344345
thread2.setName("Thread 2");
345346
thread2.start();
346347

347-
countDownLatch.await();
348+
countDownLatch1.await();
348349
countDownLatch2.await();
349350

350-
Thread.sleep(1000);
351+
while (thread1.isAlive() && thread2.isAlive()) {
352+
Thread.yield();
353+
}
351354

352355
store.shutDown();
353356

354357
}
355358

359+
@Test
360+
public void testConcurrentConnectionsShutdownReadCommitted() throws InterruptedException {
361+
if (store instanceof AbstractSail) {
362+
((AbstractSail) store).setConnectionTimeOut(200);
363+
} else if (store instanceof SailWrapper) {
364+
Sail baseSail = ((SailWrapper) store).getBaseSail();
365+
if (baseSail instanceof AbstractSail) {
366+
((AbstractSail) baseSail).setConnectionTimeOut(200);
367+
}
368+
}
369+
370+
CountDownLatch countDownLatch1 = new CountDownLatch(1);
371+
Thread thread1 = new Thread(() -> {
372+
SailConnection connection = store.getConnection();
373+
countDownLatch1.countDown();
374+
connection.begin(IsolationLevels.READ_COMMITTED);
375+
connection.addStatement(RDF.FIRST, RDF.TYPE, RDF.PROPERTY);
376+
});
377+
thread1.setName("Thread 1");
378+
thread1.start();
379+
380+
CountDownLatch countDownLatch2 = new CountDownLatch(1);
381+
Thread thread2 = new Thread(() -> {
382+
SailConnection connection = store.getConnection();
383+
countDownLatch2.countDown();
384+
connection.begin(IsolationLevels.READ_COMMITTED);
385+
connection.addStatement(RDF.REST, RDF.TYPE, RDF.PROPERTY);
386+
387+
});
388+
thread2.setName("Thread 2");
389+
thread2.start();
390+
391+
countDownLatch1.await();
392+
countDownLatch2.await();
393+
394+
while (thread1.isAlive() && thread2.isAlive()) {
395+
Thread.yield();
396+
}
397+
store.shutDown();
398+
399+
store.init();
400+
401+
try (SailConnection connection = store.getConnection()) {
402+
connection.begin();
403+
long size = connection.size();
404+
assertEquals(0, size);
405+
connection.commit();
406+
}
407+
408+
}
409+
356410
@Test
357411
public void testConcurrentConnectionsShutdownAndClose() throws InterruptedException {
358412
if (store instanceof AbstractSail) {
@@ -368,15 +422,15 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
368422
AtomicReference<SailConnection> connection1 = new AtomicReference<>();
369423
AtomicReference<SailConnection> connection2 = new AtomicReference<>();
370424

371-
CountDownLatch countDownLatch = new CountDownLatch(1);
372-
Thread thread = new Thread(() -> {
425+
CountDownLatch countDownLatch1 = new CountDownLatch(1);
426+
Thread thread1 = new Thread(() -> {
373427
connection1.set(store.getConnection());
374-
countDownLatch.countDown();
428+
countDownLatch1.countDown();
375429
connection1.get().begin(IsolationLevels.NONE);
376430
connection1.get().clear();
377431
});
378-
thread.setName("Thread 1");
379-
thread.start();
432+
thread1.setName("Thread 1");
433+
thread1.start();
380434

381435
CountDownLatch countDownLatch2 = new CountDownLatch(1);
382436
Thread thread2 = new Thread(() -> {
@@ -389,16 +443,76 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
389443
thread2.setName("Thread 2");
390444
thread2.start();
391445

392-
countDownLatch.await();
446+
countDownLatch1.await();
393447
countDownLatch2.await();
394448

395-
Thread.sleep(1000);
449+
while (thread1.isAlive() && thread2.isAlive()) {
450+
Thread.yield();
451+
}
396452

397-
Thread thread3 = new Thread(() -> {
453+
try {
454+
if (thread2.isAlive()) {
455+
connection2.get().close();
456+
connection1.get().close();
457+
} else {
458+
connection1.get().close();
459+
connection2.get().close();
460+
}
461+
} catch (SailException ignored) {
462+
}
463+
464+
try (SailConnection connection = store.getConnection()) {
465+
connection.begin();
466+
long size = connection.size();
467+
connection.commit();
468+
assertThat(size).isLessThanOrEqualTo(1);
469+
}
470+
471+
store.shutDown();
472+
}
473+
474+
@Test
475+
public void testConcurrentConnectionsShutdownAndCloseRollback() throws InterruptedException {
476+
if (store instanceof AbstractSail) {
477+
((AbstractSail) store).setConnectionTimeOut(200);
478+
}
479+
480+
try (SailConnection connection = store.getConnection()) {
481+
connection.begin();
482+
connection.addStatement(RDF.TYPE, RDF.TYPE, RDF.PROPERTY);
483+
connection.commit();
484+
}
485+
486+
AtomicReference<SailConnection> connection1 = new AtomicReference<>();
487+
AtomicReference<SailConnection> connection2 = new AtomicReference<>();
398488

489+
CountDownLatch countDownLatch1 = new CountDownLatch(1);
490+
Thread thread1 = new Thread(() -> {
491+
connection1.set(store.getConnection());
492+
countDownLatch1.countDown();
493+
connection1.get().begin(IsolationLevels.READ_UNCOMMITTED);
494+
connection1.get().clear();
399495
});
400-
thread3.setName("Thread 3");
401-
thread3.start();
496+
thread1.setName("Thread 1");
497+
thread1.start();
498+
499+
CountDownLatch countDownLatch2 = new CountDownLatch(1);
500+
Thread thread2 = new Thread(() -> {
501+
connection2.set(store.getConnection());
502+
countDownLatch2.countDown();
503+
connection2.get().begin(IsolationLevels.READ_UNCOMMITTED);
504+
connection2.get().clear();
505+
506+
});
507+
thread2.setName("Thread 2");
508+
thread2.start();
509+
510+
countDownLatch1.await();
511+
countDownLatch2.await();
512+
513+
while (thread1.isAlive() && thread2.isAlive()) {
514+
Thread.yield();
515+
}
402516

403517
try {
404518
if (thread2.isAlive()) {
@@ -411,6 +525,13 @@ public void testConcurrentConnectionsShutdownAndClose() throws InterruptedExcept
411525
} catch (SailException ignored) {
412526
}
413527

528+
try (SailConnection connection = store.getConnection()) {
529+
connection.begin();
530+
long size = connection.size();
531+
connection.commit();
532+
assertThat(size).isEqualTo(1);
533+
}
534+
414535
store.shutDown();
415536
}
416537

0 commit comments

Comments
 (0)