Skip to content

Commit f72a623

Browse files
committed
Refactor PullOption
- Make maxMessages a method parameter rather than an optional option - Move MessageConsumer.PullOption to PubSub - Remove MessageConsumer.start/stop methods in favor of close()
1 parent 57d613b commit f72a623

File tree

4 files changed

+34
-63
lines changed

4 files changed

+34
-63
lines changed

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java

+22-42
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,14 @@ enum OptionType implements Option.OptionType {
4747
<T> T get(Map<Option.OptionType, ?> options) {
4848
return (T) options.get(this);
4949
}
50+
51+
String getString(Map<Option.OptionType, ?> options) {
52+
return get(options);
53+
}
54+
55+
Integer getInteger(Map<Option.OptionType, ?> options) {
56+
return get(options);
57+
}
5058
}
5159

5260
private ListOption(OptionType option, Object value) {
@@ -73,27 +81,31 @@ public static ListOption pageToken(String pageToken) {
7381
*/
7482
final class PullOption extends Option {
7583

76-
private static final long serialVersionUID = -5220474819637439937L;
84+
private static final long serialVersionUID = 4792164134340316582L;
7785

7886
enum OptionType implements Option.OptionType {
79-
MAX_MESSAGES;
87+
MAX_CONCURRENT_CALLBACKS;
8088

8189
@SuppressWarnings("unchecked")
8290
<T> T get(Map<Option.OptionType, ?> options) {
8391
return (T) options.get(this);
8492
}
93+
94+
Integer getInteger(Map<Option.OptionType, ?> options) {
95+
return get(options);
96+
}
8597
}
8698

87-
private PullOption(OptionType option, Object value) {
99+
private PullOption(Option.OptionType option, Object value) {
88100
super(option, value);
89101
}
90102

91103
/**
92-
* Returns an option to specify the maximum number of messages that can be returned by the pull
93-
* operation.
104+
* Returns an option to specify the maximum number of messages that can be executed
105+
* concurrently at any time.
94106
*/
95-
public static PullOption maxMessages(int maxMessages) {
96-
return new PullOption(OptionType.MAX_MESSAGES, maxMessages);
107+
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
108+
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
97109
}
98110
}
99111

@@ -110,38 +122,6 @@ interface MessageProcessor {
110122
*/
111123
interface MessageConsumer extends AutoCloseable {
112124

113-
/**
114-
* Class for specifying options to pull messages through a {@code MessageConsumer}.
115-
*/
116-
final class PullOption extends Option {
117-
118-
private static final long serialVersionUID = 4792164134340316582L;
119-
120-
enum OptionType implements Option.OptionType {
121-
MAX_CONCURRENT_CALLBACKS;
122-
123-
@SuppressWarnings("unchecked")
124-
<T> T get(Map<OptionType, ?> options) {
125-
return (T) options.get(this);
126-
}
127-
}
128-
129-
private PullOption(OptionType option, Object value) {
130-
super(option, value);
131-
}
132-
133-
/**
134-
* Returns an option to specify the maximum number of messages that can be executed
135-
* concurrently at any time.
136-
*/
137-
public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
138-
return new PullOption(OptionType.MAX_CONCURRENT_CALLBACKS, maxConcurrency);
139-
}
140-
}
141-
142-
void start(MessageConsumer.PullOption... options);
143-
144-
void stop();
145125
}
146126

147127
Topic create(TopicInfo topic);
@@ -200,11 +180,11 @@ public static PullOption maxConcurrentCallbacks(int maxConcurrency) {
200180

201181
Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic, ListOption... options);
202182

203-
Iterator<ReceivedMessage> pull(String subscription, PullOption... options);
183+
Iterator<ReceivedMessage> pull(String subscription, int maxMessages);
204184

205-
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options);
185+
Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages);
206186

207-
MessageConsumer pullAsync(String subscription, MessageProcessor callback);
187+
MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options);
208188

209189
void ack(String subscription, String ackId, String... ackIds);
210190

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -196,13 +196,13 @@ public Future<AsyncPage<SubscriptionId>> listSubscriptionsAsync(String topic,
196196
}
197197

198198
@Override
199-
public Iterator<ReceivedMessage> pull(String subscription, PullOption... options) {
199+
public Iterator<ReceivedMessage> pull(String subscription, int maxMessages) {
200200
// this should set return_immediately to true
201201
return null;
202202
}
203203

204204
@Override
205-
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOption... options) {
205+
public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, int maxMessages) {
206206
// though this method can set return_immediately to false (as future can be canceled) I
207207
// suggest to keep it false so sync could delegate to asyc and use the same options
208208
// this method also should use the VTKIT thread-pool to renew ack deadline for non consumed
@@ -211,7 +211,8 @@ public Future<Iterator<ReceivedMessage>> pullAsync(String subscription, PullOpti
211211
}
212212

213213
@Override
214-
public MessageConsumer pullAsync(String subscription, MessageProcessor callback) {
214+
public MessageConsumer pullAsync(String subscription, MessageProcessor callback,
215+
PullOption... options) {
215216
// this method should use the VTKIT thread-pool (maybe getting it should be part of the spi)
216217
return null;
217218
}

gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Subscription.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -151,16 +151,16 @@ public Future<Void> replacePushConfigAsync(PushConfig pushConfig) {
151151
return pubsub.replacePushConfigAsync(name(), pushConfig);
152152
}
153153

154-
public Iterator<ReceivedMessage> pull(PullOption... options) {
155-
return pubsub.pull(name(), options);
154+
public Iterator<ReceivedMessage> pull(int maxMessages) {
155+
return pubsub.pull(name(), maxMessages);
156156
}
157157

158-
public Future<Iterator<ReceivedMessage>> pullAsync(PullOption... options) {
159-
return pubsub.pullAsync(name(), options);
158+
public Future<Iterator<ReceivedMessage>> pullAsync(int maxMessages) {
159+
return pubsub.pullAsync(name(), maxMessages);
160160
}
161161

162-
public MessageConsumer pullAsync(MessageProcessor callback) {
163-
return pubsub.pullAsync(name(), callback);
162+
public MessageConsumer pullAsync(MessageProcessor callback, PullOption... options) {
163+
return pubsub.pullAsync(name(), callback, options);
164164
}
165165

166166
private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {

gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubTest.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ public class PubSubTest {
2828

2929
private static final int PAGE_SIZE = 42;
3030
private static final String PAGE_TOKEN = "page token";
31-
private static final int MAX_MESSAGES = 42;
3231
private static final int MAX_CONCURRENT_CALLBACKS = 42;
3332

3433
@Test
@@ -45,17 +44,8 @@ public void testListOption() {
4544

4645
@Test
4746
public void testPullOptions() {
48-
PullOption pullOption = PullOption.maxMessages(MAX_MESSAGES);
49-
assertEquals(MAX_MESSAGES, pullOption.value());
50-
assertEquals(PullOption.OptionType.MAX_MESSAGES, pullOption.optionType());
51-
}
52-
53-
@Test
54-
public void testMessageConsumerPullOptions() {
55-
MessageConsumer.PullOption pullOption =
56-
MessageConsumer.PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS);
47+
PullOption pullOption = PullOption.maxConcurrentCallbacks(MAX_CONCURRENT_CALLBACKS);
5748
assertEquals(MAX_CONCURRENT_CALLBACKS, pullOption.value());
58-
assertEquals(MessageConsumer.PullOption.OptionType.MAX_CONCURRENT_CALLBACKS,
59-
pullOption.optionType());
49+
assertEquals(PullOption.OptionType.MAX_CONCURRENT_CALLBACKS, pullOption.optionType());
6050
}
6151
}

0 commit comments

Comments
 (0)