Skip to content

Commit affe6be

Browse files
authored
Serializable Wrappers (#1156)
1 parent b4c91ef commit affe6be

14 files changed

+593
-134
lines changed

src/main/java/io/nats/client/BaseConsumeOptions.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,22 @@
1414
package io.nats.client;
1515

1616
import io.nats.client.api.ConsumerConfiguration;
17+
import io.nats.client.support.JsonParseException;
18+
import io.nats.client.support.JsonParser;
19+
import io.nats.client.support.JsonSerializable;
20+
import io.nats.client.support.JsonValue;
21+
22+
import static io.nats.client.support.ApiConstants.*;
23+
import static io.nats.client.support.JsonUtils.*;
24+
import static io.nats.client.support.JsonValueUtils.readBoolean;
25+
import static io.nats.client.support.JsonValueUtils.readInteger;
26+
import static io.nats.client.support.JsonValueUtils.readLong;
1727

1828
/**
1929
* Base Consume Options are provided to customize the way the consume and
2030
* fetch operate. It is the base class for ConsumeOptions and FetchConsumeOptions.
2131
*/
22-
public class BaseConsumeOptions {
32+
public class BaseConsumeOptions implements JsonSerializable {
2333
public static final int DEFAULT_MESSAGE_COUNT = 500;
2434
public static final int DEFAULT_MESSAGE_COUNT_WHEN_BYTES = 1_000_000;
2535
public static final int DEFAULT_THRESHOLD_PERCENT = 25;
@@ -47,13 +57,34 @@ protected BaseConsumeOptions(Builder b) {
4757

4858
// validation handled in builder
4959
thresholdPercent = b.thresholdPercent;
50-
expiresIn = b.expiresIn;
5160
noWait = b.noWait;
5261

62+
// if it's not noWait, it must have an expiresIn
63+
// we can't check this in the builder because we can't guarantee order
64+
// so we always default to LONG_UNSET in the builder and check it here.
65+
if (b.expiresIn == ConsumerConfiguration.LONG_UNSET && !noWait) {
66+
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
67+
}
68+
else {
69+
expiresIn = b.expiresIn;
70+
}
71+
5372
// calculated
5473
idleHeartbeat = Math.min(MAX_HEARTBEAT_MILLIS, expiresIn * MAX_IDLE_HEARTBEAT_PERCENT / 100);
5574
}
5675

76+
@Override
77+
public String toJson() {
78+
StringBuilder sb = beginJson();
79+
addField(sb, MESSAGES, messages);
80+
addField(sb, BYTES, bytes);
81+
addField(sb, EXPIRES_IN, expiresIn);
82+
addField(sb, IDLE_HEARTBEAT, idleHeartbeat);
83+
addField(sb, THRESHOLD_PERCENT, thresholdPercent);
84+
addFldWhenTrue(sb, NO_WAIT, noWait);
85+
return endJson(sb).toString();
86+
}
87+
5788
public long getExpiresInMillis() {
5889
return expiresIn;
5990
}
@@ -66,6 +97,10 @@ public int getThresholdPercent() {
6697
return thresholdPercent;
6798
}
6899

100+
public boolean isNoWait() {
101+
return noWait;
102+
}
103+
69104
protected static abstract class Builder<B, CO> {
70105
protected int messages = -1;
71106
protected long bytes = 0;
@@ -75,6 +110,28 @@ protected static abstract class Builder<B, CO> {
75110

76111
protected abstract B getThis();
77112

113+
protected B noWait() {
114+
return getThis();
115+
}
116+
117+
public B json(String json) throws JsonParseException {
118+
return jsonValue(JsonParser.parse(json));
119+
}
120+
121+
/**
122+
* Construct the builder and initialize values from the JsonValue object.
123+
*/
124+
public B jsonValue(JsonValue v) {
125+
messages(readInteger(v, MESSAGES, -1));
126+
bytes(readLong(v, BYTES, -1));
127+
expiresIn(readLong(v, EXPIRES_IN, MIN_EXPIRES_MILLS));
128+
thresholdPercent(readInteger(v, THRESHOLD_PERCENT, -1));
129+
if (readBoolean(v, NO_WAIT, false)) {
130+
noWait();
131+
}
132+
return getThis();
133+
}
134+
78135
protected B messages(int messages) {
79136
this.messages = messages < 1 ? -1 : messages;
80137
return getThis();
@@ -96,12 +153,7 @@ protected B bytes(long bytes) {
96153
*/
97154
public B expiresIn(long expiresInMillis) {
98155
if (expiresInMillis < 1) { // this is way to clear or reset, just a code guard really
99-
if (noWait) {
100-
expiresIn = ConsumerConfiguration.LONG_UNSET;
101-
}
102-
else {
103-
expiresIn = DEFAULT_EXPIRES_IN_MILLIS;
104-
}
156+
expiresIn = ConsumerConfiguration.LONG_UNSET;
105157
}
106158
else if (expiresInMillis < MIN_EXPIRES_MILLS) {
107159
throw new IllegalArgumentException("Expires must be greater than or equal to " + MIN_EXPIRES_MILLS);

src/main/java/io/nats/client/FetchConsumeOptions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ public long getMaxBytes() {
4141
return bytes;
4242
}
4343

44-
public boolean isNoWait() { return noWait; }
45-
4644
public static Builder builder() {
4745
return new Builder();
4846
}
@@ -100,6 +98,7 @@ public Builder max(int maxBytes, int maxMessages) {
10098
* Set no wait to true
10199
* @return the builder
102100
*/
101+
@Override
103102
public Builder noWait() {
104103
this.noWait = true;
105104
expiresIn = ConsumerConfiguration.LONG_UNSET;

src/main/java/io/nats/client/api/ConsumerConfiguration.java

Lines changed: 83 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@
1515

1616
import io.nats.client.PullSubscribeOptions;
1717
import io.nats.client.PushSubscribeOptions;
18-
import io.nats.client.support.ApiConstants;
19-
import io.nats.client.support.JsonSerializable;
20-
import io.nats.client.support.JsonUtils;
21-
import io.nats.client.support.JsonValue;
18+
import io.nats.client.support.*;
2219

2320
import java.time.Duration;
2421
import java.time.ZonedDateTime;
@@ -120,49 +117,6 @@ protected ConsumerConfiguration(ConsumerConfiguration cc) {
120117
this.filterSubjects = cc.filterSubjects == null ? null : new ArrayList<>(cc.filterSubjects);
121118
}
122119

123-
ConsumerConfiguration(JsonValue v) {
124-
deliverPolicy = DeliverPolicy.get(readString(v, DELIVER_POLICY));
125-
ackPolicy = AckPolicy.get(readString(v, ACK_POLICY));
126-
replayPolicy = ReplayPolicy.get(readString(v, REPLAY_POLICY));
127-
128-
description = readString(v, DESCRIPTION);
129-
durable = readString(v, DURABLE_NAME);
130-
name = readString(v, NAME);
131-
deliverSubject = readString(v, DELIVER_SUBJECT);
132-
deliverGroup = readString(v, DELIVER_GROUP);
133-
sampleFrequency = readString(v, SAMPLE_FREQ);
134-
startTime = readDate(v, OPT_START_TIME);
135-
ackWait = readNanos(v, ACK_WAIT);
136-
idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
137-
maxExpires = readNanos(v, MAX_EXPIRES);
138-
inactiveThreshold = readNanos(v, INACTIVE_THRESHOLD);
139-
140-
startSeq = readLong(v, OPT_START_SEQ);
141-
maxDeliver = readInteger(v, MAX_DELIVER);
142-
rateLimit = readLong(v, RATE_LIMIT_BPS);
143-
maxAckPending = readInteger(v, MAX_ACK_PENDING);
144-
maxPullWaiting = readInteger(v, MAX_WAITING);
145-
maxBatch = readInteger(v, MAX_BATCH);
146-
maxBytes = readInteger(v, MAX_BYTES);
147-
numReplicas = readInteger(v, NUM_REPLICAS);
148-
pauseUntil = readDate(v, PAUSE_UNTIL);
149-
150-
flowControl = readBoolean(v, FLOW_CONTROL, null);
151-
headersOnly = readBoolean(v, HEADERS_ONLY, null);
152-
memStorage = readBoolean(v, MEM_STORAGE, null);
153-
154-
backoff = readNanosList(v, BACKOFF, true);
155-
metadata = readStringStringMap(v, METADATA);
156-
157-
String tempFs = emptyAsNull(readString(v, FILTER_SUBJECT));
158-
if (tempFs == null) {
159-
filterSubjects = readOptionalStringList(v, FILTER_SUBJECTS);
160-
}
161-
else {
162-
filterSubjects = Collections.singletonList(tempFs);
163-
}
164-
}
165-
166120
// For the builder
167121
protected ConsumerConfiguration(Builder b)
168122
{
@@ -203,7 +157,6 @@ protected ConsumerConfiguration(Builder b)
203157

204158
/**
205159
* Returns a JSON representation of this consumer configuration.
206-
*
207160
* @return json consumer configuration json string
208161
*/
209162
public String toJson() {
@@ -686,8 +639,14 @@ public static class Builder {
686639
private Map<String, String> metadata;
687640
private List<String> filterSubjects;
688641

642+
/**
643+
* Construct the builder
644+
*/
689645
public Builder() {}
690646

647+
/**
648+
* Construct the builder and initialize values with the existing ConsumerConfiguration
649+
*/
691650
public Builder(ConsumerConfiguration cc) {
692651
if (cc != null) {
693652
this.deliverPolicy = cc.deliverPolicy;
@@ -732,6 +691,81 @@ public Builder(ConsumerConfiguration cc) {
732691
}
733692
}
734693

694+
/**
695+
* Construct the builder and initialize values from the json string.
696+
*/
697+
public Builder json(String json) throws JsonParseException {
698+
return jsonValue(JsonParser.parse(json));
699+
}
700+
701+
/**
702+
* Construct the builder and initialize values from the JsonValue object.
703+
*/
704+
public Builder jsonValue(JsonValue v) {
705+
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
706+
ackPolicy(AckPolicy.get(readString(v, ACK_POLICY)));
707+
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));
708+
709+
description(readString(v, DESCRIPTION));
710+
durable(readString(v, DURABLE_NAME));
711+
name(readString(v, NAME));
712+
deliverSubject(readString(v, DELIVER_SUBJECT));
713+
deliverGroup(readString(v, DELIVER_GROUP));
714+
sampleFrequency(readString(v, SAMPLE_FREQ));
715+
startTime(readDate(v, OPT_START_TIME));
716+
ackWait(readNanos(v, ACK_WAIT));
717+
maxExpires(readNanos(v, MAX_EXPIRES));
718+
inactiveThreshold(readNanos(v, INACTIVE_THRESHOLD));
719+
720+
startSequence(readLong(v, OPT_START_SEQ));
721+
maxDeliver(readLong(v, MAX_DELIVER, INTEGER_UNSET));
722+
rateLimit(readLong(v, RATE_LIMIT_BPS));
723+
maxAckPending(readLong(v, MAX_ACK_PENDING));
724+
maxPullWaiting(readLong(v, MAX_WAITING));
725+
maxBatch(readLong(v, MAX_BATCH));
726+
maxBytes(readLong(v, MAX_BYTES));
727+
728+
Integer r = readInteger(v, NUM_REPLICAS);
729+
if (r != null) {
730+
if (r == 0) {
731+
numReplicas = 0;
732+
}
733+
else {
734+
numReplicas(r);
735+
}
736+
}
737+
738+
pauseUntil(readDate(v, PAUSE_UNTIL));
739+
740+
Duration idleHeartbeat = readNanos(v, IDLE_HEARTBEAT);
741+
if (idleHeartbeat != null) {
742+
if (readBoolean(v, FLOW_CONTROL, false)) {
743+
flowControl(idleHeartbeat);
744+
}
745+
else {
746+
idleHeartbeat(idleHeartbeat);
747+
}
748+
}
749+
750+
headersOnly(readBoolean(v, HEADERS_ONLY, null));
751+
memStorage(readBoolean(v, MEM_STORAGE, null));
752+
753+
//noinspection DataFlowIssue readNanosList with false ensures not null;
754+
backoff(readNanosList(v, BACKOFF, false).toArray(new Duration[0]));
755+
756+
metadata(readStringStringMap(v, METADATA));
757+
758+
String fs = emptyAsNull(readString(v, FILTER_SUBJECT));
759+
if (fs == null) {
760+
filterSubjects(readOptionalStringList(v, FILTER_SUBJECTS));
761+
}
762+
else {
763+
filterSubject(fs);
764+
}
765+
766+
return this;
767+
}
768+
735769
/**
736770
* Sets the description
737771
* @param description the description
@@ -1245,7 +1279,7 @@ public Builder backoff(long... backoffsMillis) {
12451279
* @return Builder
12461280
*/
12471281
public Builder metadata(Map<String, String> metadata) {
1248-
this.metadata = metadata == null || metadata.size() == 0 ? null : new HashMap<>(metadata);
1282+
this.metadata = metadata == null || metadata.isEmpty() ? null : new HashMap<>(metadata);
12491283
return this;
12501284
}
12511285

src/main/java/io/nats/client/api/ConsumerInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public ConsumerInfo(Message msg) {
4949

5050
public ConsumerInfo(JsonValue vConsumerInfo) {
5151
super(vConsumerInfo);
52-
this.configuration = new ConsumerConfiguration(readObject(jv, CONFIG));
52+
this.configuration = ConsumerConfiguration.builder().jsonValue(readObject(jv, CONFIG)).build();
5353

5454
stream = readString(jv, STREAM_NAME);
5555
name = readString(jv, NAME);

src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,21 @@
1313

1414
package io.nats.client.api;
1515

16+
import io.nats.client.support.*;
17+
1618
import java.time.ZonedDateTime;
1719
import java.util.ArrayList;
1820
import java.util.Arrays;
1921
import java.util.Collections;
2022
import java.util.List;
2123

24+
import static io.nats.client.support.ApiConstants.*;
25+
import static io.nats.client.support.JsonUtils.beginJson;
26+
import static io.nats.client.support.JsonUtils.endJson;
27+
import static io.nats.client.support.JsonValueUtils.*;
2228
import static io.nats.client.support.Validator.emptyAsNull;
2329

24-
public class OrderedConsumerConfiguration {
30+
public class OrderedConsumerConfiguration implements JsonSerializable {
2531

2632
public static String DEFAULT_FILTER_SUBJECT = ">";
2733

@@ -43,6 +49,44 @@ public OrderedConsumerConfiguration() {
4349
filterSubjects.add(DEFAULT_FILTER_SUBJECT);
4450
}
4551

52+
public OrderedConsumerConfiguration(String json) throws JsonParseException {
53+
this(JsonParser.parse(json));
54+
}
55+
56+
public OrderedConsumerConfiguration(JsonValue v) throws JsonParseException {
57+
this();
58+
List<String> list = readOptionalStringList(v, FILTER_SUBJECTS);
59+
if (list != null) {
60+
filterSubjects(list);
61+
}
62+
deliverPolicy(DeliverPolicy.get(readString(v, DELIVER_POLICY)));
63+
startSequence(readLong(v, OPT_START_SEQ, ConsumerConfiguration.LONG_UNSET));
64+
startTime(readDate(v, OPT_START_TIME));
65+
replayPolicy(ReplayPolicy.get(readString(v, REPLAY_POLICY)));
66+
headersOnly(readBoolean(v, HEADERS_ONLY, null));
67+
}
68+
69+
/**
70+
* Returns a JSON representation of this ordered consumer configuration.
71+
* @return json ordered consumer configuration json string
72+
*/
73+
public String toJson() {
74+
StringBuilder sb = beginJson();
75+
if (filterSubjects != null && !filterSubjects.isEmpty()) {
76+
JsonUtils.addStrings(sb, FILTER_SUBJECTS, filterSubjects);
77+
}
78+
if (deliverPolicy != null) {
79+
JsonUtils.addField(sb, DELIVER_POLICY, deliverPolicy.toString());
80+
}
81+
JsonUtils.addFieldWhenGtZero(sb, OPT_START_SEQ, startSequence);
82+
JsonUtils.addField(sb, OPT_START_TIME, startTime);
83+
if (replayPolicy != null) {
84+
JsonUtils.addField(sb, REPLAY_POLICY, replayPolicy.toString());
85+
}
86+
JsonUtils.addFldWhenTrue(sb, HEADERS_ONLY, headersOnly);
87+
return endJson(sb).toString();
88+
}
89+
4690
/**
4791
* Sets the filter subject of the OrderedConsumerConfiguration.
4892
* @param filterSubject the filter subject

src/main/java/io/nats/client/support/ApiConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public interface ApiConstants {
6969
String ERROR = "error";
7070
String ERRORS = "errors";
7171
String EXPIRES = "expires";
72+
String EXPIRES_IN = "expires_in";
7273
String EXTERNAL = "external";
7374
String FILTER = "filter";
7475
String FILTER_SUBJECT = "filter_subject";
@@ -191,6 +192,7 @@ public interface ApiConstants {
191192
String SUCCESS = "success";
192193
String TAGS = "tags";
193194
String TEMPLATE_OWNER = "template_owner";
195+
String THRESHOLD_PERCENT = "threshold_percent";
194196
String TIERS = "tiers";
195197
String TIME = "time";
196198
String TIMESTAMP = "ts";

0 commit comments

Comments
 (0)