Skip to content

Commit d9eec8b

Browse files
authored
Ensure NatsConnection inboxDispatcher is started prior to publishing messages (#1109)
* Ensure NatsConnection inboxDispatcher is started prior to publishing messages Fixes #1065 If multiple treads call publish messages at the same time, some messages may not be published because the inboxDispater is not started yet. Change updates NatsConnection to ensure the inboxDispatcher is started prior to publishing messages. * Fix nats_server_path environment variable in README.md
1 parent ea126d6 commit d9eec8b

File tree

3 files changed

+50
-15
lines changed

3 files changed

+50
-15
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,7 @@ The java doc is located in `build/docs` and the example jar is in `build/libs`.
12211221
12221222
which will create a folder called `build/reports/jacoco` containing the file `index.html` you can open and use to browse the coverage. Keep in mind we have focused on library test coverage, not coverage for the examples.
12231223
1224-
Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_-_server_path`.
1224+
Many of the tests run nats-server on a custom port. If nats-server is in your path they should just work, but in cases where it is not, or an IDE running tests has issues with the path you can specify the nats-server location with the environment variable `nats_server_path`.
12251225
12261226
## TLS Certs
12271227

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ class NatsConnection implements Connection {
8181

8282
private final String mainInbox;
8383
private final AtomicReference<NatsDispatcher> inboxDispatcher;
84+
private final ReentrantLock inboxDispatcherLock;
8485
private Timer timer;
8586

8687
private final AtomicBoolean needPing;
@@ -147,6 +148,7 @@ class NatsConnection implements Connection {
147148

148149
this.serverInfo = new AtomicReference<>();
149150
this.inboxDispatcher = new AtomicReference<>();
151+
this.inboxDispatcherLock = new ReentrantLock();
150152
this.pongQueue = new ConcurrentLinkedDeque<>();
151153
this.draining = new AtomicReference<>();
152154
this.blockPublishForDrain = new AtomicBoolean();
@@ -1186,17 +1188,20 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
11861188
}
11871189

11881190
if (inboxDispatcher.get() == null) {
1189-
NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1190-
1191-
// Theoretically two threads could be here
1192-
// compareAndSet returns false if thread 2 set the dispatcher
1193-
// in between the time thread 1 did get above and tried to compareAndSet
1194-
// really thin edge condition - could have used a lock, but this is probably enough
1195-
if (inboxDispatcher.compareAndSet(null, d)) {
1196-
String id = this.nuid.next();
1197-
this.dispatchers.put(id, d);
1198-
d.start(id);
1199-
d.subscribe(this.mainInbox);
1191+
inboxDispatcherLock.lock();
1192+
try {
1193+
if (inboxDispatcher.get() == null) {
1194+
NatsDispatcher d = dispatcherFactory.createDispatcher(this, this::deliverReply);
1195+
1196+
// Ensure the dispatcher is started before publishing messages
1197+
String id = this.nuid.next();
1198+
this.dispatchers.put(id, d);
1199+
d.start(id);
1200+
d.subscribe(this.mainInbox);
1201+
inboxDispatcher.set(d);
1202+
}
1203+
} finally {
1204+
inboxDispatcherLock.unlock();
12001205
}
12011206
}
12021207

src/test/java/io/nats/client/impl/JetStreamPubTests.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@
2323
import java.time.Duration;
2424
import java.util.ArrayList;
2525
import java.util.List;
26-
import java.util.concurrent.CompletableFuture;
27-
import java.util.concurrent.ExecutionException;
28-
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.*;
2927

3028
import static org.junit.jupiter.api.Assertions.*;
3129

@@ -184,6 +182,38 @@ public void testPublishAsyncVarieties() throws Exception {
184182
});
185183
}
186184

185+
@Test
186+
public void testMultithreadedPublishAsync() throws Exception {
187+
final ExecutorService executorService = Executors.newFixedThreadPool(3);
188+
try {
189+
jsServer.run(nc -> {
190+
TestingStreamContainer tsc = new TestingStreamContainer(nc);
191+
final int messagesToPublish = 6;
192+
// create a new connection that does not have the inbox dispatcher set
193+
try (NatsConnection nc2 = new NatsConnection(nc.getOptions())){
194+
nc2.connect(true);
195+
JetStream js = nc2.jetStream();
196+
197+
List<Future<CompletableFuture<PublishAck>>> futures = new ArrayList<>();
198+
for (int i = 0; i < messagesToPublish; i++) {
199+
final Future<CompletableFuture<PublishAck>> submitFuture = executorService.submit(() ->
200+
js.publishAsync(tsc.subject(), dataBytes(1)));
201+
futures.add(submitFuture);
202+
}
203+
// verify all messages were published
204+
for (int i = 0; i < messagesToPublish; i++) {
205+
CompletableFuture<PublishAck> future = futures.get(i).get(200, TimeUnit.MILLISECONDS);
206+
PublishAck pa = future.get(200, TimeUnit.MILLISECONDS);
207+
assertEquals(tsc.stream, pa.getStream());
208+
assertFalse(pa.isDuplicate());
209+
}
210+
}
211+
});
212+
} finally {
213+
executorService.shutdownNow();
214+
}
215+
}
216+
187217
private void assertFutureIOException(CompletableFuture<PublishAck> future) {
188218
ExecutionException ee = assertThrows(ExecutionException.class, future::get);
189219
assertTrue(ee.getCause() instanceof RuntimeException);

0 commit comments

Comments
 (0)