Skip to content

Commit 1ceea55

Browse files
authored
Don't flush after the request from publishAsync calls (#1220)
1 parent dabca74 commit 1ceea55

File tree

6 files changed

+65
-30
lines changed

6 files changed

+65
-30
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,19 +56,19 @@ public class Options {
5656
// NOTE TO DEVS!!! To add an option, you have to address:
5757
// ----------------------------------------------------------------------------------------------------
5858
// CONSTANTS * optionally add a default value constant
59-
// ENVIRONMENT PROPERTIES * most of the time add an environment property, should always be in the form PFX +
59+
// ENVIRONMENT PROPERTIES * always add an environment property. Constant always starts with PFX, but code accepts without
6060
// PROTOCOL CONNECT OPTION CONSTANTS * not related to options, but here because Options code uses them
6161
// CLASS VARIABLES * add a variable to the class
6262
// BUILDER VARIABLES * add a variable in builder
63+
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
6364
// BUILD CONSTRUCTOR PROPS * update build props constructor to read new props
6465
// BUILDER METHODS * add a chainable method in builder for new variable
6566
// BUILD IMPL * update build() implementation if needed
66-
// BUILDER COPY CONSTRUCTOR * update builder constructor to ensure new variables are set
6767
// CONSTRUCTOR * update constructor to ensure new variables are set from builder
6868
// GETTERS * update getter to be able to retrieve class variable value
6969
// HELPER FUNCTIONS * just helpers
7070
// ----------------------------------------------------------------------------------------------------
71-
// README - if you add a property or change it's comment, add it to or update the readme
71+
// README - if you add a property or change its comment, add it to or update the readme
7272
// ----------------------------------------------------------------------------------------------------
7373

7474
// ----------------------------------------------------------------------------------------------------
@@ -492,6 +492,10 @@ public class Options {
492492
* {@link Builder#useDispatcherWithExecutor()}.
493493
*/
494494
public static final String PROP_USE_DISPATCHER_WITH_EXECUTOR = PFX + "use.dispatcher.with.executor";
495+
/**
496+
* Property used to configure a builder from a Properties object. {@value}, see {@link Builder#forceFlushOnRequest() forceFlushOnRequest}.
497+
*/
498+
public static final String PROP_FORCE_FLUSH_ON_REQUEST = PFX + "force.flush.on.request";
495499

496500
// ----------------------------------------------------------------------------------------------------
497501
// PROTOCOL CONNECT OPTION CONSTANTS
@@ -625,6 +629,7 @@ public class Options {
625629
private final boolean tlsFirst;
626630
private final boolean useTimeoutException;
627631
private final boolean useDispatcherWithExecutor;
632+
private final boolean forceFlushOnRequest;
628633

629634
private final AuthHandler authHandler;
630635
private final ReconnectDelayHandler reconnectDelayHandler;
@@ -741,6 +746,7 @@ public static class Builder {
741746
private boolean tlsFirst = false;
742747
private boolean useTimeoutException = false;
743748
private boolean useDispatcherWithExecutor = false;
749+
private boolean forceFlushOnRequest = true; // true since it's the original b/w compatible way
744750
private ServerPool serverPool = null;
745751
private DispatcherFactory dispatcherFactory = null;
746752

@@ -876,6 +882,7 @@ public Builder properties(Properties props) {
876882
booleanProperty(props, PROP_TLS_FIRST, b -> this.tlsFirst = b);
877883
booleanProperty(props, PROP_USE_TIMEOUT_EXCEPTION, b -> this.useTimeoutException = b);
878884
booleanProperty(props, PROP_USE_DISPATCHER_WITH_EXECUTOR, b -> this.useDispatcherWithExecutor = b);
885+
booleanProperty(props, PROP_FORCE_FLUSH_ON_REQUEST, b -> this.forceFlushOnRequest = b);
879886

880887
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
881888
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
@@ -1658,6 +1665,15 @@ public Builder useDispatcherWithExecutor() {
16581665
return this;
16591666
}
16601667

1668+
/**
1669+
* Instruct requests to turn off flush on requests.
1670+
* @return the Builder for chaining
1671+
*/
1672+
public Builder dontForceFlushOnRequest() {
1673+
this.forceFlushOnRequest = false;
1674+
return this;
1675+
}
1676+
16611677
/**
16621678
* Set the ServerPool implementation for connections to use instead of the default implementation
16631679
* @param serverPool the implementation
@@ -1905,6 +1921,7 @@ public Builder(Options o) {
19051921
this.tlsFirst = o.tlsFirst;
19061922
this.useTimeoutException = o.useTimeoutException;
19071923
this.useDispatcherWithExecutor = o.useDispatcherWithExecutor;
1924+
this.forceFlushOnRequest = o.forceFlushOnRequest;
19081925

19091926
this.serverPool = o.serverPool;
19101927
this.dispatcherFactory = o.dispatcherFactory;
@@ -1969,6 +1986,7 @@ private Options(Builder b) {
19691986
this.tlsFirst = b.tlsFirst;
19701987
this.useTimeoutException = b.useTimeoutException;
19711988
this.useDispatcherWithExecutor = b.useDispatcherWithExecutor;
1989+
this.forceFlushOnRequest = b.forceFlushOnRequest;
19721990

19731991
this.serverPool = b.serverPool;
19741992
this.dispatcherFactory = b.dispatcherFactory;
@@ -2405,8 +2423,20 @@ public boolean useTimeoutException() {
24052423
return useTimeoutException;
24062424
}
24072425

2426+
/**
2427+
* Whether the dispatcher should use an executor to async messages to handlers
2428+
* @return the flag
2429+
*/
24082430
public boolean useDispatcherWithExecutor() { return useDispatcherWithExecutor; }
24092431

2432+
/**
2433+
* Whether to flush on any user request
2434+
* @return the flag
2435+
*/
2436+
public boolean forceFlushOnRequest() {
2437+
return forceFlushOnRequest;
2438+
}
2439+
24102440
/**
24112441
* Get the ServerPool implementation. If null, a default implementation is used.
24122442
* @return the ServerPool implementation

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class NatsConnection implements Connection {
4747
public static final double NANOS_PER_SECOND = 1_000_000_000.0;
4848

4949
private final Options options;
50+
final boolean forceFlushOnRequest;
5051

5152
private final StatisticsCollector statistics;
5253

@@ -112,6 +113,7 @@ class NatsConnection implements Connection {
112113
timeTraceLogger.trace("creating connection object");
113114

114115
this.options = options;
116+
forceFlushOnRequest = options.forceFlushOnRequest();
115117

116118
advancedTracking = options.isTrackAdvancedStats();
117119
this.statistics = options.getStatisticsCollector() == null ? new NatsStatistics() : options.getStatisticsCollector();
@@ -1205,15 +1207,15 @@ else if (future.isDone()) {
12051207
*/
12061208
@Override
12071209
public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
1208-
return requestInternal(subject, null, body, timeout, cancelAction, true);
1210+
return requestInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
12091211
}
12101212

12111213
/**
12121214
* {@inheritDoc}
12131215
*/
12141216
@Override
12151217
public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
1216-
return requestInternal(subject, headers, body, timeout, cancelAction, true);
1218+
return requestInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
12171219
}
12181220

12191221
/**
@@ -1222,11 +1224,12 @@ public Message request(String subject, Headers headers, byte[] body, Duration ti
12221224
@Override
12231225
public Message request(Message message, Duration timeout) throws InterruptedException {
12241226
validateNotNull(message, "Message");
1225-
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
1227+
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
12261228
}
12271229

1228-
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws InterruptedException {
1229-
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo);
1230+
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout,
1231+
CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws InterruptedException {
1232+
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
12301233
try {
12311234
return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
12321235
} catch (TimeoutException | ExecutionException | CancellationException e) {
@@ -1239,31 +1242,31 @@ Message requestInternal(String subject, Headers headers, byte[] data, Duration t
12391242
*/
12401243
@Override
12411244
public CompletableFuture<Message> request(String subject, byte[] body) {
1242-
return requestFutureInternal(subject, null, body, null, cancelAction, true);
1245+
return requestFutureInternal(subject, null, body, null, cancelAction, true, forceFlushOnRequest);
12431246
}
12441247

12451248
/**
12461249
* {@inheritDoc}
12471250
*/
12481251
@Override
12491252
public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
1250-
return requestFutureInternal(subject, headers, body, null, cancelAction, true);
1253+
return requestFutureInternal(subject, headers, body, null, cancelAction, true, forceFlushOnRequest);
12511254
}
12521255

12531256
/**
12541257
* {@inheritDoc}
12551258
*/
12561259
@Override
12571260
public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
1258-
return requestFutureInternal(subject, null, body, timeout, cancelAction, true);
1261+
return requestFutureInternal(subject, null, body, timeout, cancelAction, true, forceFlushOnRequest);
12591262
}
12601263

12611264
/**
12621265
* {@inheritDoc}
12631266
*/
12641267
@Override
12651268
public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
1266-
return requestFutureInternal(subject, headers, body, timeout, cancelAction, true);
1269+
return requestFutureInternal(subject, headers, body, timeout, cancelAction, true, forceFlushOnRequest);
12671270
}
12681271

12691272
/**
@@ -1272,7 +1275,7 @@ public CompletableFuture<Message> requestWithTimeout(String subject, Headers hea
12721275
@Override
12731276
public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
12741277
validateNotNull(message, "Message");
1275-
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
1278+
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false, forceFlushOnRequest);
12761279
}
12771280

12781281
/**
@@ -1281,10 +1284,11 @@ public CompletableFuture<Message> requestWithTimeout(Message message, Duration t
12811284
@Override
12821285
public CompletableFuture<Message> request(Message message) {
12831286
validateNotNull(message, "Message");
1284-
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false);
1287+
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false, forceFlushOnRequest);
12851288
}
12861289

1287-
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) {
1290+
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout,
1291+
CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) {
12881292
checkPayloadSize(data);
12891293

12901294
if (isClosed()) {
@@ -1336,7 +1340,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
13361340
responsesAwaiting.put(sub.getSID(), future);
13371341
}
13381342

1339-
publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, true);
1343+
publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo, flushImmediatelyAfterPublish);
13401344
statistics.incrementRequestsSent();
13411345

13421346
return future;

src/main/java/io/nats/client/impl/NatsJetStream.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.nats.client.support.Validator;
1919

2020
import java.io.IOException;
21-
import java.time.Duration;
2221
import java.util.ArrayList;
2322
import java.util.List;
2423
import java.util.concurrent.CompletableFuture;
@@ -97,31 +96,31 @@ public PublishAck publish(Message message, PublishOptions options) throws IOExce
9796
*/
9897
@Override
9998
public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body) {
100-
return publishAsyncInternal(subject, null, body, null, null, true);
99+
return publishAsyncInternal(subject, null, body, null, true);
101100
}
102101

103102
/**
104103
* {@inheritDoc}
105104
*/
106105
@Override
107106
public CompletableFuture<PublishAck> publishAsync(String subject, Headers headers, byte[] body) {
108-
return publishAsyncInternal(subject, headers, body, null, null, true);
107+
return publishAsyncInternal(subject, headers, body, null, true);
109108
}
110109

111110
/**
112111
* {@inheritDoc}
113112
*/
114113
@Override
115114
public CompletableFuture<PublishAck> publishAsync(String subject, byte[] body, PublishOptions options) {
116-
return publishAsyncInternal(subject, null, body, options, null, true);
115+
return publishAsyncInternal(subject, null, body, options, true);
117116
}
118117

119118
/**
120119
* {@inheritDoc}
121120
*/
122121
@Override
123122
public CompletableFuture<PublishAck> publishAsync(String subject, Headers headers, byte[] body, PublishOptions options) {
124-
return publishAsyncInternal(subject, headers, body, options, null, true);
123+
return publishAsyncInternal(subject, headers, body, options, true);
125124
}
126125

127126
/**
@@ -130,7 +129,7 @@ public CompletableFuture<PublishAck> publishAsync(String subject, Headers header
130129
@Override
131130
public CompletableFuture<PublishAck> publishAsync(Message message) {
132131
validateNotNull(message, "Message");
133-
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, null, false);
132+
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), null, false);
134133
}
135134

136135
/**
@@ -139,7 +138,7 @@ public CompletableFuture<PublishAck> publishAsync(Message message) {
139138
@Override
140139
public CompletableFuture<PublishAck> publishAsync(Message message, PublishOptions options) {
141140
validateNotNull(message, "Message");
142-
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, null, false);
141+
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, false);
143142
}
144143

145144
private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) throws IOException, JetStreamApiException {
@@ -150,19 +149,19 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d
150149
return null;
151150
}
152151

153-
Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo);
152+
Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo, conn.forceFlushOnRequest);
154153
return processPublishResponse(resp, options);
155154
}
156155

157-
private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout, boolean validateSubjectAndReplyTo) {
156+
private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) {
158157
Headers merged = mergePublishOptions(headers, options);
159158

160159
if (jso.isPublishNoAck()) {
161160
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo, false);
162161
return null;
163162
}
164163

165-
CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
164+
CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, null, CancelAction.COMPLETE, validateSubjectAndReplyTo, conn.forceFlushOnRequest);
166165

167166
return future.thenCompose(resp -> {
168167
try {

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public CachedStreamInfo(StreamInfo si) {
5353
// ----------------------------------------------------------------------------------------------------
5454
// Create / Init
5555
// ----------------------------------------------------------------------------------------------------
56-
NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) throws IOException {
56+
NatsJetStreamImpl(NatsConnection connection, JetStreamOptions jsOptions) {
5757
conn = connection;
5858

5959
// Get a working version of JetStream Options...
@@ -246,9 +246,9 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo
246246
}
247247
}
248248

249-
Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws IOException {
249+
Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo, boolean flushImmediatelyAfterPublish) throws IOException {
250250
try {
251-
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo));
251+
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo, flushImmediatelyAfterPublish));
252252
} catch (InterruptedException e) {
253253
Thread.currentThread().interrupt();
254254
throw new IOException(e);

src/main/java/io/nats/client/impl/NatsJetStreamPullSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ protected String _pull(PullRequestOptions pullRequestOptions, boolean raiseStatu
6262
String publishSubject = js.prependPrefix(String.format(JSAPI_CONSUMER_MSG_NEXT, stream, consumerName));
6363
String pullSubject = getSubject().replace("*", Long.toString(this.pullSubjectIdHolder.incrementAndGet()));
6464
manager.startPullRequest(pullSubject, pullRequestOptions, raiseStatusWarnings, pullManagerObserver);
65-
connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, true);
65+
connection.publishInternal(publishSubject, pullSubject, null, pullRequestOptions.serialize(), true, connection.forceFlushOnRequest);
6666
return pullSubject;
6767
}
6868

src/test/java/io/nats/client/OptionsTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ public void testPropertiesCoverageOptions() throws Exception {
540540
props.setProperty(Options.PROP_CLIENT_SIDE_LIMIT_CHECKS, "true"); // deprecated
541541
props.setProperty(Options.PROP_IGNORE_DISCOVERED_SERVERS, "true");
542542
props.setProperty(Options.PROP_NO_RESOLVE_HOSTNAMES, "true");
543+
props.setProperty(PROP_FORCE_FLUSH_ON_REQUEST, "false");
543544

544545
Options o = new Options.Builder(props).build();
545546
_testPropertiesCoverageOptions(o);
@@ -553,6 +554,7 @@ private static void _testPropertiesCoverageOptions(Options o) {
553554
assertTrue(o.clientSideLimitChecks());
554555
assertTrue(o.isIgnoreDiscoveredServers());
555556
assertTrue(o.isNoResolveHostnames());
557+
assertFalse(o.forceFlushOnRequest());
556558
}
557559

558560
@Test

0 commit comments

Comments
 (0)