Skip to content

Commit 9fb635d

Browse files
add message decorator
1 parent 832fc66 commit 9fb635d

File tree

1 file changed

+25
-7
lines changed

1 file changed

+25
-7
lines changed

lib/pubsub/subscription.js

+25-7
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ function Subscription(pubsub, options) {
127127
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
128128
this.closed = false;
129129
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
130-
this.inProgress = 0;
130+
this.inProgressAckIds = {};
131131
this.maxInProgress =
132132
util.is(options.maxInProgress, 'number') ? options.maxInProgress : Infinity;
133133
this.messageListeners = 0;
@@ -243,7 +243,7 @@ Subscription.prototype.startPulling_ = function() {
243243
var maxResults;
244244

245245
if (this.maxInProgress < Infinity) {
246-
maxResults = this.maxInProgress - this.inProgress;
246+
maxResults = this.maxInProgress - Object.keys(this.inProgressAckIds).length;
247247
}
248248

249249
this.pull({
@@ -284,16 +284,17 @@ Subscription.prototype.ack = function(ackIds, callback) {
284284
'At least one ID must be specified before it can be acknowledged.');
285285
}
286286

287+
ackIds = util.arrayize(ackIds);
288+
287289
var body = {
288-
ackIds: util.arrayize(ackIds)
290+
ackIds: ackIds
289291
};
290292

291293
callback = callback || util.noop;
292294

293295
var path = this.name + ':acknowledge';
294296

295297
this.makeReq_('POST', path, null, body, function() {
296-
self.inProgress--;
297298
self.refreshPausedStatus_();
298299
callback.apply(self, arguments);
299300
});
@@ -391,9 +392,10 @@ Subscription.prototype.pull = function(options, callback) {
391392
}
392393

393394
var messages = response.receivedMessages || [];
394-
messages = messages.map(Subscription.formatMessage_);
395+
messages = messages
396+
.map(Subscription.formatMessage_)
397+
.map(self.decorateMessage_.bind(self));
395398

396-
self.inProgress += messages.length;
397399
self.refreshPausedStatus_();
398400

399401
if (self.autoAck && messages.length !== 0) {
@@ -441,9 +443,25 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
441443
this.makeReq_('POST', path, null, body, callback);
442444
};
443445

446+
Subscription.prototype.decorateMessage_ = function(message) {
447+
var self = this;
448+
449+
this.inProgressAckIds[message.ackId] = true;
450+
451+
message.ack = self.ack.bind(self, message.ackId);
452+
453+
message.skip = function() {
454+
delete self.inProgressAckIds[message.ackId];
455+
self.refreshPausedStatus_();
456+
};
457+
458+
return message;
459+
};
460+
444461
Subscription.prototype.refreshPausedStatus_ = function() {
445462
var isCurrentlyPaused = this.paused;
446-
this.paused = this.inProgress >= this.maxInProgress;
463+
var inProgress = Object.keys(this.inProgressAckIds).length;
464+
this.paused = inProgress >= this.maxInProgress;
447465

448466
if (isCurrentlyPaused && !this.paused && this.messageListeners > 0) {
449467
this.startPulling_();

0 commit comments

Comments
 (0)