Skip to content

Commit 1d0c11c

Browse files
move subscription methods up a level
1 parent 1c81c27 commit 1d0c11c

File tree

4 files changed

+183
-72
lines changed

4 files changed

+183
-72
lines changed

lib/common/util.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ util.arrayize = arrayize;
124124
*/
125125
function format(template, args) {
126126
return template.replace(/{([^}]*)}/g, function(match, key) {
127-
return args[key] || match;
127+
return key in args ? args[key] : match;
128128
});
129129
}
130130

lib/pubsub/index.js

+161-11
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,124 @@ PubSub.prototype.createTopic = function(name, callback) {
162162
});
163163
};
164164

165+
/**
166+
* Create a subscription to this topic. You may optionally provide an object to
167+
* customize the subscription.
168+
*
169+
* Your provided callback will either be invoked with an error object, if an API
170+
* error occurred, or a {@linkcode module:pubsub/subscription} object.
171+
*
172+
* @throws {Error} If a name is not provided.
173+
*
174+
* @param {string} name - The name of the subscription.
175+
* @param {object=} options - Configuration object.
176+
* @param {number=} options.ackDeadlineSeconds - The maximum time after
177+
* receiving a message that you must ack a message before it is redelivered.
178+
* @param {boolean=} options.autoAck - Automatically acknowledge the message
179+
* once it's pulled. (default: false)
180+
* @param {number=} options.interval - Interval in milliseconds to check for new
181+
* messages. (default: 10)
182+
* @param {string=} options.projectId - The projectId where the topic resource
183+
* exists.
184+
* @param {boolean=} options.reuseExisting - If the subscription already exists,
185+
* reuse it. The options of the existing subscription are not changed. If
186+
* false, attempting to create a subscription that already exists will fail.
187+
* (default: false)
188+
* @param {function} callback - The callback function.
189+
*
190+
* @example
191+
* // Without specifying any options.
192+
* var topic = 'messageTopic';
193+
* var sub = 'messageSubscription';
194+
*
195+
* pubsub.subscribe(topic, sub, function(err, subscription, apiResponse) {});
196+
*
197+
* // With options.
198+
* pubsub.subscribe(topic, sub, {
199+
* ackDeadlineSeconds: 90,
200+
* autoAck: true,
201+
* interval: 30
202+
* }, function(err, subscription, apiResponse) {});
203+
*/
204+
PubSub.prototype.subscribe = function(topicName, subName, options, callback) {
205+
var self = this;
206+
207+
if (!topicName) {
208+
throw new Error('A Topic name is required for a new subscription.');
209+
}
210+
211+
if (!subName) {
212+
throw new Error('A Subscription name is required for a new subscription.');
213+
}
214+
215+
if (!callback) {
216+
callback = options;
217+
options = {};
218+
}
219+
220+
var body = {
221+
topic: topicName
222+
};
223+
224+
if (options.ackDeadlineSeconds) {
225+
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
226+
}
227+
228+
var projectId = options.projectId || this.projectId;
229+
230+
var path = Subscription.formatName_(projectId, subName);
231+
232+
this.makeReq_('PUT', path, null, body, function(err, result) {
233+
if (options.reuseExisting && err && err.code === 409) {
234+
callback(null, self.subscription(subName, options), result);
235+
} else if (err) {
236+
callback(err, null, result);
237+
} else {
238+
callback(null, self.subscription(subName, options), result);
239+
}
240+
});
241+
};
242+
243+
/**
244+
* Create a Subscription object in reference to an existing subscription. This
245+
* command by itself will not run any API requests. You will receive a
246+
* {@linkcode module:pubsub/subscription} object, which will allow you to
247+
* interact with your subscription.
248+
*
249+
* @throws {Error} If a name is not provided.
250+
*
251+
* @param {string} name - The name of the subscription.
252+
* @param {object=} options - Configuration object.
253+
* @param {boolean=} options.autoAck - Automatically acknowledge the message
254+
* once it's pulled.
255+
* @param {number=} options.interval - Interval in milliseconds to check for new
256+
* messages.
257+
* @param {string=} options.projectId - The projectId where the subscription
258+
* exists.
259+
* @return {module:pubsub/subscription}
260+
*
261+
* @example
262+
* var subscription = pubsub.subscription('my-existing-subscription');
263+
*
264+
* // Register a listener for `message` events.
265+
* subscription.on('message', function(message) {
266+
* // Called every time a message is received.
267+
* // message.id = ID used to acknowledge its receival.
268+
* // message.data = Contents of the message.
269+
* // message.attributes = Attributes of the message.
270+
* });
271+
*/
272+
PubSub.prototype.subscription = function(name, options) {
273+
if (!name) {
274+
throw new Error('The name of a subscription is required.');
275+
}
276+
277+
options = options || {};
278+
options.name = name;
279+
280+
return new Subscription(this, options);
281+
};
282+
165283
/**
166284
* Create a Topic object to reference an existing topic.
167285
*
@@ -195,9 +313,13 @@ PubSub.prototype.topic = function(name, options) {
195313
* You may optionally provide a query object as the first argument to customize
196314
* the response.
197315
*
198-
* @param {object=} query - Query object.
199-
* @param {string=} query.pageToken - Page token.
200-
* @param {number=} query.pageSize - Maximum number of results to return.
316+
* @param {object=} options - Query object.
317+
* @param {string=} options.projectId - The projectId where the topic
318+
* resource exists.
319+
* @param {string=} options.topic - The name of the topic to list subscriptions
320+
* from.
321+
* @param {number=} options.pageSize - Maximum number of results to return.
322+
* @param {string=} options.pageToken - Page token.
201323
* @param {function} callback - The callback function.
202324
*
203325
* @example
@@ -214,30 +336,58 @@ PubSub.prototype.topic = function(name, options) {
214336
* pageSize: 10
215337
* }, callback);
216338
*/
217-
PubSub.prototype.getSubscriptions = function(query, callback) {
339+
PubSub.prototype.getSubscriptions = function(options, callback) {
218340
var self = this;
341+
219342
if (!callback) {
220-
callback = query;
221-
query = {};
343+
callback = options;
344+
options = {};
222345
}
223346

224-
var path = this.projectName + '/subscriptions';
225-
this.makeReq_('GET', path, query, true, function(err, result) {
347+
options = options || {};
348+
349+
var query = {};
350+
351+
if (options.pageSize) {
352+
query.pageSize = options.pageSize;
353+
}
354+
355+
if (options.pageToken) {
356+
query.pageToken = options.pageToken;
357+
}
358+
359+
var projectId = options.projectId || this.projectId;
360+
361+
var apiPath = util.format('{projectPath}{topicPath}/subscriptions', {
362+
projectPath: 'projects/' + projectId,
363+
topicPath: options.topic ? '/topics/' + options.topic : ''
364+
});
365+
366+
this.makeReq_('GET', apiPath, query, null, function(err, result) {
226367
if (err) {
227368
callback(err, null, null, result);
228369
return;
229370
}
230371

231-
var subscriptions = (result.subscriptions || []).map(function(item) {
372+
var subscriptions = (result.subscriptions || []).map(function(sub) {
373+
// Depending on if we're using a subscriptions.list or
374+
// topics.subscriptions.list API endpoint, we will get back a Subscription
375+
// resource or just the name of the subscription.
376+
var subName = sub.name || sub;
377+
232378
return new Subscription(self, {
233-
name: item.name
379+
projectId: projectId,
380+
name: subName
234381
});
235382
});
383+
236384
var nextQuery = null;
385+
237386
if (result.nextPageToken) {
238-
nextQuery = query;
387+
nextQuery = options;
239388
nextQuery.pageToken = result.nextPageToken;
240389
}
390+
241391
callback(null, subscriptions, nextQuery, result);
242392
});
243393
};

lib/pubsub/subscription.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,10 @@ var util = require('../common/util.js');
114114
function Subscription(pubsub, options) {
115115
events.EventEmitter.call(this);
116116

117-
this.name = Subscription.formatName_(pubsub.projectId, options.name);
117+
var projectId = options.projectId || pubsub.projectId;
118+
119+
this.name = Subscription.formatName_(projectId, options.name);
120+
118121
this.makeReq_ = pubsub.makeReq_.bind(pubsub);
119122

120123
this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;

lib/pubsub/topic.js

+17-59
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,6 @@
2626
*/
2727
var util = require('../common/util.js');
2828

29-
/**
30-
* @type {module:pubsub/subscription}
31-
* @private
32-
*/
33-
var Subscription = require('./subscription.js');
34-
3529
/*! Developer Documentation
3630
*
3731
* @param {module:pubsub} pubsub - PubSub object.
@@ -60,9 +54,9 @@ function Topic(pubsub, options) {
6054
this.name = Topic.formatName_(pubsub.projectId, options.name);
6155
this.projectId = pubsub.projectId;
6256
this.pubsub = pubsub;
57+
this.unformattedName = options.name;
6358

6459
if (options.autoCreate) {
65-
this.unformattedName = options.name;
6660
this.origMakeReq_ = this.makeReq_;
6761
this.makeReq_ = this.autoCreateWrapper_;
6862
}
@@ -220,9 +214,9 @@ Topic.prototype.delete = function(callback) {
220214
* error occurred, or an array of {@linkcode module:pubsub/subscription}
221215
* objects.
222216
*
223-
* @param {object=} query - Query object.
224-
* @param {string=} query.pageToken - Page token.
225-
* @param {number=} query.pageSize - Maximum number of results to return.
217+
* @param {object=} options - Configuration object.
218+
* @param {number=} options.pageSize - Maximum number of results to return.
219+
* @param {string=} options.pageToken - Page token.
226220
* @param {function} callback - The callback function.
227221
*
228222
* @example
@@ -237,32 +231,16 @@ Topic.prototype.delete = function(callback) {
237231
* pageSize: 3
238232
* }, function(err, subscriptions, nextQuery, apiResponse) {});
239233
*/
240-
Topic.prototype.getSubscriptions = function(query, callback) {
241-
var self = this;
242-
if (util.is(query, 'function')) {
243-
callback = query;
244-
query = {};
234+
Topic.prototype.getSubscriptions = function(options, callback) {
235+
if (!callback) {
236+
callback = options;
237+
options = {};
245238
}
246239

247-
var path = this.name + '/subscriptions';
248-
this.makeReq_('GET', path, query, true, function(err, result) {
249-
if (err) {
250-
callback(err, null, null, result);
251-
return;
252-
}
240+
options.projectId = this.pubsub.projectId;
241+
options.topic = this.unformattedName;
253242

254-
var subscriptions = (result.subscriptions || []).map(function(name) {
255-
return new Subscription(self, {
256-
name: name
257-
});
258-
});
259-
var nextQuery = null;
260-
if (result.nextPageToken) {
261-
nextQuery = query;
262-
nextQuery.pageToken = result.nextPageToken;
263-
}
264-
callback(null, subscriptions, nextQuery, result);
265-
});
243+
this.pubsub.getSubscriptions(options, callback);
266244
};
267245

268246
/**
@@ -300,34 +278,17 @@ Topic.prototype.getSubscriptions = function(query, callback) {
300278
* }, function(err, subscription, apiResponse) {});
301279
*/
302280
Topic.prototype.subscribe = function(name, options, callback) {
303-
var self = this;
304-
if (!name) {
305-
throw new Error('A name is required for a new subscription.');
306-
}
307281
if (!callback) {
308282
callback = options;
309283
options = {};
310284
}
311285

312-
var body = {
313-
topic: this.name
314-
};
315-
316-
if (options.ackDeadlineSeconds) {
317-
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
318-
}
286+
var topicName = this.name;
319287

320-
var path = Subscription.formatName_(this.projectId, name);
288+
options = options || {};
289+
options.projectId = this.pubsub.projectId;
321290

322-
this.makeReq_('PUT', path, null, body, function(err, result) {
323-
if (options.reuseExisting && err && err.code === 409) {
324-
callback(null, self.subscription(name, options), result);
325-
} else if (err) {
326-
callback(err, null, result);
327-
} else {
328-
callback(null, self.subscription(name, options), result);
329-
}
330-
});
291+
return this.pubsub.subscribe(topicName, name, options, callback);
331292
};
332293

333294
/**
@@ -358,12 +319,9 @@ Topic.prototype.subscribe = function(name, options, callback) {
358319
* });
359320
*/
360321
Topic.prototype.subscription = function(name, options) {
361-
if (!name) {
362-
throw new Error('The name of a subscription is required.');
363-
}
364322
options = options || {};
365-
options.name = name;
366-
return new Subscription(this.pubsub, options);
323+
options.projectId = this.pubsub.projectId;
324+
return this.pubsub.subscription(name, options);
367325
};
368326

369327
module.exports = Topic;

0 commit comments

Comments
 (0)