Skip to content

Commit 090e75f

Browse files
authored
Merge pull request #1313 from nats-io/leadership-change
Better workflow for leadership change while simplified consuming.
2 parents c42c086 + c3a5f4e commit 090e75f

File tree

5 files changed

+15
-14
lines changed

5 files changed

+15
-14
lines changed

src/main/java/io/nats/client/impl/NatsJetStreamSubscription.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -130,19 +130,20 @@ protected Message _nextUnmanagedNoWait(String expectedPullSubject) throws Interr
130130
case MESSAGE:
131131
return msg;
132132
case STATUS_TERMINUS:
133-
// if the status applies return null, otherwise it's ignored, fall through
133+
// if the status applies, return null, otherwise it's ignored, fall through
134134
if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
135135
return null;
136136
}
137137
break;
138138
case STATUS_ERROR:
139-
// if the status applies throw exception, otherwise it's ignored, fall through
139+
// if the status applies, throw exception, otherwise it's ignored, fall through
140140
if (expectedPullSubject == null || expectedPullSubject.equals(msg.getSubject())) {
141141
throw new JetStreamStatusException(msg.getStatus(), this);
142142
}
143143
break;
144144
}
145-
// Check again when, regular messages might have arrived
145+
// These statuses don't apply to the message that came in,
146+
// so we just loop and move on to the next message.
146147
// 1. Any STATUS_HANDLED
147148
// 2. STATUS_TERMINUS or STATUS_ERRORS that aren't for expected pullSubject
148149
}

src/main/java/io/nats/client/impl/OrderedMessageManager.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected void startup(NatsJetStreamSubscription sub) {
5353
@Override
5454
protected ManageResult manage(Message msg) {
5555
if (!msg.getSID().equals(targetSid.get())) {
56-
return STATUS_HANDLED; // wrong sid is throwaway from previous consumer that errored
56+
return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
5757
}
5858

5959
if (msg.isJetStream()) {
@@ -90,14 +90,14 @@ private void handleErrorCondition() {
9090
}
9191
catch (Exception ignore) {}
9292

93-
// 2. re-subscribe. This means kill the sub then make a new one
93+
// 2. re-subscribe. This means killing the sub then making a new one.
9494
// New sub needs a new deliverSubject
9595
String newDeliverSubject = sub.connection.createInbox();
9696
sub.reSubscribe(newDeliverSubject);
9797
targetSid.set(sub.getSID());
9898

99-
// 3. make a new consumer using the same deliver subject but
100-
// with a new starting point
99+
// 3. make a new consumer using the same "deliver" subject
100+
// but with a new starting point
101101
ConsumerConfiguration userCC = js.consumerConfigurationForOrdered(originalCc, lastStreamSeq, newDeliverSubject, actualConsumerName, null);
102102
ConsumerInfo ci = js._createConsumer(stream, userCC, ConsumerCreateRequest.Action.Create); // this can fail when a server is down.
103103
sub.setConsumerName(ci.getName());

src/main/java/io/nats/client/impl/PullMessageManager.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -162,18 +162,17 @@ protected ManageResult manageStatus(Message msg) {
162162
case CONFLICT_CODE:
163163
// sometimes just a warning
164164
String statMsg = status.getMessage();
165-
if (statMsg.startsWith("Exceeded Max")
166-
|| statMsg.equals(SERVER_SHUTDOWN)
167-
|| statMsg.equals(LEADERSHIP_CHANGE)
168-
) {
165+
if (statMsg.startsWith(EXCEEDED_MAX_PREFIX) || statMsg.equals(SERVER_SHUTDOWN))
166+
{
169167
if (raiseStatusWarnings) {
170168
conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status));
171169
}
172170
return STATUS_HANDLED;
173171
}
174172

175-
if (statMsg.equals(BATCH_COMPLETED) ||
176-
statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
173+
if (statMsg.equals(BATCH_COMPLETED)
174+
|| statMsg.equals(LEADERSHIP_CHANGE)
175+
|| statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES))
177176
{
178177
return STATUS_TERMINUS;
179178
}

src/main/java/io/nats/client/impl/PullOrderedMessageManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ protected void startup(NatsJetStreamSubscription sub) {
5151
@Override
5252
protected ManageResult manage(Message msg) {
5353
if (!msg.getSID().equals(targetSid.get())) {
54-
return STATUS_HANDLED; // wrong sid is throwaway from previous consumer that errored
54+
return STATUS_HANDLED; // wrong sid. message is a throwaway from previous consumer that errored
5555
}
5656

5757
if (msg.isJetStream()) {

src/main/java/io/nats/client/support/Status.java

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class Status {
3737
public static String CONSUMER_IS_PUSH_BASED = "Consumer is push based"; // 409
3838

3939
public static String MESSAGE_SIZE_EXCEEDS_MAX_BYTES = "Message Size Exceeds MaxBytes"; // 409
40+
public static String EXCEEDED_MAX_PREFIX = "Exceeded Max";
4041
public static String EXCEEDED_MAX_WAITING = "Exceeded MaxWaiting"; // 409
4142
public static String EXCEEDED_MAX_REQUEST_BATCH = "Exceeded MaxRequestBatch"; // 409
4243
public static String EXCEEDED_MAX_REQUEST_EXPIRES = "Exceeded MaxRequestExpires"; // 409

0 commit comments

Comments
 (0)