Skip to content

Commit 366ae9f

Browse files
authored
Reader Listener (#1265)
* Reader Listener A way to debug incoming messages before they are processed * Reader Listener A way to debug incoming messages before they are processed * Reader Listener A way to debug incoming messages before they are processed * update copyright * added benchmark * added benchmark * updated benchmark
1 parent e18c04e commit 366ae9f

File tree

5 files changed

+85
-3
lines changed

5 files changed

+85
-3
lines changed

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,19 @@
1717
### Tests
1818
* Unit test coverage #1252 @scottf
1919

20+
### Benchmark
21+
22+
```
23+
┌─────────────────────┬───────────────────┬─────────────────┬──────────────────────────┬──────────────────┐
24+
│ │ count │ time │ msgs/sec │ bytes/sec │
25+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
26+
│ PubAsync │ 50,000,000 msgs │ 28:37.522 │ 29,111.709 msgs/sec │ 6.94 mb/sec │
27+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
28+
│ SubFetch │ 50,000,000 msgs │ 35:23.774 │ 23,542.995 msgs/sec │ 5.61 mb/sec │
29+
├─────────────────────┼───────────────────┼─────────────────┼──────────────────────────┼──────────────────┤
30+
│ SubIterate │ 50,000,000 msgs │ 17:10.329 │ 48,528.189 msgs/sec │ 11.57 mb/sec │
31+
└─────────────────────┴───────────────────┴─────────────────┴──────────────────────────┴──────────────────┘
32+
```
2033

2134
## 2.20.4
2235

src/main/java/io/nats/client/Options.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,11 @@ public class Options {
517517
* {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory}.
518518
*/
519519
public static final String PROP_CALLBACK_THREAD_FACTORY_CLASS = "callback.thread.factory.class";
520+
/**
521+
* Property used to set class name for the ReaderListener implementation
522+
* {@link Builder#readListener(ReadListener) readListener}.
523+
*/
524+
public static final String PROP_READ_LISTENER_CLASS = "read.listener.class";
520525

521526
// ----------------------------------------------------------------------------------------------------
522527
// PROTOCOL CONNECT OPTION CONSTANTS
@@ -658,6 +663,7 @@ public class Options {
658663
private final ErrorListener errorListener;
659664
private final TimeTraceLogger timeTraceLogger;
660665
private final ConnectionListener connectionListener;
666+
private ReadListener readListener;
661667
private final StatisticsCollector statisticsCollector;
662668
private final String dataPortType;
663669

@@ -779,6 +785,7 @@ public static class Builder {
779785
private ErrorListener errorListener = null;
780786
private TimeTraceLogger timeTraceLogger = null;
781787
private ConnectionListener connectionListener = null;
788+
private ReadListener readListener = null;
782789
private StatisticsCollector statisticsCollector = null;
783790
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
784791
private ExecutorService executor;
@@ -896,6 +903,7 @@ public Builder properties(Properties props) {
896903
classnameProperty(props, PROP_ERROR_LISTENER, o -> this.errorListener = (ErrorListener) o);
897904
classnameProperty(props, PROP_TIME_TRACE_LOGGER, o -> this.timeTraceLogger = (TimeTraceLogger) o);
898905
classnameProperty(props, PROP_CONNECTION_CB, o -> this.connectionListener = (ConnectionListener) o);
906+
classnameProperty(props, PROP_READ_LISTENER_CLASS, o -> this.readListener = (ReadListener) o);
899907
classnameProperty(props, PROP_STATISTICS_COLLECTOR, o -> this.statisticsCollector = (StatisticsCollector) o);
900908

901909
stringProperty(props, PROP_DATA_PORT_TYPE, s -> this.dataPortType = s);
@@ -914,7 +922,6 @@ public Builder properties(Properties props) {
914922
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
915923
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
916924
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
917-
918925
return this;
919926
}
920927

@@ -1551,6 +1558,17 @@ public Builder connectionListener(ConnectionListener listener) {
15511558
return this;
15521559
}
15531560

1561+
/**
1562+
* Sets a listener to be notified on incoming protocol/message
1563+
*
1564+
* @param readListener the listener
1565+
* @return the Builder for chaining
1566+
*/
1567+
public Builder readListener(ReadListener readListener) {
1568+
this.readListener = readListener;
1569+
return this;
1570+
}
1571+
15541572
/**
15551573
* Set the {@link StatisticsCollector StatisticsCollector} to collect connection metrics.
15561574
* <p>
@@ -1960,6 +1978,7 @@ public Builder(Options o) {
19601978
this.errorListener = o.errorListener;
19611979
this.timeTraceLogger = o.timeTraceLogger;
19621980
this.connectionListener = o.connectionListener;
1981+
this.readListener = o.readListener;
19631982
this.statisticsCollector = o.statisticsCollector;
19641983
this.dataPortType = o.dataPortType;
19651984
this.trackAdvancedStats = o.trackAdvancedStats;
@@ -2027,6 +2046,7 @@ private Options(Builder b) {
20272046
this.errorListener = b.errorListener;
20282047
this.timeTraceLogger = b.timeTraceLogger;
20292048
this.connectionListener = b.connectionListener;
2049+
this.readListener = b.readListener;
20302050
this.statisticsCollector = b.statisticsCollector;
20312051
this.dataPortType = b.dataPortType;
20322052
this.trackAdvancedStats = b.trackAdvancedStats;
@@ -2112,6 +2132,13 @@ public ConnectionListener getConnectionListener() {
21122132
return this.connectionListener;
21132133
}
21142134

2135+
/**
2136+
* @return the read listener, or null, see {@link Builder#readListener(ReadListener) readListener()} in the builder doc
2137+
*/
2138+
public ReadListener getReadListener() {
2139+
return this.readListener;
2140+
}
2141+
21152142
/**
21162143
* @return the statistics collector, or null, see {@link Builder#statisticsCollector(StatisticsCollector) statisticsCollector()} in the builder doc
21172144
*/
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
// Copyright 2024 The NATS Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at:
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package io.nats.client;
15+
16+
public interface ReadListener {
17+
void protocol(String op, String string);
18+
void message(String op, Message message);
19+
}

src/main/java/io/nats/client/impl/NatsConnectionReader.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.nats.client.impl;
1515

16+
import io.nats.client.ReadListener;
1617
import io.nats.client.support.IncomingHeadersProcessor;
1718

1819
import java.io.IOException;
@@ -68,6 +69,7 @@ enum Mode {
6869
private final AtomicBoolean running;
6970

7071
private final boolean utf8Mode;
72+
private final ReadListener readListener;
7173

7274
NatsConnectionReader(NatsConnection connection) {
7375
this.connection = connection;
@@ -83,6 +85,7 @@ enum Mode {
8385
this.bufferPosition = 0;
8486

8587
this.utf8Mode = connection.getOptions().supportUTF8Subjects();
88+
readListener = connection.getOptions().getReadListener();
8689
}
8790

8891
// Should only be called if the current thread has exited.
@@ -346,7 +349,11 @@ void gatherMessageData(int maxPos) throws IOException {
346349
if (gotCR) {
347350
if (b == LF) {
348351
incoming.setData(msgData);
349-
this.connection.deliverMessage(incoming.getMessage());
352+
NatsMessage m = incoming.getMessage();
353+
this.connection.deliverMessage(m);
354+
if (readListener != null) {
355+
readListener.message(op, m);
356+
}
350357
msgData = null;
351358
msgDataPosition = 0;
352359
incoming = null;
@@ -544,34 +551,50 @@ void parseProtocolMessage() throws IOException {
544551
break;
545552
case OP_OK:
546553
this.connection.processOK();
554+
if (readListener != null) {
555+
readListener.protocol(op, null);
556+
}
547557
this.op = UNKNOWN_OP;
548558
this.mode = Mode.GATHER_OP;
549559
break;
550560
case OP_ERR:
551561
String errorText = StandardCharsets.UTF_8.decode(protocolBuffer).toString().replace("'", "");
552562
this.connection.processError(errorText);
563+
if (readListener != null) {
564+
readListener.protocol(op, errorText);
565+
}
553566
this.op = UNKNOWN_OP;
554567
this.mode = Mode.GATHER_OP;
555568
break;
556569
case OP_PING:
557570
this.connection.sendPong();
571+
if (readListener != null) {
572+
readListener.protocol(op, null);
573+
}
558574
this.op = UNKNOWN_OP;
559575
this.mode = Mode.GATHER_OP;
560576
break;
561577
case OP_PONG:
562578
this.connection.handlePong();
579+
if (readListener != null) {
580+
readListener.protocol(op, null);
581+
}
563582
this.op = UNKNOWN_OP;
564583
this.mode = Mode.GATHER_OP;
565584
break;
566585
case OP_INFO:
567586
String info = StandardCharsets.UTF_8.decode(protocolBuffer).toString();
568587
this.connection.handleInfo(info);
588+
if (readListener != null) {
589+
readListener.protocol(op, info);
590+
}
569591
this.op = UNKNOWN_OP;
570592
this.mode = Mode.GATHER_OP;
571593
break;
572594
default:
573595
throw new IllegalStateException("Unknown protocol operation "+op);
574596
}
597+
575598
} catch (IllegalStateException | NumberFormatException | NullPointerException ex) {
576599
this.encounteredProtocolError(ex);
577600
}

src/main/java/io/nats/client/impl/NatsConnectionWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ void sendMessageBatch(NatsMessage msg, DataPort dataPort, StatisticsCollector st
151151
sendBuffer[sendPosition++] = CR;
152152
sendBuffer[sendPosition++] = LF;
153153

154-
if (!msg.isProtocol()) {
154+
if (!msg.isProtocol()) { // because a protocol message does not have headers
155155
sendPosition += msg.copyNotEmptyHeaders(sendPosition, sendBuffer);
156156

157157
byte[] bytes = msg.getData(); // guaranteed to not be null

0 commit comments

Comments
 (0)