Skip to content

Commit 7287901

Browse files
authored
Jetstream Usage (#1193)
1 parent c0dcf12 commit 7287901

File tree

1 file changed

+111
-2
lines changed

1 file changed

+111
-2
lines changed

src/main/java/io/nats/client/JetStream.java

Lines changed: 111 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,101 @@
1313

1414
package io.nats.client;
1515

16+
import io.nats.client.api.ConsumerConfiguration;
1617
import io.nats.client.api.PublishAck;
18+
import io.nats.client.api.StorageType;
19+
import io.nats.client.api.StreamConfiguration;
1720
import io.nats.client.impl.Headers;
1821

1922
import java.io.IOException;
2023
import java.time.Duration;
2124
import java.util.concurrent.CompletableFuture;
2225

2326
/**
24-
* JetStream context for creation and access to streams and consumers in NATS.
27+
* JetStream context for access to streams and consumers in NATS.
28+
*
29+
* <h3>Basic usage</h3>
30+
*
31+
* <p>{@link #publish(String, byte[]) JetStream.Publish} will send a message on the specified subject, waiting for acknowledgement.
32+
* A <b>503 No responders</b> error will be received if no stream is listening on said subject.
33+
*
34+
* <p>{@link #publishAsync(String, byte[]) PublishAsync} will not wait for acknowledgement but return a {@link CompletableFuture CompletableFuture},
35+
* which can be checked for acknowledgement at a later point.
36+
*
37+
* <p> Use {@link #getStreamContext(String ) getStreamContext(String)} to access a simplified API for <b>consuming/subscribing</b> messages from Jetstream.
38+
* It is <b>recommened</b> to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement}
39+
*
40+
* <p>{@link #subscribe(String)} is a convenience method for implicitly creating a consumer on a stream and receiving messages. This method should be used for ephemeral (not durable) conusmers.
41+
* It can create a named durable consumers though Options, but we prefer to avoid creating durable consumers implictly.
42+
* It is <b>recommened</b> to manage consumers explicitely through {@link StreamContext StreamContext} or {@link JetStreamManagement JetStreamManagement}
43+
*
44+
* {@link ConsumerContext ConsumerContext} based subscription.
45+
*
46+
* <h3>Recommended usage for creating streams, consumers, publish and listen on a stream</h3>
47+
* <pre>
48+
* io.nats.client.Connection nc = Nats.connect();
49+
*
50+
* //Setting up a stream and a consumer
51+
* JetStreamManagement jsm = nc.jetStreamManagement();
52+
* StreamConfiguration sc = StreamConfiguration.builder()
53+
* .name("my-stream")
54+
* .storageType(StorageType.File)
55+
* .subjects("foo.*", "bar.*")
56+
* .build();
57+
*
58+
* jsm.addStream(sc);
59+
*
60+
* ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
61+
* .durable("my-consumer")
62+
* .build();
63+
*
64+
* jsm.createConsumer("my-stream", consumerConfig);
65+
*
66+
* //Listening and publishing
67+
* io.nats.client.JetStream js = nc.jetStream();
68+
* ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
69+
* MessageConsumer mc = consumerContext.consume(
70+
* msg -&gt; {
71+
* System.out.println(" Received " + msg.getSubject());
72+
* msg.ack();
73+
* });
74+
*
75+
* js.publish("foo.joe", "Hello World".getBytes());
76+
*
77+
* //Wait a moment, then stop the MessageConsumer
78+
* Thread.sleep(3000);
79+
* mc.stop();
80+
*
81+
* </pre>
82+
*
83+
* <h3>Recommended usage of asynchronous publishing</h3>
84+
*
85+
* Jetstream messages can be published asynchronously, returning a CompletableFuture.
86+
* Note that you need to check the Future eventually otherwise the delivery guarantee is the same a regular {@link Connection#publish(String, byte[]) Connection.Publish}
87+
*
88+
* <p>We are publishing a batch of 100 messages and check for completion afterwards.
89+
*
90+
* <pre>
91+
* int COUNT = 100;
92+
* java.util.concurrent.CompletableFuture&lt;?&gt;[] acks = new java.util.concurrent.CompletableFuture&lt;?&gt;[COUNT];
93+
*
94+
* for( int i=0; i&lt;COUNT; i++ ) {
95+
* acks[i] = js.publishAsync("foo.joe", ("Hello "+i).getBytes());
96+
* }
97+
*
98+
* //Acknowledgments may arrive out of sequence, but CompletableFuture is handling this for us.
99+
* for( int i=0; i&lt;COUNT; i++ ) {
100+
* try {
101+
* acks[i].get();
102+
* } catch ( Exception e ) {
103+
* //Retry or handle error
104+
* }
105+
* }
106+
*
107+
* //Now we may send anther batch
108+
*
109+
* </pre>
110+
*
25111
*/
26112
public interface JetStream {
27113

@@ -458,7 +544,8 @@ public interface JetStream {
458544
JetStreamSubscription subscribe(String subscribeSubject, PullSubscribeOptions options) throws IOException, JetStreamApiException;
459545

460546
/**
461-
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options
547+
* Create an asynchronous subscription to the specified subject in the mode of pull, with additional options.
548+
*
462549
* @param subscribeSubject The subject to subscribe to
463550
* Can be null or empty when the options have a ConsumerConfiguration that supplies a filter subject.
464551
* @param dispatcher The dispatcher to handle this subscription
@@ -473,6 +560,25 @@ public interface JetStream {
473560

474561
/**
475562
* Get a stream context for a specific named stream. Verifies that the stream exists.
563+
*
564+
* <p><b>Recommended usage:</b> {@link StreamContext StreamContext} and {@link ConsumerContext ConsumerContext} are the preferred way to interact with existing streams and consume from streams.
565+
* {@link JetStreamManagement JetStreamManagement} should be used to create streams and consumers. {@link ConsumerContext#consume ConsumerContext.consume()} supports both push and pull consumers transparently.
566+
*
567+
* <pre>
568+
* nc = Nats.connect();
569+
* Jetstream js = nc.jetStream();
570+
* StreamContext streamContext = js.getStreamContext("my-stream");
571+
* ConsumerContext consumerContext = streamContext.getConsumerContext("my-consumer");
572+
* // Or
573+
* // ConsumerContext consumerContext = js.getConsumerContext("my-stream", "my-consumer");
574+
* consumerContext.consume(
575+
* msg -&gt; {
576+
* System.out.println(" Received " + msg.getSubject());
577+
* msg.ack();
578+
* });
579+
* </pre>
580+
*
581+
*
476582
* @param streamName the name of the stream
477583
* @return a StreamContext object
478584
* @throws IOException covers various communication issues with the NATS
@@ -483,6 +589,9 @@ public interface JetStream {
483589

484590
/**
485591
* Get a consumer context for a specific named stream and specific named consumer.
592+
*
593+
* <p><b>Recommended usage:</b> See {@link #getStreamContext(String) getStreamContext(String)}
594+
*
486595
* Verifies that the stream and consumer exist.
487596
* @param streamName the name of the stream
488597
* @param consumerName the name of the consumer

0 commit comments

Comments
 (0)