32
32
import com .google .common .util .concurrent .Service ;
33
33
import com .google .pubsub .v1 .SubscriptionName ;
34
34
import io .grpc .ManagedChannelBuilder ;
35
- import java .util .concurrent .Executor ;
36
- import java .util .concurrent .TimeoutException ;
37
35
import io .grpc .Status ;
38
36
import io .grpc .StatusRuntimeException ;
39
37
import io .grpc .netty .GrpcSslContexts ;
43
41
import java .util .ArrayList ;
44
42
import java .util .List ;
45
43
import java .util .concurrent .CountDownLatch ;
44
+ import java .util .concurrent .Executor ;
46
45
import java .util .concurrent .ScheduledExecutorService ;
47
46
import java .util .concurrent .ScheduledFuture ;
48
47
import java .util .concurrent .TimeUnit ;
48
+ import java .util .concurrent .TimeoutException ;
49
49
import org .joda .time .Duration ;
50
50
import org .slf4j .Logger ;
51
51
import org .slf4j .LoggerFactory ;
88
88
* }
89
89
*
90
90
* Subscriber subscriber =
91
- * Subscriber.Builder. newBuilder(MY_SUBSCRIPTION, receiver)
91
+ * Subscriber.newBuilder(MY_SUBSCRIPTION, receiver)
92
92
* .setMaxBundleAcks(100)
93
93
* .build();
94
94
*
@@ -123,19 +123,33 @@ private Subscriber(Builder builder) throws IOException {
123
123
impl = new SubscriberImpl (builder );
124
124
}
125
125
126
+ /**
127
+ * Constructs a new {@link Builder}.
128
+ *
129
+ * <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
130
+ * Subscriber}.
131
+ *
132
+ * @param subscription Cloud Pub/Sub subscription to bind the subscriber to
133
+ * @param receiver an implementation of {@link MessageReceiver} used to process the received
134
+ * messages
135
+ */
136
+ public static Builder newBuilder (SubscriptionName subscription , MessageReceiver receiver ) {
137
+ return new Builder (subscription , receiver );
138
+ }
139
+
126
140
/** Subscription which the subscriber is subscribed to. */
127
- public String getSubscription () {
128
- return impl .getSubscription () ;
141
+ public SubscriptionName getSubscriptionName () {
142
+ return impl .subscriptionName ;
129
143
}
130
144
131
145
/** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
132
146
public Duration getAckExpirationPadding () {
133
- return impl .getAckExpirationPadding () ;
147
+ return impl .ackExpirationPadding ;
134
148
}
135
149
136
150
/** The flow control settings the Subscriber is configured with. */
137
151
public FlowController .Settings getFlowControlSettings () {
138
- return impl .getFlowControlSettings () ;
152
+ return impl .flowControlSettings ;
139
153
}
140
154
141
155
public void addListener (final SubscriberListener listener , Executor executor ) {
@@ -249,7 +263,8 @@ public void terminated(State from) {}
249
263
private static class SubscriberImpl extends AbstractService {
250
264
private static final Logger logger = LoggerFactory .getLogger (Subscriber .class );
251
265
252
- private final String subscription ;
266
+ private final SubscriptionName subscriptionName ;
267
+ private final String cachedSubscriptionNameString ;
253
268
private final FlowController .Settings flowControlSettings ;
254
269
private final Duration ackExpirationPadding ;
255
270
private final ScheduledExecutorService executor ;
@@ -270,7 +285,8 @@ private static class SubscriberImpl extends AbstractService {
270
285
private SubscriberImpl (Builder builder ) throws IOException {
271
286
receiver = builder .receiver ;
272
287
flowControlSettings = builder .flowControlSettings ;
273
- subscription = builder .subscription ;
288
+ subscriptionName = builder .subscriptionName ;
289
+ cachedSubscriptionNameString = subscriptionName .toString ();
274
290
ackExpirationPadding = builder .ackExpirationPadding ;
275
291
streamAckDeadlineSeconds =
276
292
Math .max (
@@ -340,7 +356,7 @@ private void startStreamingConnections() {
340
356
for (int i = 0 ; i < numChannels ; i ++) {
341
357
streamingSubscriberConnections .add (
342
358
new StreamingSubscriberConnection (
343
- subscription ,
359
+ cachedSubscriptionNameString ,
344
360
credentials ,
345
361
receiver ,
346
362
ackExpirationPadding ,
@@ -412,7 +428,7 @@ private void startPollingConnections() {
412
428
for (int i = 0 ; i < numChannels ; i ++) {
413
429
pollingSubscriberConnections .add (
414
430
new PollingSubscriberConnection (
415
- subscription ,
431
+ cachedSubscriptionNameString ,
416
432
credentials ,
417
433
receiver ,
418
434
ackExpirationPadding ,
@@ -496,21 +512,6 @@ public void run() {
496
512
throw new IllegalStateException (e );
497
513
}
498
514
}
499
-
500
- /** Subscription which the subscriber is subscribed to. */
501
- public String getSubscription () {
502
- return subscription ;
503
- }
504
-
505
- /** Acknowledgement expiration padding. See {@link Builder.setAckExpirationPadding}. */
506
- public Duration getAckExpirationPadding () {
507
- return ackExpirationPadding ;
508
- }
509
-
510
- /** The flow control settings the Subscriber is configured with. */
511
- public FlowController .Settings getFlowControlSettings () {
512
- return flowControlSettings ;
513
- }
514
515
}
515
516
516
517
/** Builder of {@link Subscriber Subscribers}. */
@@ -526,7 +527,7 @@ public static final class Builder {
526
527
* Runtime .getRuntime ().availableProcessors ())
527
528
.build ();
528
529
529
- String subscription ;
530
+ SubscriptionName subscriptionName ;
530
531
Optional <Credentials > credentials = Optional .absent ();
531
532
MessageReceiver receiver ;
532
533
@@ -539,22 +540,8 @@ public static final class Builder {
539
540
Optional .absent ();
540
541
Optional <Clock > clock = Optional .absent ();
541
542
542
- /**
543
- * Constructs a new {@link Builder}.
544
- *
545
- * <p>Once {@link #build()} is called a gRPC stub will be created for use of the {@link
546
- * Subscriber}.
547
- *
548
- * @param subscription Cloud Pub/Sub subscription to bind the subscriber to
549
- * @param receiver an implementation of {@link MessageReceiver} used to process the received
550
- * messages
551
- */
552
- public static Builder newBuilder (SubscriptionName subscription , MessageReceiver receiver ) {
553
- return new Builder (subscription .toString (), receiver );
554
- }
555
-
556
- Builder (String subscription , MessageReceiver receiver ) {
557
- this .subscription = subscription ;
543
+ Builder (SubscriptionName subscriptionName , MessageReceiver receiver ) {
544
+ this .subscriptionName = subscriptionName ;
558
545
this .receiver = receiver ;
559
546
}
560
547
0 commit comments