Skip to content

Commit b31f443

Browse files
messages are now passed to pull callback & listen for events
1 parent bc6a57e commit b31f443

File tree

3 files changed

+174
-183
lines changed

3 files changed

+174
-183
lines changed

lib/pubsub/subscription.js

+127-106
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ var util = require('../common/util.js');
9393
* //-
9494
* // Once you have obtained a subscription object, you may begin to register
9595
* // listeners. This will automatically trigger pulling for messages.
96-
* //
97-
* // This is a new paragraph.
9896
* //-
9997
*
10098
* // Register an error handler.
@@ -117,7 +115,7 @@ function Subscription(pubsub, options) {
117115
this.closed = false;
118116
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
119117

120-
this.once('newListener', this.startPulling_.bind(this));
118+
this.listenForEvents_();
121119
}
122120

123121
nodeutil.inherits(Subscription, events.EventEmitter);
@@ -136,6 +134,95 @@ Subscription.formatName_ = function(projectId, name) {
136134
return '/subscriptions/' + projectId + '/' + name;
137135
};
138136

137+
/**
138+
* Simplify a message from an API response to have two properties, `id` and
139+
* `data`. `data` is always converted to a string.
140+
*
141+
* @private
142+
*/
143+
Subscription.formatMessage_ = function(msg) {
144+
var message = {
145+
id: msg.ackId
146+
};
147+
if (msg.pubsubEvent && msg.pubsubEvent.message) {
148+
message.data =
149+
new Buffer(msg.pubsubEvent.message.data, 'base64').toString('utf-8');
150+
try {
151+
message.data = JSON.parse(message.data);
152+
} catch(e) {}
153+
}
154+
return message;
155+
};
156+
157+
/**
158+
* Begin listening for events on the subscription. This method keeps track of
159+
* how many message listeners are assigned, and then removed, making sure
160+
* polling is handled automatically.
161+
*
162+
* As long as there is one active message listener, the connection is open. As
163+
* soon as there are no more message listeners, the connection is closed.
164+
*
165+
* @private
166+
*
167+
* @example
168+
* this.listenForEvents_();
169+
*/
170+
Subscription.prototype.listenForEvents_ = function() {
171+
var that = this;
172+
var messageListeners = 0;
173+
174+
this.on('newListener', function(event) {
175+
if (event === 'message') {
176+
messageListeners++;
177+
if (that.closed) {
178+
that.closed = false;
179+
}
180+
that.startPulling_();
181+
}
182+
});
183+
184+
this.on('removeListener', function(event) {
185+
if (event === 'message' && --messageListeners === 0) {
186+
that.closed = true;
187+
}
188+
});
189+
};
190+
191+
/**
192+
* Poll the backend for new messages. This runs a loop to ping the API at the
193+
* provided interval from the subscription's instantiation. If one wasn't
194+
* provided, the default value is 10 milliseconds.
195+
*
196+
* If messages are received, they are emitted on the `message` event.
197+
*
198+
* Note: This method is automatically called once a message event handler is
199+
* assigned to the description.
200+
*
201+
* To stop pulling, see {@linkcode module:pubsub/subscription#close}.
202+
*
203+
* @private
204+
*
205+
* @example
206+
* subscription.startPulling_();
207+
*/
208+
Subscription.prototype.startPulling_ = function() {
209+
var that = this;
210+
if (this.closed) {
211+
return;
212+
}
213+
this.pull({
214+
returnImmediately: false
215+
}, function(err, message) {
216+
if (err) {
217+
that.emit('error', err);
218+
}
219+
if (message) {
220+
that.emit('message', message);
221+
}
222+
setTimeout(that.startPulling_.bind(that), that.interval);
223+
});
224+
};
225+
139226
/**
140227
* Acknowledge to the backend that the message was retrieved. You must provide
141228
* either a single ID, or an array of IDs.
@@ -162,15 +249,35 @@ Subscription.prototype.ack = function(ids, callback) {
162249
};
163250

164251
/**
165-
* Pull messages from the subscribed topic. If messages were found, they are
166-
* passed along with a `message` event.
252+
* Delete the subscription. Pull requests from the current subscription will be
253+
* errored once unsubscription is complete.
254+
*
255+
* @param {function=} callback - The callback function.
256+
*
257+
* @example
258+
* subscription.delete(function(err) {});
259+
*/
260+
Subscription.prototype.delete = function(callback) {
261+
callback = callback || util.noop;
262+
this.request(
263+
'DELETE', 'subscriptions/' + this.name, null, true, function(err) {
264+
if (err) {
265+
callback(err);
266+
return;
267+
}
268+
this.closed = true;
269+
callback(null);
270+
}.bind(this));
271+
};
272+
273+
/**
274+
* Pull messages from the subscribed topic. If messages were found, your
275+
* callback is executed with the message object.
167276
*
168277
* Note that messages are pulled automatically once you register your first
169278
* event listener to the subscription, thus the call to `pull` is handled for
170-
* you.
171-
*
172-
* Calling `pull` directly can be helpful after your subscription has been
173-
* closed with {@linkcode module:pubsub/subscription#close}.
279+
* you. If you don't want to start pulling, simply don't register a
280+
* `subscription.on('message', function() {})` event handler.
174281
*
175282
* @param {object=} options - Configuration object.
176283
* @param {boolean=} options.returnImmediately - If set, the system will respond
@@ -179,7 +286,10 @@ Subscription.prototype.ack = function(ids, callback) {
179286
* @param {function} callback - The callback function.
180287
*
181288
* @example
182-
* subscription.pull(function(err) {});
289+
* subscription.pull(function(err, message) {
290+
* // message.id = ID used to acknowledge its receival.
291+
* // message.data = Contents of the message.
292+
* });
183293
*/
184294
Subscription.prototype.pull = function(options, callback) {
185295
var that = this;
@@ -199,104 +309,15 @@ Subscription.prototype.pull = function(options, callback) {
199309
callback(err);
200310
return;
201311
}
202-
if (!that.autoAck) {
203-
that.emitMessage_(message);
204-
callback();
205-
return;
312+
message = Subscription.formatMessage_(message);
313+
if (that.autoAck) {
314+
that.ack(message.id, function(err) {
315+
callback(err, message);
316+
});
317+
} else {
318+
callback(null, message);
206319
}
207-
that.ack(message.ackId, function(err) {
208-
if (err) {
209-
callback(err);
210-
return;
211-
}
212-
that.emitMessage_(message);
213-
callback();
214-
});
215320
});
216321
};
217322

218-
/**
219-
* Delete the subscription. Pull requests from the current subscription will be
220-
* errored once unsubscription is complete.
221-
*
222-
* @param {function=} callback - The callback function.
223-
*
224-
* @example
225-
* subscription.delete(function(err) {});
226-
*/
227-
Subscription.prototype.delete = function(callback) {
228-
callback = callback || util.noop;
229-
this.request(
230-
'DELETE', 'subscriptions/' + this.name, null, true, function(err) {
231-
if (err) {
232-
callback(err);
233-
return;
234-
}
235-
this.closed = true;
236-
callback(null);
237-
}.bind(this));
238-
};
239-
240-
/**
241-
* Poll the backend for new messages. This runs a loop to ping the API at the
242-
* provided interval from the subscription's instantiation. If you didn't
243-
* provide one, the default value is 10 milliseconds.
244-
*
245-
* If messages are received, you can catch them by registering a listener for
246-
* the `message` event.
247-
*
248-
* To stop pulling, see {@linkcode module:pubsub/subscription#stopPulling}.
249-
*
250-
* @private
251-
*
252-
* @example
253-
* subscription.startPulling_();
254-
*/
255-
Subscription.prototype.startPulling_ = function() {
256-
if (this.closed) {
257-
return;
258-
}
259-
this.pull({
260-
returnImmediately: false
261-
}, function(err) {
262-
if (err && err.code === 400) {
263-
this.emit('error', err);
264-
}
265-
setTimeout(this.startPulling_.bind(this), this.interval);
266-
}.bind(this));
267-
};
268-
269-
/**
270-
* Stop the subscription from automatically pulling. You will still be able to
271-
* call {@linkcode module:pubsub/subscription#pull} directly.
272-
*
273-
* @example
274-
* subscription.close();
275-
*/
276-
Subscription.prototype.close = function() {
277-
this.closed = true;
278-
};
279-
280-
/**
281-
* Emits a 'message' event with the provided message.
282-
*
283-
* The message is simplified from the API response to have simply two
284-
* properties, `id` and `data`. `data` is always converted to a string.
285-
*
286-
* @private
287-
*/
288-
Subscription.prototype.emitMessage_ = function(msg) {
289-
var message = {
290-
id: msg.ackId
291-
};
292-
if (msg.pubsubEvent && msg.pubsubEvent.message) {
293-
message.data =
294-
new Buffer(msg.pubsubEvent.message.data, 'base64').toString('utf-8');
295-
try {
296-
message.data = JSON.parse(message.data);
297-
} catch(e) {}
298-
}
299-
this.emit('message', message);
300-
};
301-
302323
module.exports = Subscription;

regression/pubsub.js

+9-7
Original file line numberDiff line numberDiff line change
@@ -174,14 +174,16 @@ describe('pubsub', function() {
174174

175175
it('should be able to pull and ack', function(done) {
176176
var subscription = topic.subscription(subscriptions[0].name);
177-
subscription
178-
.once('message', function(msg) {
179-
subscription.ack(msg.id, done);
180-
})
181-
.pull({ returnImmediately: true }, assert.ifError);
182-
[1, 2, 3, 4, 5, 6].forEach(function() {
183-
topic.publish('hello', assert.ifError);
177+
subscription.pull({ returnImmediately: true }, function(err, msg) {
178+
assert.ifError(err);
179+
subscription.ack(msg.id, done);
184180
});
181+
topic.publish('hello', assert.ifError);
182+
topic.publish('hello', assert.ifError);
183+
topic.publish('hello', assert.ifError);
184+
topic.publish('hello', assert.ifError);
185+
topic.publish('hello', assert.ifError);
186+
topic.publish('hello', assert.ifError);
185187
});
186188
});
187189
});

0 commit comments

Comments
 (0)