Skip to content

Commit 920f865

Browse files
pubsub: allow aborting an active HTTP request + auto-abort when subscription closed
1 parent 1882b6e commit 920f865

File tree

5 files changed

+99
-4
lines changed

5 files changed

+99
-4
lines changed

lib/common/util.js

+13-2
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ function makeAuthenticatedRequestFactory(config) {
323323
function makeAuthenticatedRequest(reqOpts, options) {
324324
var stream;
325325
var reqConfig = extend({}, config);
326+
var activeRequest_;
326327

327328
if (!options) {
328329
stream = duplexify();
@@ -345,7 +346,8 @@ function makeAuthenticatedRequestFactory(config) {
345346
if (options && options.onAuthenticated) {
346347
options.onAuthenticated(null, authenticatedReqOpts);
347348
} else {
348-
util.makeRequest(authenticatedReqOpts, reqConfig, options);
349+
activeRequest_ =
350+
util.makeRequest(authenticatedReqOpts, reqConfig, options);
349351
}
350352
}
351353

@@ -360,6 +362,15 @@ function makeAuthenticatedRequestFactory(config) {
360362
if (stream) {
361363
return stream;
362364
}
365+
366+
return {
367+
abort: function() {
368+
if (activeRequest_) {
369+
activeRequest_.abort();
370+
activeRequest_ = null;
371+
}
372+
}
373+
};
363374
}
364375

365376
makeAuthenticatedRequest.getCredentials =
@@ -426,7 +437,7 @@ function makeRequest(reqOpts, config, callback) {
426437

427438
dup.abort = requestStream.abort;
428439
} else {
429-
retryRequest(reqOpts, options, function(err, response, body) {
440+
return retryRequest(reqOpts, options, function(err, response, body) {
430441
util.handleResp(err, response, body, callback);
431442
});
432443
}

lib/pubsub/subscription.js

+7-1
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ Subscription.prototype.pull = function(options, callback) {
509509
options.maxResults = MAX_EVENTS_LIMIT;
510510
}
511511

512-
this.request({
512+
this.activeRequest_ = this.request({
513513
timeout: this.timeout,
514514
method: 'POST',
515515
uri: ':pull',
@@ -518,6 +518,8 @@ Subscription.prototype.pull = function(options, callback) {
518518
maxMessages: options.maxResults
519519
}
520520
}, function(err, response) {
521+
self.activeRequest_ = null;
522+
521523
if (err) {
522524
if (err.code === 'ETIMEDOUT' && !err.connect) {
523525
// Simulate a server timeout where no messages were received.
@@ -616,6 +618,10 @@ Subscription.prototype.listenForEvents_ = function() {
616618
this.on('removeListener', function(event) {
617619
if (event === 'message' && --self.messageListeners === 0) {
618620
self.closed = true;
621+
622+
if (self.activeRequest_) {
623+
self.activeRequest_.abort();
624+
}
619625
}
620626
});
621627
};

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@
105105
"protobufjs": "^5.0.1",
106106
"pumpify": "^1.3.3",
107107
"request": "^2.53.0",
108-
"retry-request": "^1.2.1",
108+
"retry-request": "^1.2.3",
109109
"split-array-stream": "^1.0.0",
110110
"stream-events": "^1.0.1",
111111
"string-format-obj": "^1.0.0",

test/common/util.js

+40
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,35 @@ describe('common/util', function() {
789789
makeAuthenticatedRequest(reqOpts, done);
790790
});
791791

792+
it('should return abort() from the active request', function(done) {
793+
var retryRequest = {
794+
abort: done
795+
};
796+
797+
utilOverrides.makeRequest = function() {
798+
return retryRequest;
799+
};
800+
801+
var makeAuthenticatedRequest = util.makeAuthenticatedRequestFactory();
802+
makeAuthenticatedRequest(reqOpts, assert.ifError).abort();
803+
});
804+
805+
it('should only abort() once', function(done) {
806+
var retryRequest = {
807+
abort: done // Will throw if called more than once.
808+
};
809+
810+
utilOverrides.makeRequest = function() {
811+
return retryRequest;
812+
};
813+
814+
var makeAuthenticatedRequest = util.makeAuthenticatedRequestFactory();
815+
var request = makeAuthenticatedRequest(reqOpts, assert.ifError);
816+
817+
request.abort(); // done()
818+
request.abort(); // done()
819+
});
820+
792821
it('should provide stream to makeRequest', function(done) {
793822
var stream;
794823

@@ -916,6 +945,17 @@ describe('common/util', function() {
916945
retryRequestOverride = testCustomRetryRequestConfig(done);
917946
util.makeRequest(reqOpts, customRetryRequestConfig);
918947
});
948+
949+
it('should return the instance of retryRequest', function() {
950+
var requestInstance = {};
951+
952+
retryRequestOverride = function() {
953+
return requestInstance;
954+
};
955+
956+
var request = util.makeRequest(reqOpts, assert.ifError);
957+
assert.strictEqual(request, requestInstance);
958+
});
919959
});
920960

921961
describe('stream mode', function() {

test/pubsub/subscription.js

+38
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,33 @@ describe('Subscription', function() {
497497
subscription.pull(assert.ifError);
498498
});
499499

500+
it('should store the active request', function() {
501+
var requestInstance = {};
502+
503+
subscription.request = function() {
504+
return requestInstance;
505+
};
506+
507+
subscription.pull(assert.ifError);
508+
assert.strictEqual(subscription.activeRequest_, requestInstance);
509+
});
510+
511+
it('should clear the active request', function(done) {
512+
var requestInstance = {};
513+
514+
subscription.request = function(reqOpts, callback) {
515+
setImmediate(function() {
516+
callback(null, {});
517+
assert.strictEqual(subscription.activeRequest_, null);
518+
done();
519+
});
520+
521+
return requestInstance;
522+
};
523+
524+
subscription.pull(assert.ifError);
525+
});
526+
500527
it('should pass error to callback', function(done) {
501528
var error = new Error('Error.');
502529
subscription.request = function(reqOpts, callback) {
@@ -825,6 +852,17 @@ describe('Subscription', function() {
825852
// 0 listeners: sub should be closed.
826853
assert.strictEqual(subscription.closed, true);
827854
});
855+
856+
it('should abort the HTTP request when listeners removed', function(done) {
857+
subscription.startPulling_ = util.noop;
858+
859+
subscription.activeRequest_ = {
860+
abort: done
861+
};
862+
863+
subscription.on('message', util.noop);
864+
subscription.removeAllListeners();
865+
});
828866
});
829867

830868
describe('startPulling_', function() {

0 commit comments

Comments
 (0)