Skip to content

Commit 4aab962

Browse files
authored
Revert headers from being read-only upon message creation (#1123)
1 parent 20cbd5c commit 4aab962

13 files changed

+297
-148
lines changed

src/main/java/io/nats/client/impl/Headers.java

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,13 @@ public Headers(Headers headers, boolean readOnly, String[] keysNotToCopy) {
9292
* -or- if any value contains invalid characters
9393
*/
9494
public Headers add(String key, String... values) {
95-
if (values != null) {
96-
_add(key, Arrays.asList(values));
95+
if (readOnly) {
96+
throw new UnsupportedOperationException();
9797
}
98-
return this;
98+
if (values == null || values.length == 0) {
99+
return this;
100+
}
101+
return _add(key, Arrays.asList(values));
99102
}
100103

101104
/**
@@ -109,12 +112,17 @@ public Headers add(String key, String... values) {
109112
* -or- if any value contains invalid characters
110113
*/
111114
public Headers add(String key, Collection<String> values) {
112-
_add(key, values);
113-
return this;
115+
if (readOnly) {
116+
throw new UnsupportedOperationException();
117+
}
118+
if (values == null || values.isEmpty()) {
119+
return this;
120+
}
121+
return _add(key, values);
114122
}
115123

116124
// the add delegate
117-
private void _add(String key, Collection<String> values) {
125+
private Headers _add(String key, Collection<String> values) {
118126
if (values != null) {
119127
Checker checked = new Checker(key, values);
120128
if (checked.hasValues()) {
@@ -129,6 +137,7 @@ private void _add(String key, Collection<String> values) {
129137
serialized = null; // since the data changed, clear this so it's rebuilt
130138
}
131139
}
140+
return this;
132141
}
133142

134143
/**
@@ -143,10 +152,13 @@ private void _add(String key, Collection<String> values) {
143152
* -or- if any value contains invalid characters
144153
*/
145154
public Headers put(String key, String... values) {
146-
if (values != null) {
147-
_put(key, Arrays.asList(values));
155+
if (readOnly) {
156+
throw new UnsupportedOperationException();
148157
}
149-
return this;
158+
if (values == null || values.length == 0) {
159+
return this;
160+
}
161+
return _put(key, Arrays.asList(values));
150162
}
151163

152164
/**
@@ -161,8 +173,13 @@ public Headers put(String key, String... values) {
161173
* -or- if any value contains invalid characters
162174
*/
163175
public Headers put(String key, Collection<String> values) {
164-
_put(key, values);
165-
return this;
176+
if (readOnly) {
177+
throw new UnsupportedOperationException();
178+
}
179+
if (values == null || values.isEmpty()) {
180+
return this;
181+
}
182+
return _put(key, values);
166183
}
167184

168185
/**
@@ -173,14 +190,20 @@ public Headers put(String key, Collection<String> values) {
173190
* @return the Headers object
174191
*/
175192
public Headers put(Map<String, List<String>> map) {
193+
if (readOnly) {
194+
throw new UnsupportedOperationException();
195+
}
196+
if (map == null || map.isEmpty()) {
197+
return this;
198+
}
176199
for (String key : map.keySet() ) {
177-
put(key, map.get(key));
200+
_put(key, map.get(key));
178201
}
179202
return this;
180203
}
181204

182-
// the put delegate that all puts call
183-
private void _put(String key, Collection<String> values) {
205+
// the put delegate
206+
private Headers _put(String key, Collection<String> values) {
184207
if (key == null || key.isEmpty()) {
185208
throw new IllegalArgumentException("Key cannot be null or empty.");
186209
}
@@ -195,6 +218,7 @@ private void _put(String key, Collection<String> values) {
195218
serialized = null; // since the data changed, clear this so it's rebuilt
196219
}
197220
}
221+
return this;
198222
}
199223

200224
/**
@@ -203,6 +227,9 @@ private void _put(String key, Collection<String> values) {
203227
* @param keys the key or keys to remove
204228
*/
205229
public void remove(String... keys) {
230+
if (readOnly) {
231+
throw new UnsupportedOperationException();
232+
}
206233
for (String key : keys) {
207234
_remove(key);
208235
}
@@ -215,12 +242,16 @@ public void remove(String... keys) {
215242
* @param keys the key or keys to remove
216243
*/
217244
public void remove(Collection<String> keys) {
245+
if (readOnly) {
246+
throw new UnsupportedOperationException();
247+
}
218248
for (String key : keys) {
219249
_remove(key);
220250
}
221251
serialized = null; // since the data changed, clear this so it's rebuilt
222252
}
223253

254+
// the remove delegate
224255
private void _remove(String key) {
225256
// if the values had a key, then the data length had a length
226257
if (valuesMap.remove(key) != null) {
@@ -250,6 +281,9 @@ public boolean isEmpty() {
250281
* Removes all the keys The object map will be empty after this call returns.
251282
*/
252283
public void clear() {
284+
if (readOnly) {
285+
throw new UnsupportedOperationException();
286+
}
253287
valuesMap.clear();
254288
lengthMap.clear();
255289
dataLength = 0;

src/main/java/io/nats/client/impl/IncomingMessage.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,27 @@
1313

1414
package io.nats.client.impl;
1515

16+
import io.nats.client.support.ByteArrayBuilder;
17+
1618
public class IncomingMessage extends NatsMessage {
17-
IncomingMessage() {}
19+
IncomingMessage() {
20+
super((byte[])null);
21+
}
1822

1923
IncomingMessage(byte[] data) {
2024
super(data);
2125
}
2226

27+
@Override
28+
protected void calculate() {
29+
// intentionally does nothing
30+
}
31+
32+
@Override
33+
ByteArrayBuilder getProtocolBab() {
34+
throw new IllegalStateException("getProtocolBab not supported for this type of message.");
35+
}
36+
2337
@Override
2438
byte[] getProtocolBytes() {
2539
throw new IllegalStateException("getProtocolBytes not supported for this type of message.");

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -836,31 +836,31 @@ void cleanUpPongQueue() {
836836
*/
837837
@Override
838838
public void publish(String subject, byte[] body) {
839-
publishInternal(subject, null, null, body);
839+
publishInternal(subject, null, null, body, true);
840840
}
841841

842842
/**
843843
* {@inheritDoc}
844844
*/
845845
@Override
846846
public void publish(String subject, Headers headers, byte[] body) {
847-
publishInternal(subject, null, headers, body);
847+
publishInternal(subject, null, headers, body, true);
848848
}
849849

850850
/**
851851
* {@inheritDoc}
852852
*/
853853
@Override
854854
public void publish(String subject, String replyTo, byte[] body) {
855-
publishInternal(subject, replyTo, null, body);
855+
publishInternal(subject, replyTo, null, body, true);
856856
}
857857

858858
/**
859859
* {@inheritDoc}
860860
*/
861861
@Override
862862
public void publish(String subject, String replyTo, Headers headers, byte[] body) {
863-
publishInternal(subject, replyTo, headers, body);
863+
publishInternal(subject, replyTo, headers, body, true);
864864
}
865865

866866
/**
@@ -869,35 +869,30 @@ public void publish(String subject, String replyTo, Headers headers, byte[] body
869869
@Override
870870
public void publish(Message message) {
871871
validateNotNull(message, "Message");
872-
publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData());
872+
publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false);
873873
}
874874

875-
void publishInternal(String subject, String replyTo, Headers headers, byte[] data) {
876-
checkIfNeedsHeaderSupport(headers);
875+
void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubRep) {
877876
checkPayloadSize(data);
877+
NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubRep);
878+
if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
879+
throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
880+
}
878881

879882
if (isClosed()) {
880883
throw new IllegalStateException("Connection is Closed");
881884
} else if (blockPublishForDrain.get()) {
882885
throw new IllegalStateException("Connection is Draining"); // Ok to publish while waiting on subs
883886
}
884887

885-
NatsMessage nm = new NatsMessage(subject, replyTo, new Headers(headers), data);
886-
887888
Connection.Status stat = this.status;
888889
if ((stat == Status.RECONNECTING || stat == Status.DISCONNECTED)
889-
&& !this.writer.canQueueDuringReconnect(nm)) {
890+
&& !this.writer.canQueueDuringReconnect(npm)) {
890891
throw new IllegalStateException(
891892
"Unable to queue any more messages during reconnect, max buffer is " + options.getReconnectBufferSize());
892893
}
893-
queueOutgoing(nm);
894-
}
895894

896-
private void checkIfNeedsHeaderSupport(Headers headers) {
897-
if (headers != null && !headers.isEmpty() && !serverInfo.get().isHeadersSupported()) {
898-
throw new IllegalArgumentException(
899-
"Headers are not supported by the server, version: " + serverInfo.get().getVersion());
900-
}
895+
queueOutgoing(npm);
901896
}
902897

903898
private void checkPayloadSize(byte[] body) {
@@ -1099,15 +1094,15 @@ else if (future.isDone()) {
10991094
*/
11001095
@Override
11011096
public Message request(String subject, byte[] body, Duration timeout) throws InterruptedException {
1102-
return requestInternal(subject, null, body, timeout, cancelAction);
1097+
return requestInternal(subject, null, body, timeout, cancelAction, true);
11031098
}
11041099

11051100
/**
11061101
* {@inheritDoc}
11071102
*/
11081103
@Override
11091104
public Message request(String subject, Headers headers, byte[] body, Duration timeout) throws InterruptedException {
1110-
return requestInternal(subject, headers, body, timeout, cancelAction);
1105+
return requestInternal(subject, headers, body, timeout, cancelAction, true);
11111106
}
11121107

11131108
/**
@@ -1116,11 +1111,11 @@ public Message request(String subject, Headers headers, byte[] body, Duration ti
11161111
@Override
11171112
public Message request(Message message, Duration timeout) throws InterruptedException {
11181113
validateNotNull(message, "Message");
1119-
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction);
1114+
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
11201115
}
11211116

1122-
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction) throws InterruptedException {
1123-
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction);
1117+
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubRep) throws InterruptedException {
1118+
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubRep);
11241119
try {
11251120
return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
11261121
} catch (TimeoutException | ExecutionException | CancellationException e) {
@@ -1133,31 +1128,31 @@ Message requestInternal(String subject, Headers headers, byte[] data, Duration t
11331128
*/
11341129
@Override
11351130
public CompletableFuture<Message> request(String subject, byte[] body) {
1136-
return requestFutureInternal(subject, null, body, null, cancelAction);
1131+
return requestFutureInternal(subject, null, body, null, cancelAction, true);
11371132
}
11381133

11391134
/**
11401135
* {@inheritDoc}
11411136
*/
11421137
@Override
11431138
public CompletableFuture<Message> request(String subject, Headers headers, byte[] body) {
1144-
return requestFutureInternal(subject, headers, body, null, cancelAction);
1139+
return requestFutureInternal(subject, headers, body, null, cancelAction, true);
11451140
}
11461141

11471142
/**
11481143
* {@inheritDoc}
11491144
*/
11501145
@Override
11511146
public CompletableFuture<Message> requestWithTimeout(String subject, byte[] body, Duration timeout) {
1152-
return requestFutureInternal(subject, null, body, timeout, cancelAction);
1147+
return requestFutureInternal(subject, null, body, timeout, cancelAction, true);
11531148
}
11541149

11551150
/**
11561151
* {@inheritDoc}
11571152
*/
11581153
@Override
11591154
public CompletableFuture<Message> requestWithTimeout(String subject, Headers headers, byte[] body, Duration timeout) {
1160-
return requestFutureInternal(subject, headers, body, timeout, cancelAction);
1155+
return requestFutureInternal(subject, headers, body, timeout, cancelAction, true);
11611156
}
11621157

11631158
/**
@@ -1166,7 +1161,7 @@ public CompletableFuture<Message> requestWithTimeout(String subject, Headers hea
11661161
@Override
11671162
public CompletableFuture<Message> requestWithTimeout(Message message, Duration timeout) {
11681163
validateNotNull(message, "Message");
1169-
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction);
1164+
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
11701165
}
11711166

11721167
/**
@@ -1175,10 +1170,10 @@ public CompletableFuture<Message> requestWithTimeout(Message message, Duration t
11751170
@Override
11761171
public CompletableFuture<Message> request(Message message) {
11771172
validateNotNull(message, "Message");
1178-
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction);
1173+
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false);
11791174
}
11801175

1181-
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction) {
1176+
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubRep) {
11821177
checkPayloadSize(data);
11831178

11841179
if (isClosed()) {
@@ -1230,7 +1225,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
12301225
responsesAwaiting.put(sub.getSID(), future);
12311226
}
12321227

1233-
publishInternal(subject, responseInbox, headers, data);
1228+
publishInternal(subject, responseInbox, headers, data, validateSubRep);
12341229
writer.flushBuffer();
12351230
statistics.incrementRequestsSent();
12361231

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import io.nats.client.Options;
1717
import io.nats.client.StatisticsCollector;
18+
import io.nats.client.support.ByteArrayBuilder;
1819

1920
import java.io.IOException;
2021
import java.nio.BufferOverflowException;
@@ -125,8 +126,7 @@ Future<Boolean> stop() {
125126
// Clear old ping/pong requests
126127
this.outgoing.filter((msg) ->
127128
msg.isProtocol() &&
128-
(msg.protocolBab.equals(OP_PING_BYTES) || msg.protocolBab.equals(OP_PONG_BYTES)));
129-
129+
(msg.getProtocolBab().equals(OP_PING_BYTES) || msg.getProtocolBab().equals(OP_PONG_BYTES)));
130130
}
131131
finally {
132132
this.startStopLock.unlock();
@@ -162,9 +162,10 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
162162
}
163163
}
164164

165-
int blen = msg.protocolBab.length();
166-
System.arraycopy(msg.protocolBab.internalArray(), 0, sendBuffer, sendPosition, blen);
167-
sendPosition += blen;
165+
ByteArrayBuilder bab = msg.getProtocolBab();
166+
int babLen = bab.length();
167+
System.arraycopy(bab.internalArray(), 0, sendBuffer, sendPosition, babLen);
168+
sendPosition += babLen;
168169

169170
sendBuffer[sendPosition++] = CR;
170171
sendBuffer[sendPosition++] = LF;

0 commit comments

Comments
 (0)