Skip to content

Commit 186f0a8

Browse files
committed
Support 409 Leadership Change
1 parent 8da8e3a commit 186f0a8

File tree

4 files changed

+102
-18
lines changed

4 files changed

+102
-18
lines changed

src/main/java/io/nats/client/impl/PullMessageManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,23 +161,26 @@ protected ManageResult manageStatus(Message msg) {
161161
case CONFLICT_CODE:
162162
// sometimes just a warning
163163
String statMsg = status.getMessage();
164-
if (statMsg.startsWith("Exceeded Max")) {
164+
if (statMsg.startsWith("Exceeded Max")
165+
|| statMsg.equals(SERVER_SHUTDOWN)
166+
|| statMsg.equals(LEADERSHIP_CHANGE)
167+
) {
165168
if (raiseStatusWarnings) {
166169
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
167170
}
168171
return STATUS_HANDLED;
169172
}
170173

171174
if (statMsg.equals(BATCH_COMPLETED) ||
172-
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES) ||
173-
statMsg.equals(SERVER_SHUTDOWN))
175+
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
174176
{
175177
return STATUS_TERMINUS;
176178
}
177179
break;
178180
}
179181

180-
// all others are errors
182+
// All unknown 409s are errors, since that basically means the client is not aware of them.
183+
// These known ones are also errors: "Consumer Deleted" and "Consumer is push based"
181184
conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status));
182185
return STATUS_ERROR;
183186
}

src/main/java/io/nats/client/support/Status.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class Status {
4141

4242
public static String BATCH_COMPLETED = "Batch Completed"; // 409 informational
4343
public static String SERVER_SHUTDOWN = "Server Shutdown"; // 409 informational with headers
44+
public static String LEADERSHIP_CHANGE = "Leadership Change"; // 409
4445

4546
private final int code;
4647
private final String message;

src/test/java/io/nats/client/impl/JetStreamPullTests.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -947,20 +947,21 @@ public void testConsumerDeletedSyncSub() throws Exception {
947947
});
948948
}
949949

950-
// This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is.
951-
// @Test
952-
// public void testConsumerDeletedAsyncSub() throws Exception {
953-
// testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> {
954-
// jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
955-
// Dispatcher d = nc.createDispatcher();
956-
// PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1));
957-
// JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so);
958-
// sub.pullExpiresIn(1, 30000);
959-
// jsm.deleteConsumer(tsc.stream, durable(1));
960-
// js.publish(tsc.subject(), null);
961-
// return sub;
962-
// });
963-
// }
950+
// This just flaps. It's a timing thing? Already spent too much time, IWOMM and it should work as is.
951+
@Test
952+
@Disabled
953+
public void testConsumerDeletedAsyncSub() throws Exception {
954+
testConflictStatus(409, CONSUMER_DELETED, TYPE_ERROR, "2.9.6", (nc, jsm, js, tsc, handler) -> {
955+
jsm.addOrUpdateConsumer(tsc.stream, builder().durable(durable(1)).ackPolicy(AckPolicy.None).build());
956+
Dispatcher d = nc.createDispatcher();
957+
PullSubscribeOptions so = PullSubscribeOptions.bind(tsc.stream, durable(1));
958+
JetStreamSubscription sub = js.subscribe(null, d, m -> {}, so);
959+
sub.pullExpiresIn(1, 30000);
960+
jsm.deleteConsumer(tsc.stream, durable(1));
961+
js.publish(tsc.subject(), null);
962+
return sub;
963+
});
964+
}
964965

965966
static class BadPullRequestOptions extends PullRequestOptions {
966967
public BadPullRequestOptions() {

src/test/java/io/nats/client/utils/TestBase.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.io.IOException;
2525
import java.nio.charset.StandardCharsets;
26+
import java.nio.file.Files;
2627
import java.time.Duration;
2728
import java.util.List;
2829
import java.util.concurrent.TimeUnit;
@@ -94,6 +95,10 @@ public interface TwoServerTest {
9495
void test(Connection nc1, Connection nc2) throws Exception;
9596
}
9697

98+
public interface ThreeServerTest {
99+
void test(Connection nc1, Connection nc2, Connection nc3) throws Exception;
100+
}
101+
97102
public interface VersionCheck {
98103
boolean runTest(ServerInfo si);
99104
}
@@ -319,6 +324,80 @@ public static void runInJsHubLeaf(TwoServerTest twoServerTest) throws Exception
319324
}
320325
}
321326

327+
public static void runInJsCluster(ThreeServerTest threeServerTest) throws Exception {
328+
int port1 = NatsTestServer.nextPort();
329+
int port2 = NatsTestServer.nextPort();
330+
int port3 = NatsTestServer.nextPort();
331+
int listen1 = NatsTestServer.nextPort();
332+
int listen2 = NatsTestServer.nextPort();
333+
int listen3 = NatsTestServer.nextPort();
334+
String path1 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
335+
String path2 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
336+
String path3 = Files.createTempDirectory(variant()).toString().replace("\\", "\\\\");
337+
String cluster = variant();
338+
String serverPrefix = variant();
339+
340+
String[] server1Inserts = new String[] {
341+
"jetstream {",
342+
" store_dir=" + path1,
343+
"}",
344+
"server_name=" + serverPrefix + "1",
345+
"cluster {",
346+
" name: " + cluster,
347+
" listen: 127.0.0.1:" + listen1,
348+
" routes: [",
349+
" nats-route://127.0.0.1:" + listen2,
350+
" nats-route://127.0.0.1:" + listen3,
351+
" ]",
352+
"}",
353+
};
354+
355+
String[] server2Inserts = new String[] {
356+
"jetstream {",
357+
" store_dir=" + path2,
358+
"}",
359+
"server_name=" + serverPrefix + "2",
360+
"cluster {",
361+
" name: " + cluster,
362+
" listen: 127.0.0.1:" + listen2,
363+
" routes: [",
364+
" nats-route://127.0.0.1:" + listen1,
365+
" nats-route://127.0.0.1:" + listen3,
366+
" ]",
367+
"}",
368+
};
369+
370+
String[] server3Inserts = new String[] {
371+
"jetstream {",
372+
" store_dir=" + path3,
373+
"}",
374+
"server_name=" + serverPrefix + "3",
375+
"cluster {",
376+
" name: " + cluster,
377+
" listen: 127.0.0.1:" + listen3,
378+
" routes: [",
379+
" nats-route://127.0.0.1:" + listen1,
380+
" nats-route://127.0.0.1:" + listen2,
381+
" ]",
382+
"}",
383+
};
384+
385+
try (NatsTestServer srv1 = new NatsTestServer(port1, false, true, null, server1Inserts, null);
386+
Connection nc1 = standardConnection(srv1.getURI());
387+
NatsTestServer srv2 = new NatsTestServer(port2, false, true, null, server2Inserts, null);
388+
Connection nc2 = standardConnection(srv2.getURI());
389+
NatsTestServer srv3 = new NatsTestServer(port3, false, true, null, server3Inserts, null);
390+
Connection nc3 = standardConnection(srv3.getURI())
391+
) {
392+
try {
393+
threeServerTest.test(nc1, nc2, nc3);
394+
}
395+
finally {
396+
cleanupJs(nc1);
397+
}
398+
}
399+
}
400+
322401
private static void cleanupJs(Connection c)
323402
{
324403
try {

0 commit comments

Comments
 (0)