-
Notifications
You must be signed in to change notification settings - Fork 172
Per Message TTL Support for 2.11 #1295
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,15 +45,17 @@ public class PublishOptions { | |
private final long expectedLastSeq; | ||
private final long expectedLastSubSeq; | ||
private final String msgId; | ||
private final int msgTtlSeconds; | ||
|
||
private PublishOptions(String stream, Duration streamTimeout, String expectedStream, String expectedLastId, long expectedLastSeq, long expectedLastSubSeq, String msgId) { | ||
this.stream = stream; | ||
this.streamTimeout = streamTimeout; | ||
this.expectedStream = expectedStream; | ||
this.expectedLastId = expectedLastId; | ||
this.expectedLastSeq = expectedLastSeq; | ||
this.expectedLastSubSeq = expectedLastSubSeq; | ||
this.msgId = msgId; | ||
private PublishOptions(Builder b) { | ||
this.stream = b.stream; | ||
this.streamTimeout = b.streamTimeout; | ||
this.expectedStream = b.expectedStream; | ||
this.expectedLastId = b.expectedLastId; | ||
this.expectedLastSeq = b.expectedLastSeq; | ||
this.expectedLastSubSeq = b.expectedLastSubSeq; | ||
this.msgId = b.msgId; | ||
this.msgTtlSeconds = b.msgTtlSeconds; | ||
} | ||
|
||
/** | ||
|
@@ -122,6 +124,14 @@ public String getMessageId() { | |
return this.msgId; | ||
} | ||
|
||
/** | ||
* Gets the message ttl in seconds. returns -1 if not set. | ||
* @return the message ttl or -1 | ||
*/ | ||
public int getMsgTtlSeconds() { | ||
return msgTtlSeconds; | ||
} | ||
|
||
/** | ||
* Creates a builder for the options. | ||
* @return the builder | ||
|
@@ -144,6 +154,7 @@ public static class Builder { | |
long expectedLastSeq = UNSET_LAST_SEQUENCE; | ||
long expectedLastSubSeq = UNSET_LAST_SEQUENCE; | ||
String msgId; | ||
int msgTtlSeconds = -1; | ||
|
||
/** | ||
* Constructs a new publish options Builder with the default values. | ||
|
@@ -169,7 +180,7 @@ public Builder(Properties properties) { | |
/** | ||
* Sets the stream name for publishing. The default is undefined. | ||
* @param stream The name of the stream. | ||
* @return Builder | ||
* @return The Builder | ||
*/ | ||
public Builder stream(String stream) { | ||
this.stream = validateStreamName(stream, false); | ||
|
@@ -180,7 +191,7 @@ public Builder stream(String stream) { | |
* Sets the timeout to wait for a publish acknowledgement from a JetStream | ||
* enabled NATS server. | ||
* @param timeout the publish timeout. | ||
* @return Builder | ||
* @return The Builder | ||
*/ | ||
public Builder streamTimeout(Duration timeout) { | ||
this.streamTimeout = validateDurationNotRequiredGtOrEqZero(timeout, DEFAULT_TIMEOUT); | ||
|
@@ -191,7 +202,7 @@ public Builder streamTimeout(Duration timeout) { | |
* Sets the expected stream for the publish. If the | ||
* stream does not match the server will not save the message. | ||
* @param stream expected stream | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder expectedStream(String stream) { | ||
expectedStream = validateStreamName(stream, false); | ||
|
@@ -202,7 +213,7 @@ public Builder expectedStream(String stream) { | |
* Sets the expected last ID of the previously published message. If the | ||
* message ID does not match the server will not save the message. | ||
* @param lastMsgId the stream | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder expectedLastMsgId(String lastMsgId) { | ||
expectedLastId = emptyAsNull(lastMsgId); | ||
|
@@ -212,7 +223,7 @@ public Builder expectedLastMsgId(String lastMsgId) { | |
/** | ||
* Sets the expected message sequence of the publish | ||
* @param sequence the expected last sequence number | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder expectedLastSequence(long sequence) { | ||
// 0 has NO meaning to expectedLastSequence but we except 0 b/c the sequence is really a ulong | ||
|
@@ -223,7 +234,7 @@ public Builder expectedLastSequence(long sequence) { | |
/** | ||
* Sets the expected subject message sequence of the publish | ||
* @param sequence the expected last subject sequence number | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder expectedLastSubjectSequence(long sequence) { | ||
expectedLastSubSeq = validateGtEqMinus1(sequence, "Last Subject Sequence"); | ||
|
@@ -234,17 +245,27 @@ public Builder expectedLastSubjectSequence(long sequence) { | |
* Sets the message id. Message IDs are used for de-duplication | ||
* and should be unique to each message payload. | ||
* @param msgId the unique message id. | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder messageId(String msgId) { | ||
this.msgId = emptyAsNull(msgId); | ||
return this; | ||
} | ||
|
||
/** | ||
* Sets the TTL for this specific message to be published | ||
* @param msgTtlSeconds the ttl in seconds | ||
* @return The Builder | ||
*/ | ||
public Builder msgTtlSeconds(int msgTtlSeconds) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
this.msgTtlSeconds = msgTtlSeconds < 1 ? -1 : msgTtlSeconds; | ||
return this; | ||
} | ||
|
||
/** | ||
* Clears the expected so the build can be re-used. | ||
* Clears the expectedLastId, expectedLastSequence and messageId fields. | ||
* @return builder | ||
* @return The Builder | ||
*/ | ||
public Builder clearExpected() { | ||
expectedLastId = null; | ||
|
@@ -259,7 +280,7 @@ public Builder clearExpected() { | |
* @return publish options | ||
*/ | ||
public PublishOptions build() { | ||
return new PublishOptions(stream, streamTimeout, expectedStream, expectedLastId, expectedLastSeq, expectedLastSubSeq, msgId); | ||
return new PublishOptions(this); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ public enum AckPolicy { | |
*/ | ||
Explicit("explicit"); | ||
|
||
private String policy; | ||
private final String policy; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I happened to be here and the IDE yelled at me, so I fixed it. |
||
|
||
AckPolicy(String p) { | ||
policy = p; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,42 +71,42 @@ public class StreamConfiguration implements JsonSerializable { | |
private final Duration subjectDeleteMarkerTtl; | ||
|
||
static StreamConfiguration instance(JsonValue v) { | ||
Builder builder = new Builder(); | ||
builder.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION))); | ||
builder.compressionOption(CompressionOption.get(readString(v, COMPRESSION))); | ||
builder.storageType(StorageType.get(readString(v, STORAGE))); | ||
builder.discardPolicy(DiscardPolicy.get(readString(v, DISCARD))); | ||
builder.name(readString(v, NAME)); | ||
builder.description(readString(v, DESCRIPTION)); | ||
builder.maxConsumers(readLong(v, MAX_CONSUMERS, -1)); | ||
builder.maxMessages(readLong(v, MAX_MSGS, -1)); | ||
builder.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1)); | ||
builder.maxBytes(readLong(v, MAX_BYTES, -1)); | ||
builder.maxAge(readNanos(v, MAX_AGE)); | ||
builder.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1)); | ||
builder.replicas(readInteger(v, NUM_REPLICAS, 1)); | ||
builder.noAck(readBoolean(v, NO_ACK)); | ||
builder.templateOwner(readString(v, TEMPLATE_OWNER)); | ||
builder.duplicateWindow(readNanos(v, DUPLICATE_WINDOW)); | ||
builder.subjects(readStringList(v, SUBJECTS)); | ||
builder.placement(Placement.optionalInstance(readValue(v, PLACEMENT))); | ||
builder.republish(Republish.optionalInstance(readValue(v, REPUBLISH))); | ||
builder.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM))); | ||
builder.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS))); | ||
builder.mirror(Mirror.optionalInstance(readValue(v, MIRROR))); | ||
builder.sources(Source.optionalListOf(readValue(v, SOURCES))); | ||
builder.sealed(readBoolean(v, SEALED)); | ||
builder.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS)); | ||
builder.allowDirect(readBoolean(v, ALLOW_DIRECT)); | ||
builder.mirrorDirect(readBoolean(v, MIRROR_DIRECT)); | ||
builder.denyDelete(readBoolean(v, DENY_DELETE)); | ||
builder.denyPurge(readBoolean(v, DENY_PURGE)); | ||
builder.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT)); | ||
builder.metadata(readStringStringMap(v, METADATA)); | ||
builder.firstSequence(readLong(v, FIRST_SEQ, 1)); | ||
// builder.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL)); | ||
// builder.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL)); | ||
return builder.build(); | ||
return new Builder() | ||
.retentionPolicy(RetentionPolicy.get(readString(v, RETENTION))) | ||
.compressionOption(CompressionOption.get(readString(v, COMPRESSION))) | ||
.storageType(StorageType.get(readString(v, STORAGE))) | ||
.discardPolicy(DiscardPolicy.get(readString(v, DISCARD))) | ||
.name(readString(v, NAME)) | ||
.description(readString(v, DESCRIPTION)) | ||
.maxConsumers(readLong(v, MAX_CONSUMERS, -1)) | ||
.maxMessages(readLong(v, MAX_MSGS, -1)) | ||
.maxMessagesPerSubject(readLong(v, MAX_MSGS_PER_SUB, -1)) | ||
.maxBytes(readLong(v, MAX_BYTES, -1)) | ||
.maxAge(readNanos(v, MAX_AGE)) | ||
.maximumMessageSize(readInteger(v, MAX_MSG_SIZE, -1)) | ||
.replicas(readInteger(v, NUM_REPLICAS, 1)) | ||
.noAck(readBoolean(v, NO_ACK)) | ||
.templateOwner(readString(v, TEMPLATE_OWNER)) | ||
.duplicateWindow(readNanos(v, DUPLICATE_WINDOW)) | ||
.subjects(readStringList(v, SUBJECTS)) | ||
.placement(Placement.optionalInstance(readValue(v, PLACEMENT))) | ||
.republish(Republish.optionalInstance(readValue(v, REPUBLISH))) | ||
.subjectTransform(SubjectTransform.optionalInstance(readValue(v, SUBJECT_TRANSFORM))) | ||
.consumerLimits(ConsumerLimits.optionalInstance(readValue(v, CONSUMER_LIMITS))) | ||
.mirror(Mirror.optionalInstance(readValue(v, MIRROR))) | ||
.sources(Source.optionalListOf(readValue(v, SOURCES))) | ||
.sealed(readBoolean(v, SEALED)) | ||
.allowRollup(readBoolean(v, ALLOW_ROLLUP_HDRS)) | ||
.allowDirect(readBoolean(v, ALLOW_DIRECT)) | ||
.mirrorDirect(readBoolean(v, MIRROR_DIRECT)) | ||
.denyDelete(readBoolean(v, DENY_DELETE)) | ||
.denyPurge(readBoolean(v, DENY_PURGE)) | ||
.discardNewPerSubject(readBoolean(v, DISCARD_NEW_PER_SUBJECT)) | ||
.metadata(readStringStringMap(v, METADATA)) | ||
.firstSequence(readLong(v, FIRST_SEQ, 1)) | ||
.allowMessageTtl(readBoolean(v, ALLOW_MSG_TTL)) | ||
.subjectDeleteMarkerTtl(readNanos(v, SUBJECT_DELETE_MARKER_TTL)) | ||
.build(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no sure why I didn't use the chain method before. This wasn't a necessary change. |
||
} | ||
|
||
// For the builder, assumes all validations are already done in builder | ||
|
@@ -1060,35 +1060,35 @@ public Builder firstSequence(long firstSeq) { | |
return this; | ||
} | ||
|
||
// /** | ||
// * Set to allow per message TTL to true | ||
// * @return The Builder | ||
// */ | ||
// public Builder allowMessageTtl() { | ||
// this.allowMessageTtl = true; | ||
// return this; | ||
// } | ||
|
||
// /** | ||
// * Set allow per message TTL flag | ||
// * @param allowMessageTtl the flag | ||
// * @return The Builder | ||
// */ | ||
// public Builder allowMessageTtl(boolean allowMessageTtl) { | ||
// this.allowMessageTtl = allowMessageTtl; | ||
// return this; | ||
// } | ||
|
||
// /** | ||
// * The time delete marker TTL duration. Server accepts 1 second or more. | ||
// * CLIENT DOES NOT VALIDATE | ||
// * @param subjectDeleteMarkerTtl the TTL duration | ||
// * @return The Builder | ||
// */ | ||
// public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) { | ||
// this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl; | ||
// return this; | ||
// } | ||
/** | ||
* Set to allow per message TTL to true | ||
* @return The Builder | ||
*/ | ||
public Builder allowMessageTtl() { | ||
this.allowMessageTtl = true; | ||
return this; | ||
} | ||
|
||
/** | ||
* Set allow per message TTL flag | ||
* @param allowMessageTtl the flag | ||
* @return The Builder | ||
*/ | ||
public Builder allowMessageTtl(boolean allowMessageTtl) { | ||
this.allowMessageTtl = allowMessageTtl; | ||
return this; | ||
} | ||
|
||
/** | ||
* The time delete marker TTL duration. Server accepts 1 second or more. | ||
* CLIENT DOES NOT VALIDATE | ||
* @param subjectDeleteMarkerTtl the TTL duration | ||
* @return The Builder | ||
*/ | ||
public Builder subjectDeleteMarkerTtl(Duration subjectDeleteMarkerTtl) { | ||
this.subjectDeleteMarkerTtl = subjectDeleteMarkerTtl; | ||
return this; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Coded, then commented out, then back in. |
||
|
||
/** | ||
* Builds the StreamConfiguration | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -198,6 +198,7 @@ private Headers mergePublishOptions(Headers headers, PublishOptions opts) { | |
merged = mergeString(merged, EXPECTED_LAST_MSG_ID_HDR, opts.getExpectedLastMsgId()); | ||
merged = mergeString(merged, EXPECTED_STREAM_HDR, opts.getExpectedStream()); | ||
merged = mergeString(merged, MSG_ID_HDR, opts.getMessageId()); | ||
merged = mergeNum(merged, MSG_TTL_HDR, opts.getMsgTtlSeconds()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. mergeNum will add the header if the value is > -1 |
||
} | ||
|
||
return merged; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since private I can change. simplified constructor to take builder like most other places