Skip to content

Commit f60390e

Browse files
authored
Service, tuning review (#1279)
* Service, tuning review * Service, tuning review * Service, tuning review
1 parent 9b41f21 commit f60390e

File tree

4 files changed

+67
-54
lines changed

4 files changed

+67
-54
lines changed

src/main/java/io/nats/service/EndpointContext.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
import io.nats.client.Connection;
44
import io.nats.client.Dispatcher;
55
import io.nats.client.Message;
6-
import io.nats.client.Subscription;
76
import io.nats.client.support.DateTimeUtils;
87

8+
import java.time.Duration;
99
import java.time.ZonedDateTime;
10+
import java.util.concurrent.CompletableFuture;
1011
import java.util.concurrent.atomic.AtomicLong;
1112

1213
/**
@@ -19,12 +20,11 @@ class EndpointContext {
1920
private final ServiceMessageHandler handler;
2021
private final boolean recordStats;
2122
private final String qGroup;
23+
private boolean running;
2224

2325
private final boolean internalDispatcher;
2426
private final Dispatcher dispatcher;
2527

26-
private Subscription sub;
27-
2828
private ZonedDateTime started;
2929
private String lastError;
3030
private final AtomicLong numRequests;
@@ -37,6 +37,7 @@ class EndpointContext {
3737
handler = se.getHandler();
3838
this.recordStats = !internalEndpoint;
3939
qGroup = internalEndpoint ? null : se.getQueueGroup();
40+
running = false;
4041

4142
if (se.getDispatcher() == null) {
4243
dispatcher = internalDispatcher;
@@ -53,15 +54,19 @@ class EndpointContext {
5354
started = DateTimeUtils.gmtNow();
5455
}
5556

57+
// this method does not need a lock because it is only
58+
// called from Service start which is already locked
5659
void start() {
57-
if (sub == null) {
60+
if (!running) {
61+
// we do not need to track the sub, since drain occurs through the dispatcher
5862
if (qGroup == null) {
5963
dispatcher.subscribe(se.getSubject(), this::onMessage);
6064
}
6165
else {
6266
dispatcher.subscribe(se.getSubject(), qGroup, this::onMessage);
6367
}
6468
started = DateTimeUtils.gmtNow();
69+
running = true;
6570
}
6671
}
6772

@@ -115,7 +120,7 @@ boolean isNotInternalDispatcher() {
115120
return !internalDispatcher;
116121
}
117122

118-
Subscription getSub() {
119-
return sub;
123+
CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException {
124+
return dispatcher.drain(timeout);
120125
}
121126
}

src/main/java/io/nats/service/InfoResponse.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
import io.nats.client.support.JsonUtils;
1717
import io.nats.client.support.JsonValue;
1818

19-
import java.util.ArrayList;
20-
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Objects;
19+
import java.util.*;
2320

2421
import static io.nats.client.support.ApiConstants.DESCRIPTION;
2522
import static io.nats.client.support.ApiConstants.ENDPOINTS;
@@ -42,15 +39,25 @@ public class InfoResponse extends ServiceResponse {
4239
this.endpoints = new ArrayList<>();
4340
}
4441

45-
InfoResponse addServiceEndpoint(ServiceEndpoint se) {
42+
void addServiceEndpoint(ServiceEndpoint se) {
43+
_addServiceEndpoint(se);
44+
serialized.set(null);
45+
}
46+
47+
void addServiceEndpoints(Collection<ServiceEndpoint> serviceEndpoints) {
48+
for (ServiceEndpoint se : serviceEndpoints) {
49+
_addServiceEndpoint(se);
50+
}
51+
serialized.set(null);
52+
}
53+
54+
private void _addServiceEndpoint(ServiceEndpoint se) {
4655
endpoints.add(new Endpoint(
4756
se.getName(),
4857
se.getSubject(),
4958
se.getQueueGroup(),
5059
se.getMetadata()
5160
));
52-
serialized.set(null);
53-
return this;
5461
}
5562

5663
InfoResponse(byte[] jsonBytes) {

src/main/java/io/nats/service/Service.java

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,11 @@
2121
import java.time.Duration;
2222
import java.time.ZonedDateTime;
2323
import java.util.ArrayList;
24+
import java.util.Arrays;
25+
import java.util.Collection;
2426
import java.util.List;
2527
import java.util.concurrent.CompletableFuture;
2628
import java.util.concurrent.ConcurrentHashMap;
27-
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.locks.ReentrantLock;
3031

@@ -46,7 +47,7 @@ public class Service {
4647

4748
private final Connection conn;
4849
private final Duration drainTimeout;
49-
private final ConcurrentMap<String, EndpointContext> serviceContexts;
50+
private final ConcurrentHashMap<String, EndpointContext> serviceContexts;
5051
private final List<EndpointContext> discoveryContexts;
5152
private final List<Dispatcher> dInternals;
5253
private final PingResponse pingResponse;
@@ -70,9 +71,7 @@ public class Service {
7071
// set up the service contexts
7172
// ? do we need an internal dispatcher for any user endpoints !! addServiceEndpoint deals with it
7273
serviceContexts = new ConcurrentHashMap<>();
73-
for (ServiceEndpoint se : b.serviceEndpoints.values()) {
74-
addServiceEndpoint(se);
75-
}
74+
addServiceEndpoints(b.serviceEndpoints.values());
7675

7776
Dispatcher dTemp = null;
7877
if (b.pingDispatcher == null || b.infoDispatcher == null || b.statsDispatcher == null) {
@@ -87,31 +86,41 @@ public class Service {
8786
}
8887

8988
/**
90-
* Adds a service endpoint to the list of service contexts and starts it if the service is running.
91-
* @param se the service endpoint to be added
89+
* Adds one or more service endpoint to the list of service contexts and starts it if the service is running.
90+
* @param serviceEndpoints one or more service endpoints to be added
9291
*/
93-
public void addServiceEndpoint(ServiceEndpoint se) {
94-
// do this first so it's available on start
95-
infoResponse.addServiceEndpoint(se);
96-
97-
EndpointContext ctx;
98-
if (se.getDispatcher() == null) {
99-
Dispatcher dTemp = dInternals.isEmpty() ? null : dInternals.get(0);
100-
if (dTemp == null) {
101-
dTemp = conn.createDispatcher();
102-
dInternals.add(dTemp);
103-
}
104-
ctx = new EndpointContext(conn, dTemp, false, se);
105-
} else {
106-
ctx = new EndpointContext(conn, null, false, se);
107-
}
108-
serviceContexts.put(se.getName(), ctx);
92+
public void addServiceEndpoints(ServiceEndpoint... serviceEndpoints) {
93+
addServiceEndpoints(Arrays.asList(serviceEndpoints));
94+
}
10995

110-
// if the service is already started, start the newly added context
96+
/**
97+
* Adds all service endpoints to the list of service contexts and starts it if the service is running.
98+
* @param serviceEndpoints service endpoints to be added
99+
*/
100+
public void addServiceEndpoints(Collection<ServiceEndpoint> serviceEndpoints) {
111101
startStopLock.lock();
112102
try {
113-
if (runningIndicator != null) {
114-
ctx.start();
103+
// do this first so it's available on start
104+
infoResponse.addServiceEndpoints(serviceEndpoints);
105+
for (ServiceEndpoint se : serviceEndpoints) {
106+
EndpointContext ctx;
107+
if (se.getDispatcher() == null) {
108+
Dispatcher dTemp = dInternals.isEmpty() ? null : dInternals.get(0);
109+
if (dTemp == null) {
110+
dTemp = conn.createDispatcher();
111+
dInternals.add(dTemp);
112+
}
113+
ctx = new EndpointContext(conn, dTemp, false, se);
114+
}
115+
else {
116+
ctx = new EndpointContext(conn, null, false, se);
117+
}
118+
serviceContexts.put(se.getName(), ctx);
119+
120+
// if the service is already started, start the newly added context
121+
if (runningIndicator != null) {
122+
ctx.start();
123+
}
115124
}
116125
}
117126
finally {
@@ -242,7 +251,7 @@ public void stop(boolean drain, Throwable t) {
242251
for (EndpointContext c : serviceContexts.values()) {
243252
if (c.isNotInternalDispatcher()) {
244253
try {
245-
futures.add(c.getSub().drain(drainTimeout));
254+
futures.add(c.drain(drainTimeout));
246255
}
247256
catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
248257
}
@@ -251,7 +260,7 @@ public void stop(boolean drain, Throwable t) {
251260
for (EndpointContext c : discoveryContexts) {
252261
if (c.isNotInternalDispatcher()) {
253262
try {
254-
futures.add(c.getSub().drain(drainTimeout));
263+
futures.add(c.drain(drainTimeout));
255264
}
256265
catch (Exception e) { /* nothing I can really do, we are stopping anyway */ }
257266
}

src/test/java/io/nats/service/ServiceTests.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ public void testServiceWorkflow() throws Exception {
260260
.handler(new ReverseHandler(serviceNc1))
261261
.build();
262262

263-
service1.addServiceEndpoint(seRev1);
263+
service1.addServiceEndpoints(seRev1);
264264

265265
for (int x = 0; x < requestCount; x++) {
266266
verifyServiceExecution(clientNc, REVERSE_ENDPOINT_NAME, REVERSE_ENDPOINT_SUBJECT, null);
@@ -709,16 +709,7 @@ public void testDispatchers() throws Exception {
709709
done.get(100, TimeUnit.MILLISECONDS);
710710

711711
dispatchers = getDispatchers(nc);
712-
assertEquals(2, dispatchers.size()); // internal discovery was stopped
713-
assertTrue(dispatchers.containsValue(dStats));
714-
assertTrue(dispatchers.containsValue(dEnd));
715-
716-
nc.closeDispatcher(dStats);
717-
nc.closeDispatcher(dEnd);
718-
sleep(100); // no rush
719-
720-
dispatchers = getDispatchers(nc);
721-
assertEquals(0, dispatchers.size());
712+
assertEquals(0, dispatchers.size()); // stop() calls drain which closes dispatchers
722713

723714
se1 = ServiceEndpoint.builder()
724715
.endpointName("dispatch")
@@ -839,7 +830,7 @@ public void testAddingEndpointAfterServiceBuilderConstruction() {
839830
.drainTimeout(Duration.ofSeconds(1))
840831
.build();
841832

842-
service.addServiceEndpoint(se);
833+
service.addServiceEndpoints(se);
843834
assertEquals("desc", service.getDescription());
844835
assertEquals(Duration.ofSeconds(1), service.getDrainTimeout());
845836

@@ -1375,7 +1366,8 @@ public void testServiceResponsesConstruction() {
13751366
endMeta.put("foo", "bar");
13761367
Endpoint ep = new Endpoint("endfoo", endMeta);
13771368
ServiceEndpoint se = new ServiceEndpoint(ep, m -> {}, null);
1378-
InfoResponse ir1 = new InfoResponse("id", "name", "0.0.0", metadata, "desc").addServiceEndpoint(se);
1369+
InfoResponse ir1 = new InfoResponse("id", "name", "0.0.0", metadata, "desc");
1370+
ir1.addServiceEndpoint(se);
13791371
InfoResponse ir2 = new InfoResponse(ir1.toJson().getBytes());
13801372
validateApiInOutInfoResponse(ir1);
13811373
validateApiInOutInfoResponse(ir2);

0 commit comments

Comments
 (0)