Skip to content

Commit 606fa39

Browse files
authored
Fix (non-)ordering of worker threads in Vert.x (#96)
1 parent f500786 commit 606fa39

File tree

2 files changed

+114
-2
lines changed

2 files changed

+114
-2
lines changed

src/main/java/com/github/susom/database/DatabaseProviderVertx.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ public <T> void transactAsync(final DbCodeTyped<T> code, Handler<AsyncResult<T>>
214214
} catch (Throwable t) {
215215
promise.fail(t);
216216
}
217-
}, resultHandler);
217+
}, false, resultHandler);
218218
}
219219

220220
/**
@@ -289,7 +289,7 @@ public <T> void transactAsync(final DbCodeTypedTx<T> code, Handler<AsyncResult<T
289289
} catch (Throwable t) {
290290
promise.fail(t);
291291
}
292-
}, resultHandler);
292+
}, false, resultHandler);
293293
}
294294

295295
/**
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.github.susom.database.test;
2+
3+
import static java.lang.Thread.sleep;
4+
import static org.junit.Assert.assertEquals;
5+
6+
import com.github.susom.database.Config;
7+
import com.github.susom.database.ConfigFrom;
8+
import com.github.susom.database.DatabaseProviderVertx;
9+
import com.github.susom.database.DatabaseProviderVertx.Builder;
10+
import io.vertx.core.Vertx;
11+
import io.vertx.ext.unit.Async;
12+
import io.vertx.ext.unit.TestContext;
13+
import io.vertx.ext.unit.junit.VertxUnitRunner;
14+
import java.io.File;
15+
import java.util.ArrayList;
16+
import java.util.List;
17+
import org.junit.Test;
18+
import org.junit.runner.RunWith;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
/**
23+
* Verify some asynchronous/blocking behavior of queries in the VertxProvider.
24+
*/
25+
@RunWith(VertxUnitRunner.class)
26+
public class VertxProviderTest {
27+
private final Logger log = LoggerFactory.getLogger(VertxProviderTest.class);
28+
29+
static {
30+
// We will put all Derby related files inside ./build to keep our working copy clean
31+
File directory = new File("target").getAbsoluteFile();
32+
if (directory.exists() || directory.mkdirs()) {
33+
System.setProperty("derby.stream.error.file", new File(directory, "derby.log").getAbsolutePath());
34+
}
35+
}
36+
37+
@Test
38+
public void testSlowOperationBlocking(TestContext context) {
39+
Async async = context.async();
40+
41+
Vertx vertx = Vertx.vertx();
42+
43+
// Set pool size to 1 so we can test blocking behavior: the
44+
// first request will block, and the rest will queue up
45+
Config config = ConfigFrom.firstOf()
46+
.value("database.url", "jdbc:derby:target/testdb;create=true")
47+
.value("database.pool.size", "1").get();
48+
Builder dbb = DatabaseProviderVertx.pooledBuilder(vertx, config);
49+
50+
List<Integer> results = runQueries(vertx, dbb, async);
51+
52+
async.awaitSuccess();
53+
assertEquals(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19), results);
54+
vertx.close();
55+
dbb.close();
56+
}
57+
58+
@Test
59+
public void testSlowOperationNonBlocking(TestContext context) {
60+
Async async = context.async();
61+
62+
Vertx vertx = Vertx.vertx();
63+
64+
// Set pool size to 2 so we can test blocking behavior: the
65+
// first request will block, and the rest will execute in order
66+
// while it is blocked
67+
Config config = ConfigFrom.firstOf()
68+
.value("database.url", "jdbc:derby:target/testdb;create=true")
69+
.value("database.pool.size", "2").get();
70+
Builder dbb = DatabaseProviderVertx.pooledBuilder(vertx, config);
71+
72+
List<Integer> results = runQueries(vertx, dbb, async);
73+
74+
async.awaitSuccess();
75+
assertEquals(List.of(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 1), results);
76+
vertx.close();
77+
dbb.close();
78+
}
79+
80+
private List<Integer> runQueries(Vertx vertx, Builder dbb, Async async) {
81+
List<Integer> results = new ArrayList<>();
82+
vertx.runOnContext(v -> {
83+
log.info("Running on the event loop thread");
84+
dbb.transactAsync(db -> {
85+
log.info("Sleeping for 5s");
86+
sleep(5000);
87+
log.info("Done sleeping");
88+
return db.get().toSelect("values (1)").queryIntegerOrZero();
89+
}, ar -> {
90+
log.info("Completed query: " + ar.result());
91+
results.add(ar.result());
92+
if (results.size() == 19) {
93+
async.complete();
94+
}
95+
});
96+
97+
for (int i = 2; i < 20; i++) {
98+
int q = i;
99+
dbb.transactAsync(db ->
100+
db.get().toSelect("values (" + q + ")").queryIntegerOrZero()
101+
, ar -> {
102+
log.info("Completed query: " + ar.result());
103+
results.add(ar.result());
104+
if (results.size() == 19) {
105+
async.complete();
106+
}
107+
});
108+
}
109+
});
110+
return results;
111+
}
112+
}

0 commit comments

Comments
 (0)