Skip to content

Commit 4790f06

Browse files
authored
pubsub: implement streaming pull receipt (#2732)
1 parent e2a46d2 commit 4790f06

File tree

3 files changed

+429
-450
lines changed

3 files changed

+429
-450
lines changed

packages/pubsub/src/subscription.js

+128-109
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
'use strict';
2222

23+
var arrify = require('arrify');
2324
var common = require('@google-cloud/common');
2425
var events = require('events');
2526
var extend = require('extend');
@@ -288,21 +289,54 @@ Subscription.prototype.ack_ = function(message) {
288289
this.breakLease_(message);
289290
this.histogram.add(Date.now() - message.received);
290291

291-
if (!this.connectionPool || !this.connectionPool.isConnected()) {
292-
this.inventory_.ack.push(message.ackId);
293-
this.setFlushTimeout_();
292+
if (this.isConnected_()) {
293+
this.acknowledge_(message.ackId, message.connectionId);
294294
return;
295295
}
296296

297+
this.inventory_.ack.push(message.ackId);
298+
this.setFlushTimeout_();
299+
};
300+
301+
/**
302+
* Sends an acknowledge request for the provided ack ids.
303+
*
304+
* @private
305+
*
306+
* @param {string|string[]} ackIds - The ack IDs to acknowledge.
307+
* @param {string=} connId - Connection ID to send request on.
308+
* @return {Promise}
309+
*/
310+
Subscription.prototype.acknowledge_ = function(ackIds, connId) {
297311
var self = this;
312+
var promise;
298313

299-
this.connectionPool.acquire(message.connectionId, function(err, connection) {
300-
if (err) {
301-
self.emit('error', err);
302-
return;
303-
}
314+
ackIds = arrify(ackIds);
315+
316+
if (!this.isConnected_()) {
317+
promise = common.util.promisify(this.request).call(this, {
318+
client: 'subscriberClient',
319+
method: 'acknowledge',
320+
reqOpts: {
321+
subscription: this.name,
322+
ackIds
323+
}
324+
});
325+
} else {
326+
promise = new Promise(function(resolve, reject) {
327+
self.connectionPool.acquire(connId, function(err, connection) {
328+
if (err) {
329+
reject(err);
330+
return;
331+
}
332+
333+
connection.write({ ackIds }, resolve);
334+
});
335+
});
336+
}
304337

305-
connection.write({ ackIds: [message.ackId] });
338+
return promise.catch(function(err) {
339+
self.emit('error', err);
306340
});
307341
};
308342

@@ -360,6 +394,8 @@ Subscription.prototype.breakLease_ = function(message) {
360394
* subscription.close().then(function() {});
361395
*/
362396
Subscription.prototype.close = function(callback) {
397+
var self = this;
398+
363399
this.userClosed_ = true;
364400

365401
var inventory = this.inventory_;
@@ -368,8 +404,9 @@ Subscription.prototype.close = function(callback) {
368404
clearTimeout(this.leaseTimeoutHandle_);
369405
this.leaseTimeoutHandle_ = null;
370406

371-
this.flushQueues_();
372-
this.closeConnection_(callback);
407+
this.flushQueues_().then(function() {
408+
self.closeConnection_(callback);
409+
});
373410
};
374411

375412
/**
@@ -563,69 +600,28 @@ Subscription.prototype.flushQueues_ = function() {
563600
var nacks = this.inventory_.nack;
564601

565602
if (!acks.length && !nacks.length) {
566-
return;
603+
return Promise.resolve();
567604
}
568605

569-
if (this.connectionPool) {
570-
this.connectionPool.acquire(function(err, connection) {
571-
if (err) {
572-
self.emit('error', err);
573-
return;
574-
}
575-
576-
var reqOpts = {};
577-
578-
if (acks.length) {
579-
reqOpts.ackIds = acks;
580-
}
581-
582-
if (nacks.length) {
583-
reqOpts.modifyDeadlineAckIds = nacks;
584-
reqOpts.modifyDeadlineSeconds = Array(nacks.length).fill(0);
585-
}
586-
587-
connection.write(reqOpts);
588-
589-
self.inventory_.ack = [];
590-
self.inventory_.nack = [];
591-
});
592-
return;
593-
}
606+
var requests = [];
594607

595608
if (acks.length) {
596-
this.request({
597-
client: 'subscriberClient',
598-
method: 'acknowledge',
599-
reqOpts: {
600-
subscription: this.name,
601-
ackIds: acks
602-
}
603-
}, function(err) {
604-
if (err) {
605-
self.emit('error', err);
606-
} else {
609+
requests.push(
610+
this.acknowledge_(acks).then(function() {
607611
self.inventory_.ack = [];
608-
}
609-
});
612+
})
613+
);
610614
}
611615

612616
if (nacks.length) {
613-
this.request({
614-
client: 'subscriberClient',
615-
method: 'modifyAckDeadline',
616-
reqOpts: {
617-
subscription: this.name,
618-
ackIds: nacks,
619-
ackDeadlineSeconds: 0
620-
}
621-
}, function(err) {
622-
if (err) {
623-
self.emit('error', err);
624-
} else {
617+
requests.push(
618+
this.modifyAckDeadline_(nacks, 0).then(function() {
625619
self.inventory_.nack = [];
626-
}
627-
});
620+
})
621+
);
628622
}
623+
624+
return Promise.all(requests);
629625
};
630626

631627
/**
@@ -725,6 +721,17 @@ Subscription.prototype.getMetadata = function(gaxOpts, callback) {
725721
});
726722
};
727723

724+
/**
725+
* Checks to see if we currently have a streaming connection.
726+
*
727+
* @private
728+
*
729+
* @return {boolean}
730+
*/
731+
Subscription.prototype.isConnected_ = function() {
732+
return !!(this.connectionPool && this.connectionPool.isConnected());
733+
};
734+
728735
/**
729736
* Checks to see if this Subscription has hit any of the flow control
730737
* thresholds.
@@ -748,6 +755,9 @@ Subscription.prototype.hasMaxMessages_ = function() {
748755
* @param {object} message - The message object.
749756
*/
750757
Subscription.prototype.leaseMessage_ = function(message) {
758+
this.modifyAckDeadline_(
759+
message.ackId, this.ackDeadline / 1000, message.connectionId);
760+
751761
this.inventory_.lease.push(message.ackId);
752762
this.inventory_.bytes += message.length;
753763
this.setLeaseTimeout_();
@@ -789,6 +799,53 @@ Subscription.prototype.listenForEvents_ = function() {
789799
});
790800
};
791801

802+
/**
803+
* Sends a modifyAckDeadline request for the provided ack ids.
804+
*
805+
* @private
806+
*
807+
* @param {string|string[]} ackIds - The ack IDs to acknowledge.
808+
* @param {number} deadline - The dealine in seconds.
809+
* @param {string=} connId - Connection ID to send request on.
810+
* @return {Promise}
811+
*/
812+
Subscription.prototype.modifyAckDeadline_ = function(ackIds, deadline, connId) {
813+
var self = this;
814+
var promise;
815+
816+
ackIds = arrify(ackIds);
817+
818+
if (!this.isConnected_()) {
819+
promise = common.util.promisify(this.request).call(this, {
820+
client: 'subscriberClient',
821+
method: 'modifyAckDeadline',
822+
reqOpts: {
823+
subscription: self.name,
824+
ackDeadlineSeconds: deadline,
825+
ackIds
826+
}
827+
});
828+
} else {
829+
promise = new Promise(function(resolve, reject) {
830+
self.connectionPool.acquire(connId, function(err, connection) {
831+
if (err) {
832+
reject(err);
833+
return;
834+
}
835+
836+
connection.write({
837+
modifyDeadlineAckIds: ackIds,
838+
modifyDeadlineSeconds: Array(ackIds.length).fill(deadline)
839+
}, resolve);
840+
});
841+
});
842+
}
843+
844+
return promise.catch(function(err) {
845+
self.emit('error', err);
846+
});
847+
};
848+
792849
/**
793850
* Modify the push config for the subscription.
794851
*
@@ -855,25 +912,13 @@ Subscription.prototype.modifyPushConfig = function(config, gaxOpts, callback) {
855912
Subscription.prototype.nack_ = function(message) {
856913
this.breakLease_(message);
857914

858-
if (!this.connectionPool || !this.connectionPool.isConnected()) {
859-
this.inventory_.nack.push(message.ackId);
860-
this.setFlushTimeout_();
915+
if (this.isConnected_()) {
916+
this.modifyAckDeadline_(message.ackId, 0, message.connectionId);
861917
return;
862918
}
863919

864-
var self = this;
865-
866-
this.connectionPool.acquire(message.connectionId, function(err, connection) {
867-
if (err) {
868-
self.emit('error', err);
869-
return;
870-
}
871-
872-
connection.write({
873-
modifyDeadlineAckIds: [message.ackId],
874-
modifyDeadlineSeconds: [0]
875-
});
876-
});
920+
this.inventory_.nack.push(message.ackId);
921+
this.setFlushTimeout_();
877922
};
878923

879924
/**
@@ -930,35 +975,9 @@ Subscription.prototype.renewLeases_ = function() {
930975
var ackIds = this.inventory_.lease.slice();
931976
var ackDeadlineSeconds = this.ackDeadline / 1000;
932977

933-
if (this.connectionPool) {
934-
this.connectionPool.acquire(function(err, connection) {
935-
if (err) {
936-
self.emit('error', err);
937-
return;
938-
}
939-
940-
connection.write({
941-
modifyDeadlineAckIds: ackIds,
942-
modifyDeadlineSeconds: Array(ackIds.length).fill(ackDeadlineSeconds)
943-
});
944-
});
945-
} else {
946-
this.request({
947-
client: 'subscriberClient',
948-
method: 'modifyAckDeadline',
949-
reqOpts: {
950-
subscription: self.name,
951-
ackIds: ackIds,
952-
ackDeadlineSeconds: ackDeadlineSeconds
953-
}
954-
}, function(err) {
955-
if (err) {
956-
self.emit('error', err);
957-
}
958-
});
959-
}
960-
961-
this.setLeaseTimeout_();
978+
this.modifyAckDeadline_(ackIds, ackDeadlineSeconds).then(function() {
979+
self.setLeaseTimeout_();
980+
});
962981
};
963982

964983
/**

packages/pubsub/system-test/pubsub.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -629,8 +629,8 @@ describe('pubsub', function() {
629629

630630
return subscription.create().then(function() {
631631
return publisher.publish(new Buffer('Hello, world!'));
632-
}).then(function(messageIds) {
633-
messageId = messageIds[0];
632+
}).then(function(_messageId) {
633+
messageId = _messageId;
634634
});
635635
});
636636

0 commit comments

Comments
 (0)