Skip to content

Commit a3ccd2a

Browse files
authored
Feature: Support adding service endpoint after construction (more) (#1276)
* Feature: Support adding service endpoint after construction (more) * fixed test
1 parent dabcba8 commit a3ccd2a

File tree

4 files changed

+118
-61
lines changed

4 files changed

+118
-61
lines changed

src/main/java/io/nats/service/InfoResponse.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
import io.nats.client.support.JsonUtils;
1717
import io.nats.client.support.JsonValue;
1818

19-
import java.util.*;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.Objects;
2023

2124
import static io.nats.client.support.ApiConstants.DESCRIPTION;
2225
import static io.nats.client.support.ApiConstants.ENDPOINTS;
@@ -33,13 +36,21 @@ public class InfoResponse extends ServiceResponse {
3336
private final String description;
3437
private final List<Endpoint> endpoints;
3538

36-
InfoResponse(String id, String name, String version, Map<String, String> metadata, String description, Collection<ServiceEndpoint> serviceEndpoints) {
39+
InfoResponse(String id, String name, String version, Map<String, String> metadata, String description) {
3740
super(TYPE, id, name, version, metadata);
3841
this.description = description;
3942
this.endpoints = new ArrayList<>();
40-
for (ServiceEndpoint se : serviceEndpoints) {
41-
addServiceEndpoint(se);
42-
}
43+
}
44+
45+
InfoResponse addServiceEndpoint(ServiceEndpoint se) {
46+
endpoints.add(new Endpoint(
47+
se.getName(),
48+
se.getSubject(),
49+
se.getQueueGroup(),
50+
se.getMetadata()
51+
));
52+
serialized.set(null);
53+
return this;
4354
}
4455

4556
InfoResponse(byte[] jsonBytes) {
@@ -75,19 +86,6 @@ public List<Endpoint> getEndpoints() {
7586
return endpoints;
7687
}
7788

78-
/**
79-
* Adds a service endpoint to the list of endpoints.
80-
* @param se the service endpoint to be added
81-
*/
82-
public void addServiceEndpoint(ServiceEndpoint se) {
83-
endpoints.add(new Endpoint(
84-
se.getName(),
85-
se.getSubject(),
86-
se.getQueueGroup(),
87-
se.getMetadata()
88-
));
89-
}
90-
9189
@Override
9290
public boolean equals(Object o) {
9391
if (this == o) return true;

src/main/java/io/nats/service/Service.java

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -63,32 +63,26 @@ public class Service {
6363
dInternals = new ArrayList<>();
6464
startStopLock = new ReentrantLock();
6565

66+
// build responses first. info needs to be available when adding service endpoints.
67+
pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
68+
infoResponse = new InfoResponse(id, b.name, b.version, b.metadata, b.description);
69+
6670
// set up the service contexts
67-
// ? do we need an internal dispatcher for any user endpoints
68-
Dispatcher dTemp = null;
71+
// ? do we need an internal dispatcher for any user endpoints !! addServiceEndpoint deals with it
6972
serviceContexts = new ConcurrentHashMap<>();
7073
for (ServiceEndpoint se : b.serviceEndpoints.values()) {
7174
addServiceEndpoint(se);
7275
}
73-
if (dTemp != null) {
74-
dInternals.add(dTemp);
75-
}
76-
77-
// build static responses
78-
pingResponse = new PingResponse(id, b.name, b.version, b.metadata);
79-
infoResponse = new InfoResponse(id, b.name, b.version, b.metadata, b.description, b.serviceEndpoints.values());
8076

77+
Dispatcher dTemp = null;
8178
if (b.pingDispatcher == null || b.infoDispatcher == null || b.statsDispatcher == null) {
8279
dTemp = conn.createDispatcher();
8380
dInternals.add(dTemp);
8481
}
85-
else {
86-
dTemp = null;
87-
}
8882

8983
discoveryContexts = new ArrayList<>();
9084
addDiscoveryContexts(SRV_PING, pingResponse, b.pingDispatcher, dTemp);
91-
addDynamicDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
85+
addDiscoveryContexts(SRV_INFO, infoResponse, b.infoDispatcher, dTemp);
9286
addStatsContexts(b.statsDispatcher, dTemp);
9387
}
9488

@@ -97,9 +91,12 @@ public class Service {
9791
* @param se the service endpoint to be added
9892
*/
9993
public void addServiceEndpoint(ServiceEndpoint se) {
100-
Dispatcher dTemp = null == dInternals || dInternals.isEmpty() ? null : dInternals.get(0);
101-
EndpointContext ctx = null;
94+
// do this first so it's available on start
95+
infoResponse.addServiceEndpoint(se);
96+
97+
EndpointContext ctx;
10298
if (se.getDispatcher() == null) {
99+
Dispatcher dTemp = dInternals.isEmpty() ? null : dInternals.get(0);
103100
if (dTemp == null) {
104101
dTemp = conn.createDispatcher();
105102
dInternals.add(dTemp);
@@ -109,19 +106,17 @@ public void addServiceEndpoint(ServiceEndpoint se) {
109106
ctx = new EndpointContext(conn, null, false, se);
110107
}
111108
serviceContexts.put(se.getName(), ctx);
109+
110+
// if the service is already started, start the newly added context
112111
startStopLock.lock();
113112
try {
114113
if (runningIndicator != null) {
115114
ctx.start();
116115
}
117-
} finally {
118-
startStopLock.unlock();
119116
}
120-
121-
if (null != infoResponse) {
122-
infoResponse.addServiceEndpoint(se);
117+
finally {
118+
startStopLock.unlock();
123119
}
124-
125120
}
126121

127122
private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispatcher dInternal, ServiceMessageHandler handler) {
@@ -138,19 +133,6 @@ private void addDiscoveryContexts(String discoveryName, Dispatcher dUser, Dispat
138133
}
139134
}
140135

141-
/**
142-
* Adds dynamic discovery contexts for the service, dynamically generating the bytes content per call.
143-
* This is different from `addDiscoveryContexts` which reuses the same static bytes at registration.
144-
* @param discoveryName the name of the discovery
145-
* @param dUser the user dispatcher
146-
* @param dInternal the internal dispatcher
147-
* @param handler the service message handler
148-
*/
149-
private void addDynamicDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
150-
ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
151-
addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
152-
}
153-
154136
/**
155137
* Adds discovery contexts for the service, reusing the same static bytes at registration.
156138
* @param discoveryName the name of the discovery
@@ -159,8 +141,7 @@ private void addDynamicDiscoveryContexts(String discoveryName, ServiceResponse s
159141
* @param dInternal the internal dispatcher
160142
*/
161143
private void addDiscoveryContexts(String discoveryName, ServiceResponse sr, Dispatcher dUser, Dispatcher dInternal) {
162-
final byte[] responseBytes = sr.serialize();
163-
ServiceMessageHandler handler = smsg -> smsg.respond(conn, responseBytes);
144+
ServiceMessageHandler handler = smsg -> smsg.respond(conn, sr.serialize());
164145
addDiscoveryContexts(discoveryName, dUser, dInternal, handler);
165146
}
166147

src/main/java/io/nats/service/ServiceResponse.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515

1616
import io.nats.client.support.*;
1717

18+
import java.nio.charset.StandardCharsets;
1819
import java.util.HashMap;
1920
import java.util.Map;
2021
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicReference;
2123

2224
import static io.nats.client.support.ApiConstants.*;
2325
import static io.nats.client.support.JsonUtils.endJson;
@@ -33,13 +35,15 @@ public abstract class ServiceResponse implements JsonSerializable {
3335
protected final String id;
3436
protected final String version;
3537
protected final Map<String, String> metadata;
38+
protected final AtomicReference<byte[]> serialized;
3639

3740
protected ServiceResponse(String type, String id, String name, String version, Map<String, String> metadata) {
3841
this.type = type;
3942
this.id = id;
4043
this.name = name;
4144
this.version = version;
4245
this.metadata = metadata == null || metadata.isEmpty() ? null : metadata;
46+
serialized = new AtomicReference<>();
4347
}
4448

4549
protected ServiceResponse(String type, ServiceResponse template) {
@@ -59,6 +63,16 @@ protected ServiceResponse(String type, JsonValue jv) {
5963
name = Validator.required(readString(jv, NAME), "Name");
6064
version = Validator.required(readString(jv, VERSION), "Version");
6165
metadata = readStringStringMap(jv, METADATA);
66+
serialized = new AtomicReference<>();
67+
}
68+
69+
@Override
70+
public byte[] serialize() {
71+
// lazy since endpoints can be added after creation
72+
if (serialized.get() == null) {
73+
serialized.set(toJson().getBytes(StandardCharsets.UTF_8));
74+
}
75+
return serialized.get();
6276
}
6377

6478
protected static JsonValue parseMessage(byte[] bytes) {
@@ -124,7 +138,6 @@ public String toJson() {
124138
return endJson(sb).toString();
125139
}
126140

127-
128141
@Override
129142
public String toString() {
130143
return JsonUtils.toKey(getClass()) + toJson();

src/test/java/io/nats/service/ServiceTests.java

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class ServiceTests extends JetStreamTestBase {
5656
public static final String SORT_ENDPOINT_DESCENDING_NAME = "SortEndpointDescending";
5757
public static final String SORT_ENDPOINT_ASCENDING_SUBJECT = "ascending";
5858
public static final String SORT_ENDPOINT_DESCENDING_SUBJECT = "descending";
59+
public static final String REVERSE_ENDPOINT_NAME = "ReverseEndpoint";
60+
public static final String REVERSE_ENDPOINT_SUBJECT = "reverse";
5961
public static final String CUSTOM_QGROUP = "customQ";
6062

6163
@Test
@@ -196,7 +198,7 @@ public void testServiceWorkflow() throws Exception {
196198
Discovery discovery = new Discovery(clientNc, 500, 5);
197199

198200
// ping discovery
199-
Verifier pingVerifier = (expected, response) -> assertTrue(response instanceof PingResponse);
201+
Verifier pingVerifier = (expected, response) -> assertInstanceOf(PingResponse.class, response);
200202
verifyDiscovery(discovery.ping(), pingVerifier, pingResponse1, pingResponse2);
201203
verifyDiscovery(discovery.ping(SERVICE_NAME_1), pingVerifier, pingResponse1);
202204
verifyDiscovery(discovery.ping(SERVICE_NAME_2), pingVerifier, pingResponse2);
@@ -206,7 +208,7 @@ public void testServiceWorkflow() throws Exception {
206208

207209
// info discovery
208210
Verifier infoVerifier = (expected, response) -> {
209-
assertTrue(response instanceof InfoResponse);
211+
assertInstanceOf(InfoResponse.class, response);
210212
InfoResponse exp = (InfoResponse) expected;
211213
InfoResponse r = (InfoResponse) response;
212214
assertEquals(exp.getDescription(), r.getDescription());
@@ -221,7 +223,7 @@ public void testServiceWorkflow() throws Exception {
221223

222224
// stats discovery
223225
Verifier statsVerifier = (expected, response) -> {
224-
assertTrue(response instanceof StatsResponse);
226+
assertInstanceOf(StatsResponse.class, response);
225227
StatsResponse exp = (StatsResponse) expected;
226228
StatsResponse sr = (StatsResponse) response;
227229
assertEquals(exp.getStarted(), sr.getStarted());
@@ -241,6 +243,44 @@ public void testServiceWorkflow() throws Exception {
241243
assertNull(discovery.stats(SERVICE_NAME_1, "badId"));
242244
assertNull(discovery.stats("bad", "badId"));
243245

246+
// ---------------------------------------------------------------------------
247+
// TEST ADDING AN ENDPOINT TO A RUNNING SERVICE
248+
// ---------------------------------------------------------------------------
249+
Endpoint endReverse = Endpoint.builder()
250+
.name(REVERSE_ENDPOINT_NAME)
251+
.subject(REVERSE_ENDPOINT_SUBJECT)
252+
.build();
253+
254+
ServiceEndpoint seRev1 = ServiceEndpoint.builder()
255+
.endpoint(endReverse)
256+
.handler(new ReverseHandler(serviceNc1))
257+
.build();
258+
259+
service1.addServiceEndpoint(seRev1);
260+
261+
for (int x = 0; x < requestCount; x++) {
262+
verifyServiceExecution(clientNc, REVERSE_ENDPOINT_NAME, REVERSE_ENDPOINT_SUBJECT, null);
263+
}
264+
infoResponse1 = service1.getInfoResponse();
265+
boolean found = false;
266+
for (Endpoint e : infoResponse1.getEndpoints()) {
267+
if (e.getName().equals(REVERSE_ENDPOINT_NAME)) {
268+
found = true;
269+
break;
270+
}
271+
}
272+
assertTrue(found);
273+
274+
statsResponse1 = service1.getStatsResponse();
275+
found = false;
276+
for (EndpointStats e : statsResponse1.getEndpointStatsList()) {
277+
if (e.getName().equals(REVERSE_ENDPOINT_NAME)) {
278+
found = true;
279+
break;
280+
}
281+
}
282+
assertTrue(found);
283+
244284
// test reset
245285
ZonedDateTime zdt = DateTimeUtils.gmtNow();
246286
sleep(1);
@@ -318,6 +358,9 @@ private static void verifyServiceExecution(Connection nc, String endpointName, S
318358
case SORT_ENDPOINT_DESCENDING_NAME:
319359
assertEquals(sortD(request), response);
320360
break;
361+
case REVERSE_ENDPOINT_NAME:
362+
assertEquals(reverse(request), response);
363+
break;
321364
}
322365
} catch (Exception e) {
323366
throw new RuntimeException(e);
@@ -363,6 +406,19 @@ public void onMessage(ServiceMessage smsg) {
363406
}
364407
}
365408

409+
static class ReverseHandler implements ServiceMessageHandler {
410+
Connection conn;
411+
412+
public ReverseHandler(Connection conn) {
413+
this.conn = conn;
414+
}
415+
416+
@Override
417+
public void onMessage(ServiceMessage smsg) {
418+
smsg.respond(conn, reverse(smsg.getData()));
419+
}
420+
}
421+
366422
private static String echo(String data) {
367423
return "Echo " + data;
368424
}
@@ -394,6 +450,14 @@ private static String sortD(String data) {
394450
return sortD(data.getBytes(StandardCharsets.UTF_8));
395451
}
396452

453+
private static String reverse(String data) {
454+
return new StringBuilder(data).reverse().toString();
455+
}
456+
457+
private static String reverse(byte[] data) {
458+
return reverse(new String(data));
459+
}
460+
397461
@Test
398462
public void testDispatchers() throws Exception {
399463
try (NatsTestServer ts = new NatsTestServer()) {
@@ -1107,7 +1171,7 @@ public void testServiceResponsesConstruction() {
11071171
endMeta.put("foo", "bar");
11081172
Endpoint ep = new Endpoint("endfoo", endMeta);
11091173
ServiceEndpoint se = new ServiceEndpoint(ep, m -> {}, null);
1110-
InfoResponse ir1 = new InfoResponse("id", "name", "0.0.0", metadata, "desc", Collections.singletonList(se));
1174+
InfoResponse ir1 = new InfoResponse("id", "name", "0.0.0", metadata, "desc").addServiceEndpoint(se);
11111175
InfoResponse ir2 = new InfoResponse(ir1.toJson().getBytes());
11121176
validateApiInOutInfoResponse(ir1);
11131177
validateApiInOutInfoResponse(ir2);
@@ -1129,10 +1193,11 @@ public void testServiceResponsesConstruction() {
11291193
validateApiInOutStatsResponse(stat1, serviceStarted, endStarteds, data);
11301194
validateApiInOutStatsResponse(stat2, serviceStarted, endStarteds, data);
11311195

1132-
EqualsVerifier.simple().forClass(PingResponse.class).verify();
1133-
EqualsVerifier.simple().forClass(InfoResponse.class).verify();
1196+
EqualsVerifier.simple().forClass(PingResponse.class).withIgnoredFields("serialized").verify();
1197+
EqualsVerifier.simple().forClass(InfoResponse.class).withIgnoredFields("serialized").verify();
11341198
EqualsVerifier.simple().forClass(StatsResponse.class)
11351199
.withPrefabValues(EndpointStats.class, statsList.get(0), statsList.get(1))
1200+
.withIgnoredFields("serialized")
11361201
.verify();
11371202
}
11381203

0 commit comments

Comments
 (0)