Skip to content

Commit a7492c6

Browse files
pubsub: borrow request config from GAX (#2242)
* pubsub: borrow request config from GAX [ci skip] * tests * rm .only
1 parent 2e5e321 commit a7492c6

File tree

6 files changed

+216
-79
lines changed

6 files changed

+216
-79
lines changed

packages/pubsub/src/index.js

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ var Subscription = require('./subscription.js');
4545
*/
4646
var Topic = require('./topic.js');
4747

48+
/**
49+
* @type {object} - GAX's default configuration.
50+
*/
51+
var GAX_CONFIG = {
52+
Publisher: require('./v1/publisher_client_config.json').
53+
interfaces['google.pubsub.v1.Publisher'],
54+
Subscriber: require('./v1/subscriber_client_config.json').
55+
interfaces['google.pubsub.v1.Subscriber']
56+
};
57+
4858
/**
4959
* [Cloud Pub/Sub](https://developers.google.com/pubsub/overview) is a
5060
* reliable, many-to-many, asynchronous messaging service from Cloud
@@ -751,6 +761,24 @@ PubSub.prototype.topic = function(name) {
751761
return new Topic(this, name);
752762
};
753763

764+
/**
765+
* Intercept the call to {module:common/grpc-service#request}, making sure the
766+
* correct timeouts are set.
767+
*
768+
* @private
769+
*/
770+
PubSub.prototype.request = function(protoOpts) {
771+
var method = protoOpts.method;
772+
var camelCaseMethod = method[0].toUpperCase() + method.substr(1);
773+
var config = GAX_CONFIG[protoOpts.service].methods[camelCaseMethod];
774+
775+
if (is.undefined(arguments[0].timeout)) {
776+
arguments[0].timeout = config.timeout_millis;
777+
}
778+
779+
commonGrpc.Service.prototype.request.apply(this, arguments);
780+
};
781+
754782
/**
755783
* Determine the appropriate endpoint to use for API requests, first trying the
756784
* local Pub/Sub emulator environment variable (PUBSUB_EMULATOR_HOST), otherwise
@@ -775,7 +803,7 @@ PubSub.prototype.determineBaseUrl_ = function() {
775803

776804
/*! Developer Documentation
777805
*
778-
* These methods can be auto-paginated.
806+
* These methods can be agto-paginated.
779807
*/
780808
common.paginator.extend(PubSub, [
781809
'getSnapshots',
@@ -789,7 +817,12 @@ common.paginator.extend(PubSub, [
789817
* that a callback is omitted.
790818
*/
791819
common.util.promisifyAll(PubSub, {
792-
exclude: ['snapshot', 'subscription', 'topic']
820+
exclude: [
821+
'request',
822+
'snapshot',
823+
'subscription',
824+
'topic'
825+
]
793826
});
794827

795828
PubSub.Subscription = Subscription;

packages/pubsub/src/subscription.js

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ Subscription.prototype.ack = function(ackIds, options, callback) {
465465
ackIds: ackIds
466466
};
467467

468-
this.request(protoOpts, reqOpts, function(err, resp) {
468+
this.parent.request(protoOpts, reqOpts, function(err, resp) {
469469
if (!err) {
470470
ackIds.forEach(function(ackId) {
471471
delete self.inProgressAckIds[ackId];
@@ -523,7 +523,7 @@ Subscription.prototype.createSnapshot = function(name, callback) {
523523
subscription: this.name
524524
};
525525

526-
this.request(protoOpts, reqOpts, function(err, resp) {
526+
this.parent.request(protoOpts, reqOpts, function(err, resp) {
527527
if (err) {
528528
callback(err, null, resp);
529529
return;
@@ -601,7 +601,7 @@ Subscription.prototype.delete = function(callback) {
601601
subscription: this.name
602602
};
603603

604-
this.request(protoOpts, reqOpts, function(err, resp) {
604+
this.parent.request(protoOpts, reqOpts, function(err, resp) {
605605
if (err) {
606606
callback(err, resp);
607607
return;
@@ -696,9 +696,11 @@ Subscription.prototype.pull = function(options, callback) {
696696
maxMessages: options.maxResults
697697
};
698698

699-
this.activeRequest_ = this.request(protoOpts, reqOpts, function(err, resp) {
699+
this.activeRequest_ = this.parent.request(protoOpts, reqOpts, function(err) {
700700
self.activeRequest_ = null;
701701

702+
var resp = arguments[1];
703+
702704
if (err) {
703705
if (err.code === 504) {
704706
// Simulate a server timeout where no messages were received.
@@ -779,7 +781,7 @@ Subscription.prototype.seek = function(snapshot, callback) {
779781
throw new Error('Either a snapshot name or Date is needed to seek to.');
780782
}
781783

782-
this.request(protoOpts, reqOpts, callback);
784+
this.parent.request(protoOpts, reqOpts, callback);
783785
};
784786

785787
/**
@@ -825,7 +827,7 @@ Subscription.prototype.setAckDeadline = function(options, callback) {
825827
ackDeadlineSeconds: options.seconds
826828
};
827829

828-
this.request(protoOpts, reqOpts, function(err, resp) {
830+
this.parent.request(protoOpts, reqOpts, function(err, resp) {
829831
callback(err, resp);
830832
});
831833
};

packages/pubsub/src/topic.js

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ var IAM = require('./iam.js');
4949
*/
5050
function Topic(pubsub, name) {
5151
this.name = Topic.formatName_(pubsub.projectId, name);
52-
this.pubsub = pubsub;
5352

5453
var methods = {
5554
/**
@@ -319,7 +318,7 @@ Topic.prototype.getSubscriptions = function(options, callback) {
319318
options = options || {};
320319
options.topic = this;
321320

322-
return this.pubsub.getSubscriptions(options, callback);
321+
return this.parent.getSubscriptions(options, callback);
323322
};
324323

325324
/**
@@ -353,7 +352,7 @@ Topic.prototype.getSubscriptionsStream = function(options) {
353352
options = options || {};
354353
options.topic = this;
355354

356-
return this.pubsub.getSubscriptionsStream(options);
355+
return this.parent.getSubscriptionsStream(options);
357356
};
358357

359358
/**
@@ -475,7 +474,7 @@ Topic.prototype.publish = function(messages, options, callback) {
475474
.map(Topic.formatMessage_)
476475
};
477476

478-
this.request(protoOpts, reqOpts, function(err, result) {
477+
this.parent.request(protoOpts, reqOpts, function(err, result) {
479478
if (err) {
480479
callback(err, null, result);
481480
return;
@@ -541,7 +540,7 @@ Topic.prototype.publish = function(messages, options, callback) {
541540
* });
542541
*/
543542
Topic.prototype.subscribe = function(subName, options, callback) {
544-
this.pubsub.subscribe(this, subName, options, callback);
543+
this.parent.subscribe(this, subName, options, callback);
545544
};
546545

547546
/**
@@ -574,7 +573,7 @@ Topic.prototype.subscription = function(name, options) {
574573
options = options || {};
575574
options.topic = this;
576575

577-
return this.pubsub.subscription(name, options);
576+
return this.parent.subscription(name, options);
578577
};
579578

580579
/*! Developer Documentation

packages/pubsub/test/index.js

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,24 @@ var fakeUtil = extend({}, util, {
4040
}
4141

4242
promisified = true;
43-
assert.deepEqual(options.exclude, ['snapshot', 'subscription', 'topic']);
43+
assert.deepEqual(options.exclude, [
44+
'request',
45+
'snapshot',
46+
'subscription',
47+
'topic'
48+
]);
4449
}
4550
});
4651

4752
function FakeGrpcService() {
4853
this.calledWith_ = arguments;
4954
}
5055

56+
var grpcServiceRequestOverride;
57+
FakeGrpcService.prototype.request = function() {
58+
return (grpcServiceRequestOverride || util.noop).apply(this, arguments);
59+
};
60+
5161
function FakeSnapshot() {
5262
this.calledWith_ = arguments;
5363
}
@@ -74,6 +84,22 @@ var fakePaginator = {
7484
}
7585
};
7686

87+
var GAX_CONFIG_PUBLISHER_OVERRIDE = {};
88+
var GAX_CONFIG_SUBSCRIBER_OVERRIDE = {};
89+
90+
var GAX_CONFIG = {
91+
Publisher: {
92+
interfaces: {
93+
'google.pubsub.v1.Publisher': GAX_CONFIG_PUBLISHER_OVERRIDE
94+
}
95+
},
96+
Subscriber: {
97+
interfaces: {
98+
'google.pubsub.v1.Subscriber': GAX_CONFIG_SUBSCRIBER_OVERRIDE
99+
}
100+
}
101+
};
102+
77103
describe('PubSub', function() {
78104
var PubSub;
79105
var PROJECT_ID = 'test-project';
@@ -94,7 +120,10 @@ describe('PubSub', function() {
94120
},
95121
'./snapshot.js': FakeSnapshot,
96122
'./subscription.js': Subscription,
97-
'./topic.js': Topic
123+
'./topic.js': Topic,
124+
125+
'./v1/publisher_client_config.json': GAX_CONFIG.Publisher,
126+
'./v1/subscriber_client_config.json': GAX_CONFIG.Subscriber
98127
});
99128
});
100129

@@ -105,6 +134,7 @@ describe('PubSub', function() {
105134
});
106135

107136
beforeEach(function() {
137+
grpcServiceRequestOverride = null;
108138
SubscriptionOverride = null;
109139
pubsub = new PubSub(OPTIONS);
110140
pubsub.projectId = PROJECT_ID;
@@ -897,6 +927,86 @@ describe('PubSub', function() {
897927
});
898928
});
899929

930+
describe('request', function() {
931+
var TIMEOUT = Math.random();
932+
933+
beforeEach(function() {
934+
GAX_CONFIG_PUBLISHER_OVERRIDE.methods = {
935+
MethodName: {
936+
timeout_millis: TIMEOUT
937+
}
938+
};
939+
});
940+
941+
after(function() {
942+
GAX_CONFIG_PUBLISHER_OVERRIDE.methods = {};
943+
});
944+
945+
it('should pass through the request', function(done) {
946+
var args = [
947+
{
948+
service: 'Publisher',
949+
method: 'MethodName'
950+
},
951+
{
952+
value: true
953+
},
954+
{
955+
anotherValue: true
956+
}
957+
];
958+
959+
grpcServiceRequestOverride = function() {
960+
assert.strictEqual(this, pubsub);
961+
assert.strictEqual(args[0], arguments[0]);
962+
assert.strictEqual(args[1], arguments[1]);
963+
assert.strictEqual(args[2], arguments[2]);
964+
done();
965+
};
966+
967+
pubsub.request.apply(pubsub, args);
968+
});
969+
970+
it('should assign a timeout', function(done) {
971+
grpcServiceRequestOverride = function(protoOpts) {
972+
assert.strictEqual(protoOpts.timeout, TIMEOUT);
973+
done();
974+
};
975+
976+
pubsub.request({
977+
service: 'Publisher',
978+
method: 'MethodName'
979+
});
980+
});
981+
982+
it('should not override a timeout if set', function(done) {
983+
var timeout = 0;
984+
985+
grpcServiceRequestOverride = function(protoOpts) {
986+
assert.strictEqual(protoOpts.timeout, timeout);
987+
done();
988+
};
989+
990+
pubsub.request({
991+
service: 'Publisher',
992+
method: 'MethodName',
993+
timeout: timeout
994+
});
995+
});
996+
997+
it('should camel case the method name', function(done) {
998+
grpcServiceRequestOverride = function(protoOpts) {
999+
assert.strictEqual(protoOpts.timeout, TIMEOUT);
1000+
done();
1001+
};
1002+
1003+
pubsub.request({
1004+
service: 'Publisher',
1005+
method: 'methodName'
1006+
});
1007+
});
1008+
});
1009+
9001010
describe('determineBaseUrl_', function() {
9011011
function setHost(host) {
9021012
process.env.PUBSUB_EMULATOR_HOST = host;

0 commit comments

Comments
 (0)