Skip to content

Commit 0550871

Browse files
authored
rename MessagesProcessor to MessageDispatcher (#1548)
* rename MessagesProcessor to MessageDispatcher Fixes #1546
1 parent 25ffa22 commit 0550871

File tree

4 files changed

+30
-30
lines changed

4 files changed

+30
-30
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessagesProcessor.java renamed to google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/MessageDispatcher.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@
5151
* Dispatches messages to a message receiver while handling the messages acking and lease
5252
* extensions.
5353
*/
54-
class MessagesProcessor {
55-
private static final Logger logger = LoggerFactory.getLogger(MessagesProcessor.class);
54+
class MessageDispatcher {
55+
private static final Logger logger = LoggerFactory.getLogger(MessageDispatcher.class);
5656

5757
private static final int INITIAL_ACK_DEADLINE_EXTENSION_SECONDS = 2;
5858
@VisibleForTesting static final Duration PENDING_ACKS_SEND_DELAY = Duration.millis(100);
@@ -63,7 +63,7 @@ class MessagesProcessor {
6363

6464
private final Duration ackExpirationPadding;
6565
private final MessageReceiver receiver;
66-
private final AcksProcessor acksProcessor;
66+
private final AckProcessor ackProcessor;
6767

6868
private final FlowController flowController;
6969
private final MessagesWaiter messagesWaiter;
@@ -199,14 +199,14 @@ public void onSuccess(AckReply reply) {
199199
}
200200
}
201201

202-
public interface AcksProcessor {
202+
public interface AckProcessor {
203203
void sendAckOperations(
204204
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions);
205205
}
206206

207-
MessagesProcessor(
207+
MessageDispatcher(
208208
MessageReceiver receiver,
209-
AcksProcessor acksProcessor,
209+
AckProcessor ackProcessor,
210210
Duration ackExpirationPadding,
211211
Distribution ackLatencyDistribution,
212212
FlowController flowController,
@@ -215,7 +215,7 @@ void sendAckOperations(
215215
this.executor = executor;
216216
this.ackExpirationPadding = ackExpirationPadding;
217217
this.receiver = receiver;
218-
this.acksProcessor = acksProcessor;
218+
this.ackProcessor = ackProcessor;
219219
this.flowController = flowController;
220220
outstandingAckHandlers = new HashMap<>();
221221
pendingAcks = new HashSet<>();
@@ -466,6 +466,6 @@ private void processOutstandingAckOperations(
466466
}
467467
}
468468

469-
acksProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
469+
ackProcessor.sendAckOperations(acksToSend, modifyAckDeadlinesToSend);
470470
}
471471
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PollingSubscriberConnection.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import com.google.api.stats.Distribution;
2323
import com.google.auth.Credentials;
2424
import com.google.cloud.Clock;
25-
import com.google.cloud.pubsub.spi.v1.MessagesProcessor.AcksProcessor;
26-
import com.google.cloud.pubsub.spi.v1.MessagesProcessor.PendingModifyAckDeadline;
25+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
26+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
2727
import com.google.common.collect.Lists;
2828
import com.google.common.util.concurrent.AbstractService;
2929
import com.google.common.util.concurrent.FutureCallback;
@@ -51,7 +51,7 @@
5151
* Implementation of {@link AbstractSubscriberConnection} based on Cloud Pub/Sub pull and
5252
* acknowledge operations.
5353
*/
54-
final class PollingSubscriberConnection extends AbstractService implements AcksProcessor {
54+
final class PollingSubscriberConnection extends AbstractService implements AckProcessor {
5555
private static final int MAX_PER_REQUEST_CHANGES = 1000;
5656
private static final Duration DEFAULT_TIMEOUT = Duration.standardSeconds(10);
5757
private static final int DEFAULT_MAX_MESSAGES = 1000;
@@ -63,7 +63,7 @@ final class PollingSubscriberConnection extends AbstractService implements AcksP
6363
private final String subscription;
6464
private final ScheduledExecutorService executor;
6565
private final SubscriberFutureStub stub;
66-
private final MessagesProcessor messagesProcessor;
66+
private final MessageDispatcher messageDispatcher;
6767

6868
public PollingSubscriberConnection(
6969
String subscription,
@@ -80,8 +80,8 @@ public PollingSubscriberConnection(
8080
stub =
8181
SubscriberGrpc.newFutureStub(channel)
8282
.withCallCredentials(MoreCallCredentials.from(credentials));
83-
messagesProcessor =
84-
new MessagesProcessor(
83+
messageDispatcher =
84+
new MessageDispatcher(
8585
receiver,
8686
this,
8787
ackExpirationPadding,
@@ -109,7 +109,7 @@ private void initialize() {
109109
new FutureCallback<Subscription>() {
110110
@Override
111111
public void onSuccess(Subscription result) {
112-
messagesProcessor.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
112+
messageDispatcher.setMessageDeadlineSeconds(result.getAckDeadlineSeconds());
113113
pullMessages(INITIAL_BACKOFF);
114114
}
115115

@@ -122,7 +122,7 @@ public void onFailure(Throwable cause) {
122122

123123
@Override
124124
protected void doStop() {
125-
messagesProcessor.stop();
125+
messageDispatcher.stop();
126126
notifyStopped();
127127
}
128128

@@ -141,7 +141,7 @@ private void pullMessages(final Duration backoff) {
141141
new FutureCallback<PullResponse>() {
142142
@Override
143143
public void onSuccess(PullResponse pullResponse) {
144-
messagesProcessor.processReceivedMessages(pullResponse.getReceivedMessagesList());
144+
messageDispatcher.processReceivedMessages(pullResponse.getReceivedMessagesList());
145145
if (pullResponse.getReceivedMessagesCount() == 0) {
146146
// No messages in response, possibly caught up in backlog, we backoff to avoid
147147
// slamming the server.

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/StreamingSubscriberConnection.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import com.google.api.stats.Distribution;
2323
import com.google.auth.Credentials;
2424
import com.google.cloud.Clock;
25-
import com.google.cloud.pubsub.spi.v1.MessagesProcessor.AcksProcessor;
26-
import com.google.cloud.pubsub.spi.v1.MessagesProcessor.PendingModifyAckDeadline;
25+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.AckProcessor;
26+
import com.google.cloud.pubsub.spi.v1.MessageDispatcher.PendingModifyAckDeadline;
2727
import com.google.common.collect.Lists;
2828
import com.google.common.util.concurrent.AbstractService;
2929
import com.google.common.util.concurrent.FutureCallback;
@@ -49,7 +49,7 @@
4949
import org.slf4j.LoggerFactory;
5050

5151
/** Implementation of {@link AbstractSubscriberConnection} based on Cloud Pub/Sub streaming pull. */
52-
final class StreamingSubscriberConnection extends AbstractService implements AcksProcessor {
52+
final class StreamingSubscriberConnection extends AbstractService implements AckProcessor {
5353
private static final Logger logger = LoggerFactory.getLogger(StreamingSubscriberConnection.class);
5454

5555
private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = new Duration(100); // 100ms
@@ -62,7 +62,7 @@ final class StreamingSubscriberConnection extends AbstractService implements Ack
6262

6363
private final String subscription;
6464
private final ScheduledExecutorService executor;
65-
private final MessagesProcessor messagesProcessor;
65+
private final MessageDispatcher messageDispatcher;
6666
private ClientCallStreamObserver<StreamingPullRequest> requestObserver;
6767

6868
public StreamingSubscriberConnection(
@@ -80,16 +80,16 @@ public StreamingSubscriberConnection(
8080
this.executor = executor;
8181
this.credentials = credentials;
8282
this.channel = channel;
83-
this.messagesProcessor =
84-
new MessagesProcessor(
83+
this.messageDispatcher =
84+
new MessageDispatcher(
8585
receiver,
8686
this,
8787
ackExpirationPadding,
8888
ackLatencyDistribution,
8989
flowController,
9090
executor,
9191
clock);
92-
messagesProcessor.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
92+
messageDispatcher.setMessageDeadlineSeconds(streamAckDeadlineSeconds);
9393
}
9494

9595
@Override
@@ -101,7 +101,7 @@ protected void doStart() {
101101

102102
@Override
103103
protected void doStop() {
104-
messagesProcessor.stop();
104+
messageDispatcher.stop();
105105
notifyStopped();
106106
requestObserver.onError(Status.CANCELLED.asException());
107107
}
@@ -123,7 +123,7 @@ public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestOb
123123

124124
@Override
125125
public void onNext(StreamingPullResponse response) {
126-
messagesProcessor.processReceivedMessages(response.getReceivedMessagesList());
126+
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
127127
// Only if not shutdown we will request one more bundles of messages to be delivered.
128128
if (isAlive()) {
129129
requestObserver.request(1);
@@ -157,11 +157,11 @@ private void initialize() {
157157
logger.debug(
158158
"Initializing stream to subscription {} with deadline {}",
159159
subscription,
160-
messagesProcessor.getMessageDeadlineSeconds());
160+
messageDispatcher.getMessageDeadlineSeconds());
161161
requestObserver.onNext(
162162
StreamingPullRequest.newBuilder()
163163
.setSubscription(subscription)
164-
.setStreamAckDeadlineSeconds(messagesProcessor.getMessageDeadlineSeconds())
164+
.setStreamAckDeadlineSeconds(messageDispatcher.getMessageDeadlineSeconds())
165165
.build());
166166
requestObserver.request(1);
167167

@@ -240,7 +240,7 @@ public void sendAckOperations(
240240
}
241241

242242
public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
243-
messagesProcessor.setMessageDeadlineSeconds(newAckDeadlineSeconds);
243+
messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
244244
requestObserver.onNext(
245245
StreamingPullRequest.newBuilder()
246246
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package com.google.cloud.pubsub.spi.v1;
1818

19-
import static com.google.cloud.pubsub.spi.v1.MessagesProcessor.PENDING_ACKS_SEND_DELAY;
19+
import static com.google.cloud.pubsub.spi.v1.MessageDispatcher.PENDING_ACKS_SEND_DELAY;
2020
import static org.junit.Assert.assertEquals;
2121

2222
import com.google.cloud.pubsub.spi.v1.FakeSubscriberServiceImpl.ModifyAckDeadline;

0 commit comments

Comments
 (0)