From 3acb4465c2f3ac7aafd704f2351b121a7e439923 Mon Sep 17 00:00:00 2001
From: Stephen Sawchuk
Date: Mon, 6 Oct 2014 16:09:05 -0400
Subject: [PATCH] (pubsub) refactor api.
---
docs/components/docs/docs-values.js | 17 +-
docs/components/docs/docs.html | 18 +
docs/components/docs/docs.js | 3 +-
docs/css/main.css | 5 +
lib/common/util.js | 3 +
lib/index.js | 29 +-
lib/pubsub/index.js | 532 ++++++++++------------------
lib/pubsub/subscription.js | 329 +++++++++++++++++
lib/pubsub/topic.js | 270 ++++++++++++++
package.json | 8 +-
regression/pubsub.js | 200 ++++++-----
scripts/docs.sh | 29 ++
test/pubsub/index.js | 219 ++++++++----
test/pubsub/subscription.js | 430 ++++++++++++++++++++++
test/pubsub/topic.js | 341 ++++++++++++++++++
15 files changed, 1918 insertions(+), 515 deletions(-)
create mode 100644 lib/pubsub/subscription.js
create mode 100644 lib/pubsub/topic.js
create mode 100755 scripts/docs.sh
create mode 100644 test/pubsub/subscription.js
create mode 100644 test/pubsub/topic.js
diff --git a/docs/components/docs/docs-values.js b/docs/components/docs/docs-values.js
index fc3f9952daa..c785015e671 100644
--- a/docs/components/docs/docs-values.js
+++ b/docs/components/docs/docs-values.js
@@ -39,6 +39,21 @@ angular.module('gcloud.docs')
]
},
+ pubsub: {
+ title: 'PubSub',
+ _url: '{baseUrl}/pubsub',
+ pages: [
+ {
+ title: 'Topic',
+ url: '/topic'
+ },
+ {
+ title: 'Subscription',
+ url: '/subscription'
+ }
+ ]
+ },
+
storage: {
title: 'Storage',
_url: '{baseUrl}/storage'
@@ -49,6 +64,6 @@ angular.module('gcloud.docs')
// https://github.com/npm/node-semver#versions
// List should be in ascending order.
'<=0.7.1': ['gcloud', 'datastore', 'storage'],
- '>0.7.1': ['gcloud', 'datastoreWithTransaction', 'storage']
+ '>0.7.1': ['gcloud', 'datastoreWithTransaction', 'pubsub', 'storage']
}
});
diff --git a/docs/components/docs/docs.html b/docs/components/docs/docs.html
index eb6ead22cab..fba03068e2b 100644
--- a/docs/components/docs/docs.html
+++ b/docs/components/docs/docs.html
@@ -73,6 +73,24 @@ Datastore Overview
+
Storage Overview
diff --git a/docs/components/docs/docs.js b/docs/components/docs/docs.js
index 98d3dc89ca7..96af4591334 100644
--- a/docs/components/docs/docs.js
+++ b/docs/components/docs/docs.js
@@ -172,7 +172,8 @@ angular
return function(mixInData) {
return mixInData
.reduce(function(acc, mixInMethods) {
- return acc = acc.concat(mixInMethods);
+ acc = acc.concat(mixInMethods);
+ return acc;
}, data)
.sort(function(a, b) {
return a.name > b.name;
diff --git a/docs/css/main.css b/docs/css/main.css
index 32a20d55194..e8d4d001dcb 100755
--- a/docs/css/main.css
+++ b/docs/css/main.css
@@ -617,6 +617,11 @@ h2, h3 {
position: relative;
}
+.notice {
+ background-color: #e5ecf9;
+ padding: 8px;
+}
+
.permalink {
display: none;
position: absolute;
diff --git a/lib/common/util.js b/lib/common/util.js
index 8158afce99d..ec27a6d09e7 100644
--- a/lib/common/util.js
+++ b/lib/common/util.js
@@ -183,6 +183,9 @@ module.exports.handleResp = handleResp;
* @return {string}
*/
function getType(value) {
+ if (value instanceof Buffer) {
+ return 'buffer';
+ }
return Object.prototype.toString.call(value).match(/\s(\w+)\]/)[1];
}
diff --git a/lib/index.js b/lib/index.js
index 530f42352d9..3d01d7ca23c 100644
--- a/lib/index.js
+++ b/lib/index.js
@@ -20,6 +20,12 @@
'use strict';
+/**
+ * @type module:common/util
+ * @private
+ */
+var util = require('./common/util.js');
+
/**
* @type {module:datastore}
* @private
@@ -30,7 +36,7 @@ var Datastore = require('./datastore');
* @type {module:pubsub}
* @private
*/
-var pubsub = require('./pubsub');
+var PubSub = require('./pubsub');
/**
* @type {module:storage}
@@ -86,13 +92,17 @@ var Storage = require('./storage');
*
* var bucket = gcloud.storage.bucket({
* bucketName: 'PhotosBucket',
- * // properties may be overriden:
+ * // properties may be overridden:
* keyFilename: '/path/to/other/keyfile.json'
* });
*/
function gcloud(config) {
return {
datastore: new Datastore(config),
+ pubsub: function(options) {
+ options = options || {};
+ return new PubSub(util.extendGlobalConfig(config, options));
+ },
storage: new Storage(config)
};
}
@@ -130,24 +140,25 @@ gcloud.datastore = Datastore;
* Note: Google Cloud Pub/Sub API is available as a Limited Preview and the
* client library we provide is currently experimental. The API and/or the
* client might be changed in backward-incompatible ways. This API is not
- * subject to any SLA or deprecation policy. Request to be whitelisted to use
- * it by filling the
+ * subject to any SLA or deprecation policy. Request to be whitelisted to use it
+ * by filling the
* [Limited Preview application form]{@link http://goo.gl/sO0wTu}.
*
* @type {module:pubsub}
*
- * @return {object}
+ * @return {module:pubsub}
*
* @example
* var gcloud = require('gcloud');
- * var pubsub = gcloud.pubsub;
*
- * var conn = new pubsub.Connection({
+ * var pubsub = gcloud.pubsub({
* projectId: YOUR_PROJECT_ID,
* keyFilename: '/path/to/the/key.json'
* });
*/
-gcloud.pubsub = pubsub;
+gcloud.pubsub = function(config) {
+ return new PubSub(config);
+};
/**
* Google Cloud Storage allows you to store data on Google infrastructure.
@@ -171,7 +182,7 @@ gcloud.pubsub = pubsub;
*
* // storage:
* // {
- * // Bucket: function() {}
+ * // bucket: function() {}
* // }
*/
gcloud.storage = Storage;
diff --git a/lib/pubsub/index.js b/lib/pubsub/index.js
index 31d67eec1e3..d6646b381b2 100644
--- a/lib/pubsub/index.js
+++ b/lib/pubsub/index.js
@@ -1,4 +1,4 @@
-/**
+/*!
* Copyright 2014 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -14,447 +14,285 @@
* limitations under the License.
*/
-/**
+/*!
* @module pubsub
*/
'use strict';
-var events = require('events');
-var extend = require('extend');
-var nodeutil = require('util');
-
-/** @type {module:common/connection} */
-var conn = require('../common/connection.js');
-
-/** @type {module:common/util} */
-var util = require('../common/util.js');
-
-/** @private @const {string} Base URL for Pub/Sub API. */
-var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1';
-
-/** @const {array} Required scopes for Pub/Sub API. */
-var SCOPES = [
- 'https://www.googleapis.com/auth/pubsub',
- 'https://www.googleapis.com/auth/cloud-platform'
-];
-
-function Subscription(conn, name) {
- this.conn = conn;
- this.name = name;
-
- this.autoAck = false;
- this.pullIntervalInMs = 10;
- this.closed = false;
-}
-
-nodeutil.inherits(Subscription, events.EventEmitter);
-
-/**
- * Acknowledges the backend that message is retrieved.
- * @param {Array} ids A list of message IDs.
- * @param {Function} callback Callback function.
- */
-Subscription.prototype.ack = function(ids, callback) {
- ids = util.arrayize(ids);
- var body = {
- subscription: this.name,
- ackId: ids
- };
- this.conn.makeReq('POST', 'subscriptions/acknowledge', null, body, callback);
-};
-
-/**
- * Pulls from the subscribed topic.
- * @param {Boolean} opts.returnImmediately If set, the system will respond
- * immediately. Otherwise, wait
- * until new messages are available.
- * Returns if timeout is reached.
- * @param {Function} callback Callback.
- */
-Subscription.prototype.pull = function(opts, callback) {
- var that = this;
- // TODO(jbd): Should not be racing with other pull.
- // TOOD(jbd): Make opts optional.
- var body = {
- subscription: this.name,
- returnImmediately: !!opts.returnImmediately
- };
- this.conn.makeReq(
- 'POST', 'subscriptions/pull', null, body, function(err, message) {
- // TODO(jbd): Fix API to return a list of messages.
- if (err) {
- callback(err);
- return;
- }
- if (!that.autoAck) {
- that.emitMessage_(message);
- callback();
- return;
- }
- that.ack(message.ackId, function(err) {
- if (err) {
- callback(err);
- return;
- }
- that.emitMessage_(message);
- callback();
- });
- });
-};
-
-/**
- * Polls the backend for new messages.
- */
-Subscription.prototype.startPulling_ = function() {
- var that = this;
- var pullFn = function() {
- if (that.closed) {
- return;
- }
- that.pull({ returnImmediately: false }, function(err) {
- // TODO(jbd): Fix API to return a more explicit error code or message.
- if (err && err.message.indexOf('has no more messages') < 0) {
- that.emitError_(err);
- }
- setTimeout(function() {
- pullFn();
- }, that.pullIntervalInMs);
- });
- };
- pullFn();
-};
-
/**
- * Deletes the current subscription. Pull requests from the current
- * subscription will be errored once unsubscription is done.
- * @param {Function} callback Optional callback.
+ * @type module:common/connection
+ * @private
*/
-Subscription.prototype.del = function(callback) {
- callback = callback || util.noop;
- var that = this;
- var path = util.format('subscriptions/{fullName}', {
- fullName: this.name
- });
- this.conn.makeReq('DELETE', path, null, true, function(err) {
- if (err) {
- callback(err);
- return;
- }
- that.closed = true;
- callback(null);
- });
-};
-
-/**
- * Closes the subscription.
- */
-Subscription.prototype.close = function() {
- this.closed = true;
-};
-
-/**
- * Emits a 'message' event with the provided message.
- */
-Subscription.prototype.emitMessage_ = function(msg) {
- if (msg.pubsubEvent && msg.pubsubEvent.message) {
- var data = msg.pubsubEvent.message.data;
- msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8');
- }
- this.emit('message', msg);
-};
+var conn = require('../common/connection.js');
/**
- * Emits an error with the provided error.
+ * @type module:pubsub/subscription
+ * @private
*/
-Subscription.prototype.emitError_ = function(err) {
- this.emit('error', err);
-};
+var Subscription = require('./subscription.js');
/**
- * Represents a Google Cloud Pub/Sub API topic.
- * @param {Connection} conn Authorized connection.
- * @param {string} name Full name of the topic.
+ * @type module:pubsub/topic
+ * @private
*/
-function Topic(conn, name) {
- this.conn = conn;
- this.name = name;
-}
+var Topic = require('./topic.js');
/**
- * Publishes the provided string message.
- * @param {string} data String message to publish.
- * @param {Function} callback Optional callback.
+ * @type module:common/util
+ * @private
*/
-Topic.prototype.publish = function(data, callback) {
- callback = callback || util.noop;
- this.publishMessage({
- topic: this.name,
- message: {
- data: new Buffer(data).toString('base64')
- }
- }, callback);
-};
+var util = require('../common/util.js');
/**
- * Publishes a raw message.
- * @param {message} message Raw message to publish.
- * @param {Function} callback Optional callback.
+ * @const {string} Base URL for Pub/Sub API.
+ * @private
*/
-Topic.prototype.publishMessage = function(message, callback) {
- callback = callback || util.noop;
- message.topic = this.name;
- this.conn.makeReq('POST', 'topics/publish', null, message, callback);
-};
+var PUBSUB_BASE_URL = 'https://www.googleapis.com/pubsub/v1beta1';
/**
- * Deletes a topic.
- * @param {Function} callback Optional callback.
+ * @const {array} Required scopes for Pub/Sub API.
+ * @private
*/
-Topic.prototype.del = function(callback) {
- callback = callback || util.noop;
- var path = 'topics/' + this.name;
- this.conn.makeReq('DELETE', path, null, true, callback);
-};
+var SCOPES = [
+ 'https://www.googleapis.com/auth/pubsub',
+ 'https://www.googleapis.com/auth/cloud-platform'
+];
/**
- * Represents connection to Google Cloud Pub/Sub API.
- * @param {string} opts.projectId Google Developers Console Project ID.
- * @param {string} opts.email Service account email.
+ * [Google Cloud Pub/Sub]{@link https://developers.google.com/pubsub/overview}
+ * is a reliable, many-to-many, asynchronous messaging service from Google
+ * Cloud Platform.
+ *
+ * Note: Google Cloud Pub/Sub API is available as a Limited Preview and the
+ * client library we provide is currently experimental. The API and/or the
+ * client might be changed in backward-incompatible ways. This API is not
+ * subject to any SLA or deprecation policy. Request to be whitelisted to use
+ * it by filling the
+ * [Limited Preview application form]{@link http://goo.gl/sO0wTu}.
+ *
+ * @constructor
+ * @alias module:pubsub
+ *
+ * @param {object=} options - Configuration object.
+ * @param {string=} options.projectId - Google Developers Console Project ID.
* @param {string=} options.keyFilename - Full path to the JSON key downloaded
* from the Google Developers Console. Alternatively, you may provide a
* `credentials` object.
* @param {object=} options.credentials - Credentials object, used in place of
* a `keyFilename`.
+ *
+ * @example
+ * var gcloud = require('gcloud');
+ *
+ * // From Google Compute Engine and Google App Engine:
+ *
+ * // Access `pubsub` through the `gcloud` module directly.
+ * var pubsub = gcloud.pubsub();
+ *
+ * // Elsewhere:
+ *
+ * // Provide configuration details up-front.
+ * var myProject = gcloud({
+ * keyFilename: '/path/to/keyfile.json',
+ * projectId: 'my-project'
+ * });
+ *
+ * var pubsub = myProject.pubsub();
+ *
+ *
+ * // Override default configuration details.
+ * var anotherPubsubConnection = myProject.pubsub({
+ * keyFilename: '/path/to/another/keyfile.json',
+ * });
+ *
+ *
+ * // Specify all options at instantiation.
+ * var pubsub = gcloud.pubsub({
+ * keyFilename: '/path/to/keyfile.json',
+ * projectId: 'my-project'
+ * });
*/
-function Connection(opts) {
- opts = opts || {};
- var id = opts.projectId;
+function PubSub(options) {
+ options = options || {};
- this.id = id;
- this.conn = new conn.Connection({
- credentials: opts.credentials,
- keyFilename: opts.keyFilename,
+ this.connection = new conn.Connection({
+ credentials: options.credentials,
+ keyFilename: options.keyFilename,
scopes: SCOPES
});
+ this.projectId = options.projectId;
+ this.projectName = '/projects/' + this.projectId;
}
/**
- * Lists subscriptions.
- * @param {string} query.filterByTopic Returns subscriptions that are
- * subscribed to the topic provided.
- * @param {string} query.pageToken Page token.
- * @param {Number} query.maxResults Max number of results to return.
- * @param {Function} callback Callback function.
+ * Get a list of the topics registered to your project. You may optionally
+ * provide a query object as the first argument to customize the response.
+ *
+ * @param {object=} query - Query object.
+ * @param {string=} query.pageToken - Page token.
+ * @param {number=} query.maxResults - Maximum number of results to return.
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * // Get all topics.
+ * pubsub.getTopics(function(err, topics, nextQuery) {
+ * // If `nextQuery` is non-null, there may be more results to fetch. To do
+ * // so, run `pubsub.getTopics(nextQuery, callback);`.
+ * });
+ *
+ * // Customize the query.
+ * pubsub.getTopics({
+ * maxResults: 3
+ * }, function(err, topics, nextQuery) {});
*/
-Connection.prototype.listSubscriptions = function(query, callback) {
+PubSub.prototype.getTopics = function(query, callback) {
var that = this;
- if (arguments.length < 2) {
+ if (!callback) {
callback = query;
query = {};
}
- var q = extend({}, query);
- if (q.filterByTopic) {
- q.query =
- 'pubsub.googleapis.com/topic in (' +
- this.fullTopicName_(q.filterByTopic) + ')';
- } else {
- q.query =
- 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')';
- }
- delete q.filterByTopic;
-
- this.makeReq('GET', 'subscriptions', q, true, function(err, result) {
+ query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
+ this.makeReq_('GET', 'topics', query, true, function(err, result) {
if (err) {
callback(err);
return;
}
- var items = result.subscription || [];
- var subscriptions = items.map(function(item) {
- return new Subscription(that, item.name);
+ var topics = (result.topic || []).map(function(item) {
+ return new Topic(that, {
+ name: item.name
+ });
});
var nextQuery = null;
if (result.nextPageToken) {
- nextQuery = q;
+ nextQuery = query;
nextQuery.pageToken = result.nextPageToken;
}
- callback(null, subscriptions, nextQuery);
+ callback(null, topics, nextQuery);
});
};
/**
- * Gets a subscription.
- * @param {string} name Name of the subscription.
- * @param {Function} callback Callback.
+ * Create a topic with the given name.
+ *
+ * @param {string} name - Name of the topic.
+ * @param {function=} callback - The callback function.
+ *
+ * @example
+ * pubsub.createTopic('my-new-topic', function(err, topic) {
+ * topic.publish('New message!', function(err) {});
+ * });
*/
-Connection.prototype.getSubscription = function(name, callback) {
- var that = this;
- var fullName = '/subscriptions/' + this.id + '/' + name;
- this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) {
- if (err) {
- callback(err);
- return;
- }
- callback(null, new Subscription(that, fullName));
- });
-};
-
-Connection.prototype.createSubscription = function(opts, callback) {
- var that = this;
- var subscription = {
- topic:'/topics/' + this.id + '/' + opts.topic,
- name: '/subscriptions/' + this.id + '/' + opts.name,
- ackDeadlineSeconds: opts.ackDeadlineSeconds
+PubSub.prototype.createTopic = function(name, callback) {
+ callback = callback || util.noop;
+ var topic = this.topic(name);
+ var req = {
+ name: topic.name
};
- this.makeReq('POST', 'subscriptions', null, subscription, function(err) {
+ this.makeReq_('POST', 'topics', null, req, function(err) {
if (err) {
callback(err);
return;
}
- callback(null, new Subscription(that, subscription.name));
- });
+ callback(null, topic);
+ }.bind(this));
};
/**
- * Subscribe with the provided options.
- * @param {string} name Name of the subscription.
- * @param {Boolean} opts.autoAck Automatically acknowledges the
- * message once it's pulled.
- * @return {Subscription}
+ * Create a Topic object to reference an existing topic.
+ *
+ * @throws {Error} If a name is not provided.
+ *
+ * @param {string} name - The name of the topic.
+ * @return {module:pubsub/topic}
+ *
+ * @example
+ * var topic = pubsub.topic('my-existing-topic');
+ * topic.publish('New message!');
*/
-Connection.prototype.subscribe = function(name, opts) {
- opts = opts || {};
-
- var fullName = '/subscriptions/' + this.id + '/' + name;
- var sub = new Subscription(this, fullName);
- sub.autoAck = !!opts.autoAck;
- this.getSubscription(name, function(err) {
- if (err) {
- sub.emitError_(err);
- return;
- }
- sub.emit('ready');
- sub.startPulling_();
+PubSub.prototype.topic = function(name) {
+ if (!name) {
+ throw new Error('A name must be specified for a new topic.');
+ }
+ return new Topic(this, {
+ name: name
});
- return sub;
};
/**
- * Lists topics.
- * @param {string} query.pageToken Page token.
- * @param {Number} query.maxResults Max number of results to return.
- * @param {Function} callback Callback function.
+ * Get a list of the subscriptions registered to all of your project's topics.
+ * You may optionally provide a query object as the first argument to customize
+ * the response.
+ *
+ * @param {object=} query - Query object.
+ * @param {string=} query.pageToken - Page token.
+ * @param {number=} query.maxResults - Maximum number of results to return.
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * // Get all subscriptions.
+ * pubsub.getSubscriptions(function(err, subscriptions, nextQuery) {
+ * // If `nextQuery` is non-null, there may be more results to fetch. To do
+ * // so, run `pubsub.getSubscriptions(nextQuery, callback);`.
+ * });
+ *
+ * // Customize the query.
+ * pubsub.getSubscriptions({
+ * maxResults: 10
+ * }, function(err, subscriptions, nextQuery) {});
*/
-Connection.prototype.listTopics = function(query, callback) {
+PubSub.prototype.getSubscriptions = function(query, callback) {
var that = this;
- if (arguments.length < 2) {
+ if (!callback) {
callback = query;
query = {};
}
- var q = extend({}, query);
- q.query = 'cloud.googleapis.com/project in (' + this.fullProjectName_() + ')';
- this.makeReq('GET', 'topics', q, true, function(err, result) {
+ if (!query.query) {
+ query.query = 'cloud.googleapis.com/project in (' + this.projectName + ')';
+ }
+ this.makeReq_('GET', 'subscriptions', query, true, function(err, result) {
if (err) {
callback(err);
return;
}
- var items = result.topic || [];
- var topics = items.map(function(item) {
- return new Topic(that, item.name);
+ var subscriptions = (result.subscription || []).map(function(item) {
+ return new Subscription(that, {
+ name: item.name
+ });
});
var nextQuery = null;
if (result.nextPageToken) {
- nextQuery = q;
+ nextQuery = query;
nextQuery.pageToken = result.nextPageToken;
}
- callback(null, topics, nextQuery);
- });
-};
-
-/**
- * Gets a topic.
- * @param {string} name Name of the topic to get.
- * @param {Function} callback Optional callback.
- */
-Connection.prototype.getTopic = function(name, callback) {
- var that = this;
- callback = callback || util.noop;
- var fullName = this.fullTopicName_(name);
- this.makeReq('GET', 'topics/' + fullName, null, true, function(err) {
- if (err) {
- callback(err);
- return;
- }
- callback(null, new Topic(that, fullName));
- });
-};
-
-/**
- * Creates a topic with the given name.
- * @param {string} name Name of the topic.
- * @param {Function} callback Optional callback.
- */
-Connection.prototype.createTopic = function(name, callback) {
- var that = this;
- callback = callback || util.noop;
- var fullName = this.fullTopicName_(name);
- this.makeReq('POST', 'topics', null, { name: fullName }, function(err) {
- if (err) {
- callback(err);
- return;
- }
- callback(null, new Topic(that, fullName));
- });
-};
-
-/**
- * Returns the full name of a topic.
- * Full name is in /topics// form.
- */
-Connection.prototype.fullTopicName_ = function(name) {
- return util.format('/topics/{projectId}/{name}', {
- projectId: this.id, name: name
+ callback(null, subscriptions, nextQuery);
});
};
/**
- * Returns the fully qualified project name.
- * Full name is in /projects/ form.
+ * Make a new request object from the provided arguments and wrap the callback
+ * to intercept non-successful responses.
+ *
+ * @private
+ *
+ * @param {string} method - Action.
+ * @param {string} path - Request path.
+ * @param {*} query - Request query object.
+ * @param {*} body - Request body contents.
+ * @param {function} callback - The callback function.
*/
-Connection.prototype.fullProjectName_ = function() {
- return util.format('/projects/{projectId}', {
- projectId: this.id
- });
-};
-
-Connection.prototype.makeReq = function(method, path, q, body, callback) {
+PubSub.prototype.makeReq_ = function(method, path, q, body, callback) {
var reqOpts = {
method: method,
qs: q,
- uri: util.format('{base}/{path}', {
- base: PUBSUB_BASE_URL,
- path: path
- })
+ uri: PUBSUB_BASE_URL + '/' + path
};
if (body) {
reqOpts.json = body;
}
- this.conn.req(reqOpts, function(err, res, body) {
+ this.connection.req(reqOpts, function(err, res, body) {
util.handleResp(err, res, body, callback);
});
};
-/**
- * Exports Connection.
- */
-module.exports.Connection = Connection;
-
-/**
- * Exports Topic.
- */
-module.exports.Topic = Topic;
-
-/**
- * Exports Subscription.
- */
-module.exports.Subscription = Subscription;
+module.exports = PubSub;
diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js
new file mode 100644
index 00000000000..aedaa0eb552
--- /dev/null
+++ b/lib/pubsub/subscription.js
@@ -0,0 +1,329 @@
+/*!
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*!
+ * @module pubsub/subscription
+ */
+
+'use strict';
+
+var events = require('events');
+var nodeutil = require('util');
+
+/**
+ * @type module:common/util
+ * @private
+ */
+var util = require('../common/util.js');
+
+/*! Developer Documentation
+ *
+ * @param {module:pubsub} pubsub - PubSub object.
+ * @param {object} options - Configuration object.
+ * @param {boolean} options.autoAck - Automatically acknowledge the message
+ * once it's pulled. (default: false)
+ * @param {number} options.interval - Interval in milliseconds to check for new
+ * messages. (default: 10)
+ * @param {string} options.name - Name of the subscription.
+ */
+/**
+ * A Subscription object will give you access to your Google Cloud Pub/Sub
+ * subscription.
+ *
+ * Subscriptions are sometimes retrieved when using various methods:
+ *
+ * - {@linkcode module:pubsub#getSubscriptions}
+ * - {@linkcode module:pubsub/topic#getSubscriptions}
+ * - {@linkcode module:pubsub/topic#subscribe}
+ *
+ * Subscription objects may be created directly with:
+ *
+ * - {@linkcode module:pubsub/topic#subscription}
+ *
+ * All Subscription objects are instances of an
+ * [EventEmitter]{@link http://nodejs.org/api/events.html}. The subscription
+ * will pull for messages automatically as long as there is at least one
+ * listener assigned for the `message` event.
+ *
+ * @alias pubsub/subscription
+ * @constructor
+ *
+ * @example
+ * //-
+ * // From {@linkcode module:pubsub#getSubscriptions}:
+ * //-
+ * pubsub.getSubscriptions(function(err, subscriptions) {
+ * // `subscriptions` is an array of Subscription objects.
+ * });
+ *
+ * //-
+ * // From {@linkcode module:pubsub/topic#getSubscriptions}:
+ * //-
+ * var topic = pubsub.topic('my-existing-topic');
+ * topic.getSubscriptions(function(err, subscriptions) {
+ * // `subscriptions` is an array of Subscription objects.
+ * });
+ *
+ * //-
+ * // From {@linkcode module:pubsub/topic#subscribe}:
+ * //-
+ * var topic = pubsub.topic('my-existing-topic');
+ * topic.subscribe('new-subscription', function(err, subscription) {
+ * // `subscription` is a Subscription object.
+ * });
+ *
+ * //-
+ * // From {@linkcode module:pubsub/topic#subscription}:
+ * //-
+ * var topic = pubsub.topic('my-existing-topic');
+ * var subscription = topic.subscription('my-existing-subscription');
+ * // `subscription` is a Subscription object.
+ *
+ * //-
+ * // Once you have obtained a subscription object, you may begin to register
+ * // listeners. This will automatically trigger pulling for messages.
+ * //-
+ * // Register an error handler.
+ * subscription.on('error', function(err) {});
+ *
+ * // Register a listener for `message` events.
+ * function onMessage(message) {
+ * // Called every time a message is received.
+ * // message.id = ID used to acknowledge its receival.
+ * // message.data = Contents of the message.
+ * }
+ * subscription.on('message', onMessage);
+ *
+ * // Remove the listener from receiving `message` events.
+ * subscription.removeListener('message', onMessage);
+ */
+function Subscription(pubsub, options) {
+ events.EventEmitter.call(this);
+
+ this.name = Subscription.formatName_(pubsub.projectId, options.name);
+ this.makeReq_ = pubsub.makeReq_.bind(pubsub);
+
+ this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
+ this.closed = false;
+ this.interval = util.is(options.interval, 'number') ? options.interval : 10;
+
+ this.listenForEvents_();
+}
+
+nodeutil.inherits(Subscription, events.EventEmitter);
+
+/**
+ * Format the name of a subscription. A subscription's full name is in the
+ * format of /subscription/{projectId}/{name}.
+ *
+ * @private
+ */
+Subscription.formatName_ = function(projectId, name) {
+ // Simple check if the name is already formatted.
+ if (name.indexOf('/') > -1) {
+ return name;
+ }
+ return '/subscriptions/' + projectId + '/' + name;
+};
+
+/**
+ * Simplify a message from an API response to have two properties, `id` and
+ * `data`. `data` is always converted to a string.
+ *
+ * @private
+ */
+Subscription.formatMessage_ = function(msg) {
+ var message = {
+ id: msg.ackId
+ };
+ if (msg.pubsubEvent && msg.pubsubEvent.message) {
+ message.data =
+ new Buffer(msg.pubsubEvent.message.data, 'base64').toString('utf-8');
+ try {
+ message.data = JSON.parse(message.data);
+ } catch(e) {}
+ }
+ return message;
+};
+
+/**
+ * Begin listening for events on the subscription. This method keeps track of
+ * how many message listeners are assigned, and then removed, making sure
+ * polling is handled automatically.
+ *
+ * As long as there is one active message listener, the connection is open. As
+ * soon as there are no more message listeners, the connection is closed.
+ *
+ * @private
+ *
+ * @example
+ * this.listenForEvents_();
+ */
+Subscription.prototype.listenForEvents_ = function() {
+ var that = this;
+ var messageListeners = 0;
+
+ this.on('newListener', function(event) {
+ if (event === 'message') {
+ messageListeners++;
+ if (that.closed) {
+ that.closed = false;
+ }
+ that.startPulling_();
+ }
+ });
+
+ this.on('removeListener', function(event) {
+ if (event === 'message' && --messageListeners === 0) {
+ that.closed = true;
+ }
+ });
+};
+
+/**
+ * Poll the backend for new messages. This runs a loop to ping the API at the
+ * provided interval from the subscription's instantiation. If one wasn't
+ * provided, the default value is 10 milliseconds.
+ *
+ * If messages are received, they are emitted on the `message` event.
+ *
+ * Note: This method is automatically called once a message event handler is
+ * assigned to the description.
+ *
+ * To stop pulling, see {@linkcode module:pubsub/subscription#close}.
+ *
+ * @private
+ *
+ * @example
+ * subscription.startPulling_();
+ */
+Subscription.prototype.startPulling_ = function() {
+ var that = this;
+ if (this.closed) {
+ return;
+ }
+ this.pull({
+ returnImmediately: false
+ }, function(err, message) {
+ if (err) {
+ that.emit('error', err);
+ }
+ if (message) {
+ that.emit('message', message);
+ }
+ setTimeout(that.startPulling_.bind(that), that.interval);
+ });
+};
+
+/**
+ * Acknowledge to the backend that the message was retrieved. You must provide
+ * either a single ID, or an array of IDs.
+ *
+ * @throws {Error} If at least one id is not provided.
+ *
+ * @param {string|string[]} ids - An ID or array of message IDs.
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * subscription.ack('ePHEESyhuE8e...', function(err) {});
+ */
+Subscription.prototype.ack = function(ids, callback) {
+ if (!ids || ids.length === 0) {
+ throw new Error(
+ 'At least one ID must be specified before it can be acknowledged');
+ }
+ ids = util.arrayize(ids);
+ var body = {
+ subscription: this.name,
+ ackId: ids
+ };
+ this.makeReq_('POST', 'subscriptions/acknowledge', null, body, callback);
+};
+
+/**
+ * Delete the subscription. Pull requests from the current subscription will be
+ * errored once unsubscription is complete.
+ *
+ * @param {function=} callback - The callback function.
+ *
+ * @example
+ * subscription.delete(function(err) {});
+ */
+Subscription.prototype.delete = function(callback) {
+ callback = callback || util.noop;
+ this.makeReq_(
+ 'DELETE', 'subscriptions/' + this.name, null, true, function(err) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ this.closed = true;
+ this.removeAllListeners();
+ callback(null);
+ }.bind(this));
+};
+
+/**
+ * Pull messages from the subscribed topic. If messages were found, your
+ * callback is executed with the message object.
+ *
+ * Note that messages are pulled automatically once you register your first
+ * event listener to the subscription, thus the call to `pull` is handled for
+ * you. If you don't want to start pulling, simply don't register a
+ * `subscription.on('message', function() {})` event handler.
+ *
+ * @param {object=} options - Configuration object.
+ * @param {boolean=} options.returnImmediately - If set, the system will respond
+ * immediately. Otherwise, wait until new messages are available. Returns if
+ * timeout is reached.
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * subscription.pull(function(err, message) {
+ * // message.id = ID used to acknowledge its receival.
+ * // message.data = Contents of the message.
+ * });
+ */
+Subscription.prototype.pull = function(options, callback) {
+ var that = this;
+ // TODO(jbd): Should not be racing with other pull.
+ if (!callback) {
+ callback = options;
+ options = {};
+ }
+ var body = {
+ subscription: this.name,
+ returnImmediately: !!options.returnImmediately
+ };
+ this.makeReq_(
+ 'POST', 'subscriptions/pull', null, body, function(err, message) {
+ // TODO(jbd): Fix API to return a list of messages.
+ if (err) {
+ callback(err);
+ return;
+ }
+ message = Subscription.formatMessage_(message);
+ if (that.autoAck) {
+ that.ack(message.id, function(err) {
+ callback(err, message);
+ });
+ } else {
+ callback(null, message);
+ }
+ });
+};
+
+module.exports = Subscription;
diff --git a/lib/pubsub/topic.js b/lib/pubsub/topic.js
new file mode 100644
index 00000000000..53ced5ac507
--- /dev/null
+++ b/lib/pubsub/topic.js
@@ -0,0 +1,270 @@
+/*!
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*!
+ * @module pubsub/topic
+ */
+
+'use strict';
+
+/**
+ * @type module:common/util
+ * @private
+ */
+var util = require('../common/util.js');
+
+/**
+ * @type module:pubsub/subscription
+ * @private
+ */
+var Subscription = require('./subscription.js');
+
+/*! Developer Documentation
+ *
+ * @param {module:pubsub} pubsub - PubSub object.
+ * @param {object} options - Configuration object.
+ * @param {string} options.name - Name of the topic.
+ */
+/**
+ * A Topic object allows you to interact with a Google Cloud Pub/Sub topic. To
+ * get this object, you will use the methods on the `pubsub` object,
+ * {@linkcode module:pubsub#topic} and {@linkcode module:pubsub#createTopic}.
+ *
+ * @constructor
+ * @alias module:pubsub/topic
+ *
+ * @example
+ * // From pubsub.topic:
+ * var topic = pubsub.topic('my-existing-topic');
+ *
+ * // From pubsub.createTopic:
+ * pubsub.createTopic('my-new-topic', function(err, topic) {
+ * // `topic` is a Topic object.
+ * });
+ */
+function Topic(pubsub, options) {
+ this.makeReq_ = pubsub.makeReq_.bind(pubsub);
+ this.name = Topic.formatName_(pubsub.projectId, options.name);
+ this.projectId = pubsub.projectId;
+ this.pubsub = pubsub;
+}
+
+/**
+ * Format the name of a topic. A Topic's full name is in the format of
+ * /topics/{projectId}/{name}.
+ *
+ * @private
+ *
+ * @return {string}
+ */
+Topic.formatName_ = function(projectId, name) {
+ // Simple check if the name is already formatted.
+ if (name.indexOf('/') > -1) {
+ return name;
+ }
+ return '/topics/' + projectId + '/' + name;
+};
+
+/**
+ * Publish the provided message. A message can be of any type.
+ *
+ * @throws {Error} If no message is provided.
+ *
+ * @param {*} message - The message to publish.
+ * @param {function=} callback - The callback function.
+ *
+ * @example
+ * topic.publish('New message!', function(err) {});
+ *
+ * topic.publish({
+ * user_id: 3,
+ * name: 'Stephen',
+ * message: 'Hello from me!'
+ * }, function(err) {});
+ */
+Topic.prototype.publish = function(message, callback) {
+ if (!message) {
+ throw new Error('Cannot publish an empty message.');
+ }
+ callback = callback || util.noop;
+ if (!util.is(message, 'string') && !util.is(message, 'buffer')) {
+ message = JSON.stringify(message);
+ }
+ this.publishRaw({
+ data: new Buffer(message).toString('base64')
+ }, callback);
+};
+
+/**
+ * Publish a raw message.
+ *
+ * @throws {Error} If no message is provided.
+ *
+ * @param {object} message - Raw message to publish.
+ * @param {array=} message.label - List of labels for the message.
+ * @param {*} message.data - The contents of the message.
+ * @param {function=} callback - The callback function.
+ *
+ * @example
+ * topic.publishRaw({
+ * data: new Buffer('New message!').toString('base64')
+ * }, function(err) {});
+ */
+Topic.prototype.publishRaw = function(message, callback) {
+ if (!message) {
+ throw new Error('Cannot publish an empty message.');
+ }
+ callback = callback || util.noop;
+ if (!util.is(message.data, 'string') && !util.is(message.data, 'buffer')) {
+ message.data = new Buffer(JSON.stringify(message.data)).toString('base64');
+ }
+ message.topic = this.name;
+ this.makeReq_('POST', 'topics/publish', null, message, callback);
+};
+
+/**
+ * Delete the topic.
+ *
+ * @param {function=} callback - The callback function.
+ *
+ * @example
+ * topic.delete(function(err) {});
+ */
+Topic.prototype.delete = function(callback) {
+ callback = callback || util.noop;
+ this.makeReq_('DELETE', 'topics/' + this.name, null, true, callback);
+};
+
+/**
+ * Get a list of the subscriptions registered to this topic. You may optionally
+ * provide a query object as the first argument to customize the response.
+ *
+ * Your provided callback will either be invoked with an error object, if an API
+ * error occurred, or an array of {@linkcode module:pubsub/subscription}
+ * objects.
+ *
+ * @param {object=} query - Query object.
+ * @param {string=} query.pageToken - Page token.
+ * @param {number=} query.maxResults - Maximum number of results to return.
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * // Get all subscriptions.
+ * topic.getSubscriptions(function(err, subscriptions, nextQuery) {
+ * // If `nextQuery` is non-null, there may be more results to fetch. To do
+ * // so, run `topic.getSubscriptions(nextQuery, callback);`.
+ * });
+ *
+ * // Customize the query.
+ * topic.getSubscriptions({
+ * maxResults: 3
+ * }, function(err, subscriptions, nextQuery) {});
+ */
+Topic.prototype.getSubscriptions = function(query, callback) {
+ query.query = 'pubsub.googleapis.com/topic in (' + this.name + ')';
+ this.pubsub.getSubscriptions(query, callback);
+};
+
+/**
+ * Create a subscription to this topic. You may optionally provide an object to
+ * customize the subscription.
+ *
+ * Your provided callback will either be invoked with an error object, if an API
+ * error occurred, or a {@linkcode module:pubsub/subscription} object.
+ *
+ * @throws {Error} If a name is not provided.
+ *
+ * @param {string} name - The name of the subscription.
+ * @param {object=} options - Configuration object.
+ * @param {number=} options.ackDeadlineSeconds - The maximum time after
+ * receiving a message that you must ack a message before it is redelivered.
+ * @param {boolean=} options.autoAck - Automatically acknowledge the message
+ * once it's pulled. (default: false)
+ * @param {number=} options.interval - Interval in milliseconds to check for new
+ * messages. (default: 10)
+ * @param {function} callback - The callback function.
+ *
+ * @example
+ * // Without specifying any options.
+ * topic.subscribe('new-subscription', function(err, subscription) {});
+ *
+ * // With options.
+ * topic.subscribe('new-subscription', {
+ * ackDeadlineSeconds: 90,
+ * autoAck: true,
+ * interval: 30
+ * }, function(err, subscription) {});
+ */
+Topic.prototype.subscribe = function(name, options, callback) {
+ if (!name) {
+ throw new Error('A name is required for a new subscription.');
+ }
+ if (!callback) {
+ callback = options;
+ options = {};
+ }
+ var body = {
+ topic: this.name,
+ name: Subscription.formatName_(this.projectId, name)
+ };
+ if (options.ackDeadlineSeconds) {
+ body.ackDeadlineSeconds = options.ackDeadlineSeconds;
+ }
+ this.makeReq_('POST', 'subscriptions', null, body, function(err) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ callback(null, this.subscription(name, options));
+ }.bind(this));
+};
+
+/**
+ * Create a Subscription object in reference to an existing subscription. This
+ * command by itself will not run any API requests. You will receive a
+ * {@linkcode module:pubsub/subscription} object, which will allow you to
+ * interact with your subscription.
+ *
+ * @throws {Error} If a name is not provided.
+ *
+ * @param {string} name - Name of the subscription.
+ * @param {object=} options - Configuration object.
+ * @param {boolean=} options.autoAck - Automatically acknowledge the message
+ * once it's pulled.
+ * @param {number=} options.interval - Interval in milliseconds to check for new
+ * messages.
+ * @return {module:pubsub/subscription}
+ *
+ * @example
+ * var subscription = topic.subscription('my-existing-subscription');
+ *
+ * // Register a listener for `message` events.
+ * subscription.on('message', function(message) {
+ * // Called every time a message is received.
+ * // message.id = ID used to acknowledge its receival.
+ * // message.data = Contents of the message.
+ * });
+ */
+Topic.prototype.subscription = function(name, options) {
+ if (!name) {
+ throw new Error('The name of a subscription is required.');
+ }
+ options = options || {};
+ options.name = name;
+ return new Subscription(this.pubsub, options);
+};
+
+module.exports = Topic;
diff --git a/package.json b/package.json
index 9534ae9d758..ede031b2c7c 100644
--- a/package.json
+++ b/package.json
@@ -63,12 +63,12 @@
"tmp": "0.0.24"
},
"scripts": {
- "docs": "dox < lib/index.js > docs/json/master/index.json & dox < lib/datastore/dataset.js > docs/json/master/datastore/dataset.json & dox < lib/datastore/transaction.js > docs/json/master/datastore/transaction.json & dox < lib/datastore/index.js > docs/json/master/datastore/index.json & dox < lib/datastore/request.js > docs/json/master/datastore/request.json & dox < lib/datastore/query.js > docs/json/master/datastore/query.json & dox < lib/storage/index.js > docs/json/master/storage/index.json",
+ "docs": "./scripts/docs.sh",
"lint": "jshint lib/ regression/ test/",
"test": "mocha --recursive --reporter spec",
- "regression-test": "mocha regression/datastore.js regression/storage.js --reporter spec --timeout 15000",
- "cover": "istanbul cover -x 'regression/* lib/pubsub/*' _mocha -- --timeout 15000 test/* regression/datastore.js regression/storage.js",
- "coveralls": "istanbul cover -x 'regression/* lib/pubsub/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/datastore.js regression/storage.js -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
+ "regression-test": "mocha regression/datastore.js regression/pubsub.js regression/storage.js --reporter spec --timeout 15000",
+ "cover": "istanbul cover -x 'regression/*' _mocha -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js",
+ "coveralls": "istanbul cover -x 'regression/*' _mocha --report lcovonly -- --timeout 15000 test/* regression/datastore.js regression/pubsub.js regression/storage.js -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage"
},
"license": "Apache 2"
}
diff --git a/regression/pubsub.js b/regression/pubsub.js
index d0dea40566d..c230fe97bb5 100644
--- a/regression/pubsub.js
+++ b/regression/pubsub.js
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-/*global describe, it, before */
+/*global describe, it, before, after */
'use strict';
@@ -22,148 +22,168 @@ var assert = require('assert');
var async = require('async');
var env = require('./env.js');
-var gcloud = require('../lib');
-
-var topicNames = ['topic1', 'topic2', 'topic3'];
-var subscriptions = [
- {
- name: 'sub1',
- ackDeadlineSeconds: 30
- },
- {
- name: 'sub2',
- ackDeadlineSeconds: 60
- }
-];
+var gcloud = require('../lib')(env);
+
+var Subscription = require('../lib/pubsub/subscription.js');
-var conn = new gcloud.pubsub.Connection(env);
+var pubsub = gcloud.pubsub();
describe('pubsub', function() {
+ var topicNames = ['topic1', 'topic2', 'topic3'];
- before(function(done) {
+ function deleteAllTopics(callback) {
// TODO: Handle pagination.
- var createFn = function(name, callback) {
- conn.createTopic(name, callback);
- };
- conn.listTopics(function(err, topics) {
+ pubsub.getTopics(function(err, topics) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ async.parallel(topics.map(function(topic) {
+ return topic.delete.bind(topic);
+ }), callback);
+ });
+ }
+
+ before(function(done) {
+ deleteAllTopics(function(err) {
assert.ifError(err);
- var fns = topics.map(function(t) {
- return function(cb) {
- t.del(cb);
- };
- });
- async.parallel(fns, function(err) {
- assert.ifError(err);
- async.map(topicNames, createFn, done);
- });
+ // Create new topics.
+ async.map(topicNames, pubsub.createTopic.bind(pubsub), done);
});
});
+ after(deleteAllTopics);
+
describe('Topic', function() {
it('should be listed', function(done) {
- conn.listTopics(function(err, topics) {
- assert(topics.length, 3);
- done(err);
+ pubsub.getTopics(function(err, topics) {
+ assert.ifError(err);
+ assert(topics.length, topicNames.length);
+ done();
});
});
it('should return a nextQuery if there are more results', function(done) {
- conn.listTopics({ maxResults: 2 }, function(err, topics, next) {
- assert(topics.length, 2);
- assert(next.maxResults, 2);
+ pubsub.getTopics({
+ maxResults: topicNames.length - 1
+ }, function(err, topics, next) {
+ assert.ifError(err);
+ assert(topics.length, topicNames.length - 1);
+ assert(next.maxResults, topicNames.length - 1);
assert(!!next.pageToken, true);
- done(err);
+ done();
});
});
it('should be created', function(done) {
- conn.createTopic('topic-new', done);
- });
-
- it('should be gettable', function(done) {
- conn.getTopic('topic1', done);
+ pubsub.createTopic('new-topic-name', done);
});
it('should publish a message', function(done) {
- conn.getTopic('topic1', function(err, topic) {
- topic.publish('message from me', done);
- });
+ pubsub.topic(topicNames[0])
+ .publish('message from me', done);
});
it('should be deleted', function(done) {
- conn.getTopic('topic3', function(err, topic) {
- topic.del(done);
- });
+ pubsub.topic(topicNames[0])
+ .delete(done);
});
});
describe('Subscription', function() {
+ var TOPIC_NAME = 'test-topic';
+ var subscriptions = [
+ {
+ name: 'sub1',
+ options: { ackDeadlineSeconds: 30 }
+ },
+ {
+ name: 'sub2',
+ options: { ackDeadlineSeconds: 60 }
+ }
+ ];
+ var topic;
+
+ function deleteAllTopics(callback) {
+ pubsub.getTopics(function(err, topics) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ async.parallel(topics.map(function(topic) {
+ return topic.delete.bind(topic);
+ }), callback);
+ });
+ }
+
+ function deleteAllSubscriptions(callback) {
+ pubsub.getSubscriptions(function(err, subs) {
+ if (err) {
+ callback(err);
+ return;
+ }
+ async.parallel(subs.map(function(sub) {
+ return sub.delete.bind(sub);
+ }), callback);
+ });
+ }
+
before(function(done) {
- var createFn = function(item, callback) {
- conn.createSubscription({
- name: item.name,
- topic: 'topic1',
- ackDeadlineSeconds: item.ackDeadlineSeconds
- }, callback);
- };
- conn.listSubscriptions(function(err, subs) {
+ async.parallel([deleteAllTopics, deleteAllSubscriptions], function(err) {
assert.ifError(err);
- var fns = subs.map(function(sub) {
- return function(cb) {
- sub.del(cb);
- };
- });
- async.series(fns, function(err) {
+ // Create a new test topic.
+ pubsub.createTopic(TOPIC_NAME, function(err, newTopic) {
assert.ifError(err);
- async.map(subscriptions, createFn, done);
+ topic = newTopic;
+ // Create subscriptions.
+ async.parallel(subscriptions.map(function(sub) {
+ return topic.subscribe.bind(topic, sub.name, sub.options);
+ }), done);
});
});
});
- it('should be listed', function(done) {
- conn.listSubscriptions(function(err, subs) {
- assert.strictEqual(subs.length, 2);
- done(err);
- });
+ after(function(done) {
+ topic.delete(done);
});
- it('should be gettable', function(done) {
- conn.getSubscription('sub1', function(err, sub) {
+ it('should list all subscriptions registered to the topic', function(done) {
+ topic.getSubscriptions(function(err, subs) {
assert.ifError(err);
- assert.strictEqual(sub.name, '/subscriptions/' + env.projectId +
- '/sub1');
+ assert(subs[0] instanceof Subscription);
+ assert.equal(subs.length, subscriptions.length);
done();
});
});
- it('should error while getting a non-existent subscription', function(done){
- conn.getSubscription('sub-nothing-is-here', function(err) {
- assert.strictEqual(err.code, 404);
+ it('should allow creation of a topic', function(done) {
+ topic.subscribe('new-subscription', function(err, sub) {
+ assert.ifError(err);
+ assert(sub instanceof Subscription);
done();
});
});
- it('should be created', function(done) {
- conn.createSubscription({
- topic: 'topic1',
- name: 'new-sub'
- }, done);
+ it('should error when using a non-existent subscription', function(done) {
+ var subscription = topic.subscription('non-existent-subscription');
+ subscription.pull(function(err) {
+ assert.equal(err.code, 404);
+ done();
+ });
});
it('should be able to pull and ack', function(done) {
- conn.getTopic('topic1', function(err, topic) {
- assert.ifError(err);
- topic.publish('hello', function(err) {
- assert.ifError(err);
- });
- });
- conn.getSubscription('sub1', function(err, sub) {
+ var subscription = topic.subscription(subscriptions[0].name);
+ subscription.pull({ returnImmediately: true }, function(err, msg) {
assert.ifError(err);
- sub.on('message', function(msg) {
- sub.ack(msg.ackId, done);
- });
- sub.pull({}, function() {});
+ subscription.ack(msg.id, done);
});
+ topic.publish('hello', assert.ifError);
+ topic.publish('hello', assert.ifError);
+ topic.publish('hello', assert.ifError);
+ topic.publish('hello', assert.ifError);
+ topic.publish('hello', assert.ifError);
+ topic.publish('hello', assert.ifError);
});
});
});
diff --git a/scripts/docs.sh b/scripts/docs.sh
new file mode 100755
index 00000000000..ac7617c0ab3
--- /dev/null
+++ b/scripts/docs.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+
+# Copyright 2014 Google Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+./node_modules/.bin/dox < lib/index.js > docs/json/master/index.json &
+
+./node_modules/.bin/dox < lib/datastore/dataset.js > docs/json/master/datastore/dataset.json &
+./node_modules/.bin/dox < lib/datastore/index.js > docs/json/master/datastore/index.json &
+./node_modules/.bin/dox < lib/datastore/query.js > docs/json/master/datastore/query.json &
+./node_modules/.bin/dox < lib/datastore/request.js > docs/json/master/datastore/request.json &
+./node_modules/.bin/dox < lib/datastore/transaction.js > docs/json/master/datastore/transaction.json &
+
+./node_modules/.bin/dox < lib/pubsub/index.js > docs/json/master/pubsub/index.json &
+./node_modules/.bin/dox < lib/pubsub/subscription.js > docs/json/master/pubsub/subscription.json &
+./node_modules/.bin/dox < lib/pubsub/topic.js > docs/json/master/pubsub/topic.json &
+
+./node_modules/.bin/dox < lib/storage/index.js > docs/json/master/storage/index.json
diff --git a/test/pubsub/index.js b/test/pubsub/index.js
index d249ed97379..da93518d77b 100644
--- a/test/pubsub/index.js
+++ b/test/pubsub/index.js
@@ -14,81 +14,174 @@
* limitations under the License.
*/
-/*global describe, it */
+/*global describe, it, beforeEach */
'use strict';
var assert = require('assert');
-var pubsub = require('../../lib/pubsub');
-
-describe('Subscription', function() {
- it('should ack messages if autoAck is set', function(done) {
- var sub = new pubsub.Subscription({}, 'sub1');
- sub.autoAck = true;
- sub.conn.makeReq = function(method, path, qs, body, callback) {
- if (path === 'subscriptions/pull') {
- callback(null, { ackId: 'ackd-id' });
- return;
- }
- if (path === 'subscriptions/acknowledge') {
- done();
- }
+var PubSub = require('../../lib/pubsub/index.js');
+var Subscription = require('../../lib/pubsub/subscription.js');
+var Topic = require('../../lib/pubsub/topic.js');
+
+describe('PubSub', function() {
+ var PROJECT_ID = 'test-project';
+ var pubsub;
+
+ beforeEach(function() {
+ pubsub = new PubSub({ projectId: PROJECT_ID });
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback();
};
- sub.pull({}, function() {});
});
- it('should be closed', function(done) {
- var sub = new pubsub.Subscription({}, 'sub1');
- sub.close();
- assert.strictEqual(sub.closed, true);
- done();
+ describe('getTopics', function() {
+ beforeEach(function() {
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(null, { topic: [{ name: 'fake-topic' }] });
+ };
+ });
+
+ it('should accept a query and a callback', function(done) {
+ pubsub.getTopics({}, done);
+ });
+
+ it('should accept just a callback', function(done) {
+ pubsub.getTopics(done);
+ });
+
+ it('should build a project-wide query', function() {
+ pubsub.makeReq_ = function(method, path, q) {
+ var query =
+ 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')';
+ assert.equal(method, 'GET');
+ assert.equal(path, 'topics');
+ assert.equal(q.query, query);
+ };
+ pubsub.getTopics(function() {});
+ });
+
+ it('should return Topic instances', function() {
+ pubsub.getTopics(function(err, topics) {
+ assert.ifError(err);
+ assert(topics[0] instanceof Topic);
+ });
+ });
+
+ it('should return a query if more results exist', function() {
+ var token = 'next-page-token';
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(null, { nextPageToken: token });
+ };
+ var query = { maxResults: 1 };
+ pubsub.getTopics(query, function(err, topics, nextQuery) {
+ assert.ifError(err);
+ assert.strictEqual(query.maxResults, nextQuery.maxResults);
+ assert.equal(query.pageToken, token);
+ });
+ });
+
+ it('should pass error if api returns an error', function() {
+ var error = new Error('Error');
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(error);
+ };
+ pubsub.getTopics(function(err) {
+ assert.equal(err, error);
+ });
+ });
});
- it('should pull messages', function(done) {
- var conn = new pubsub.Connection({
- projectId: 'test-project'
- });
- conn.makeReq = function(method, path, qs, body, callback) {
- if (path === 'subscriptions//subscriptions/test-project/sub1') {
- callback(null, {});
- return;
- }
- if (path === 'subscriptions/pull') {
- callback(null, { ackId: 123 });
- return;
- }
- };
- var sub = conn.subscribe('sub1', { autoAck: false });
- sub.once('message', function() {
- sub.close();
- done();
+ describe('createTopic', function() {
+ it('should create a topic', function() {
+ var topicName = 'new-topic-name';
+ pubsub.makeReq_ = function(method, path, q, body) {
+ assert.equal(method, 'POST');
+ assert.equal(path, 'topics');
+ assert.equal(body.name, '/topics/' + PROJECT_ID + '/' + topicName);
+ };
+ pubsub.createTopic(topicName, function() {});
+ });
+
+ it('should return a Topic object', function() {
+ pubsub.createTopic('new-topic', function(err, topic) {
+ assert.ifError(err);
+ assert(topic instanceof Topic);
+ });
});
});
- it('should pull and ack messages', function(done) {
- var conn = new pubsub.Connection({
- projectId: 'test-project'
- });
- conn.makeReq = function(method, path, qs, body, callback) {
- if (path === 'subscriptions//subscriptions/test-project/sub1') {
- callback(null, {});
- return;
- }
- if (path === 'subscriptions/pull') {
- setImmediate(function() {
- callback(null, { ackId: 123 });
- });
- return;
- }
- if (path === 'subscriptions/acknowledge') {
- callback(null, true);
- return;
- }
- };
- var sub = conn.subscribe('sub1', { autoAck: true });
- sub.once('message', function() {
- sub.close();
- done();
+ describe('topic', function() {
+ it('should throw if a name is not provided', function() {
+ assert.throws(function() {
+ pubsub.topic();
+ }, /name must be specified/);
});
+
+ it('should return a Topic object', function() {
+ assert(pubsub.topic('new-topic') instanceof Topic);
+ });
+ });
+
+ describe('getSubscriptions', function() {
+ beforeEach(function() {
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(null, { subscription: [{ name: 'fake-subscription' }] });
+ };
+ });
+
+ it('should accept a query and a callback', function(done) {
+ pubsub.getSubscriptions({}, done);
+ });
+
+ it('should accept just a callback', function(done) {
+ pubsub.getSubscriptions(done);
+ });
+
+ it('should build a project-wide query', function() {
+ pubsub.makeReq_ = function(method, path, q) {
+ var query =
+ 'cloud.googleapis.com/project in (/projects/' + PROJECT_ID + ')';
+ assert.equal(method, 'GET');
+ assert.equal(path, 'subscriptions');
+ assert.equal(q.query, query);
+ };
+ pubsub.getSubscriptions(function() {});
+ });
+
+ it('should return Subscription instances', function() {
+ pubsub.getSubscriptions(function(err, subscriptions) {
+ assert.ifError(err);
+ assert(subscriptions[0] instanceof Subscription);
+ });
+ });
+
+ it('should return a query if more results exist', function() {
+ var token = 'next-page-token';
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(null, { nextPageToken: token });
+ };
+ var query = { maxResults: 1 };
+ pubsub.getSubscriptions(query, function(err, subscriptions, nextQuery) {
+ assert.ifError(err);
+ assert.strictEqual(query.maxResults, nextQuery.maxResults);
+ assert.equal(query.pageToken, token);
+ });
+ });
+
+ it('should pass error if api returns an error', function() {
+ var error = new Error('Error');
+ pubsub.makeReq_ = function(method, path, q, body, callback) {
+ callback(error);
+ };
+ pubsub.getSubscriptions(function(err) {
+ assert.equal(err, error);
+ });
+ });
+ });
+
+ it('should pass network requests to the connection object', function(done) {
+ var pubsub = new PubSub();
+ pubsub.connection.req = done.bind(null, null);
+ pubsub.makeReq_();
});
});
diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js
new file mode 100644
index 00000000000..2d1ed3206e7
--- /dev/null
+++ b/test/pubsub/subscription.js
@@ -0,0 +1,430 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach, afterEach */
+
+'use strict';
+
+var assert = require('assert');
+var util = require('../../lib/common/util.js');
+var Subscription = require('../../lib/pubsub/subscription.js');
+
+describe('Subscription', function() {
+ var PROJECT_ID = 'test-project';
+ var SUB_NAME = 'test-subscription';
+ var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME;
+ var pubsubMock = {
+ projectId: PROJECT_ID,
+ makeReq_: util.noop
+ };
+ var message = 'howdy';
+ var messageBuffer = new Buffer(message).toString('base64');
+ var messageObj = {
+ ackId: 3,
+ pubsubEvent: {
+ message: {
+ data: messageBuffer
+ }
+ }
+ };
+ var expectedMessage = { id: 3, data: message };
+ var subscription;
+
+ beforeEach(function() {
+ subscription = new Subscription(pubsubMock, { name: SUB_NAME });
+ });
+
+ describe('initialization', function() {
+ it('should format name', function(done) {
+ var formatName_ = Subscription.formatName_;
+ Subscription.formatName_ = function() {
+ Subscription.formatName_ = formatName_;
+ done();
+ };
+ new Subscription(pubsubMock, { name: SUB_NAME });
+ });
+
+ it('should honor configuration settings', function() {
+ var CONFIG = {
+ name: SUB_NAME,
+ autoAck: true,
+ interval: 100
+ };
+ var sub = new Subscription(pubsubMock, CONFIG);
+ assert.strictEqual(sub.autoAck, CONFIG.autoAck);
+ assert.strictEqual(sub.interval, CONFIG.interval);
+ });
+
+ it('should not be closed', function() {
+ assert.strictEqual(subscription.closed, false);
+ });
+
+ it('should default autoAck to false if not specified', function() {
+ var sub = new Subscription(pubsubMock, { name: SUB_NAME });
+ assert.strictEqual(sub.autoAck, false);
+ });
+
+ it('should set default interval if one is not specified', function() {
+ var sub = new Subscription(pubsubMock, { name: SUB_NAME });
+ assert.equal(sub.interval, 10);
+ });
+ });
+
+ describe('formatName_', function() {
+ it('should format name', function() {
+ var formattedName = Subscription.formatName_(PROJECT_ID, SUB_NAME);
+ assert.equal(formattedName, SUB_FULL_NAME);
+ });
+
+ it('should format name when given a complete name', function() {
+ var formattedName = Subscription.formatName_(PROJECT_ID, SUB_FULL_NAME);
+ assert.equal(formattedName, SUB_FULL_NAME);
+ });
+ });
+
+ describe('listenForEvents_', function() {
+ afterEach(function() {
+ subscription.removeAllListeners();
+ });
+
+ it('should start pulling once a message listener is bound', function(done) {
+ subscription.startPulling_ = function() {
+ done();
+ };
+ subscription.on('message', util.noop);
+ });
+
+ it('should close when no more message listeners are bound', function() {
+ subscription.startPulling_ = util.noop;
+ subscription.on('message', util.noop);
+ subscription.on('message', util.noop);
+ // 2 listeners: sub should be open.
+ assert.strictEqual(subscription.closed, false);
+ subscription.removeListener('message', util.noop);
+ // 1 listener: sub should be open.
+ assert.strictEqual(subscription.closed, false);
+ subscription.removeListener('message', util.noop);
+ // 0 listeners: sub should be closed.
+ assert.strictEqual(subscription.closed, true);
+ });
+ });
+
+ describe('ack', function() {
+ it('should throw if no IDs are provided', function() {
+ assert.throws(function() {
+ subscription.ack();
+ }, /At least one ID/);
+ assert.throws(function() {
+ subscription.ack([]);
+ }, /At least one ID/);
+ });
+
+ it('should accept a single id', function() {
+ assert.doesNotThrow(function() {
+ subscription.ack(1, util.noop);
+ });
+ });
+
+ it('should accept an array of ids', function() {
+ assert.doesNotThrow(function() {
+ subscription.ack([1], util.noop);
+ });
+ });
+
+ it('should make an array out of ids', function(done) {
+ var ID = 1;
+ subscription.makeReq_ = function(method, path, qs, body) {
+ assert.equal(body.subscription, SUB_FULL_NAME);
+ assert.deepEqual(body.ackId, [ID]);
+ done();
+ };
+ subscription.ack(ID, assert.ifError);
+ });
+
+ it('should make correct api request', function(done) {
+ var IDS = [1, 2, 3];
+ subscription.makeReq_ = function(method, path, qs, body) {
+ assert.equal(body.subscription, SUB_FULL_NAME);
+ assert.deepEqual(body.ackId, IDS);
+ done();
+ };
+ subscription.ack(IDS, assert.ifError);
+ });
+
+ it('should pass callback to request', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ subscription.ack(1, done);
+ });
+ });
+
+ describe('pull', function() {
+ beforeEach(function() {
+ subscription.ack = util.noop;
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback(null, messageObj);
+ };
+ });
+
+ it('should make correct api request', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body) {
+ assert.equal(method, 'POST');
+ assert.equal(path, 'subscriptions/pull');
+ assert.equal(body.subscription, SUB_FULL_NAME);
+ assert.strictEqual(body.returnImmediately, false);
+ done();
+ };
+ subscription.pull({}, assert.ifError);
+ });
+
+ it('should not require configuration options', function(done) {
+ subscription.pull(done);
+ });
+
+ it('should default returnImmediately to false', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body) {
+ assert.strictEqual(body.returnImmediately, false);
+ done();
+ };
+ subscription.pull({}, assert.ifError);
+ });
+
+ it('should honor options', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body) {
+ assert.strictEqual(body.returnImmediately, true);
+ done();
+ };
+ subscription.pull({ returnImmediately: true }, assert.ifError);
+ });
+
+ it('should pass error to callback', function(done) {
+ var error = new Error('Error.');
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback(error);
+ };
+ subscription.pull(function(err) {
+ assert.equal(err, error);
+ done();
+ });
+ });
+
+ describe('autoAck false', function() {
+ beforeEach(function() {
+ subscription.autoAck = false;
+ });
+
+ it('should not ack', function() {
+ subscription.ack = function() {
+ throw new Error('Should not have acked.');
+ };
+ subscription.pull({}, assert.ifError);
+ });
+
+ it('should execute callback with message', function(done) {
+ subscription.pull({}, function(err, msg) {
+ assert.deepEqual(msg, expectedMessage);
+ done();
+ });
+ });
+ });
+
+ describe('autoAck true', function() {
+ beforeEach(function() {
+ subscription.autoAck = true;
+ subscription.ack = function(id, callback) {
+ callback();
+ };
+ });
+
+ it('should ack', function(done) {
+ subscription.ack = function() {
+ done();
+ };
+ subscription.pull({}, assert.ifError);
+ });
+
+ it('should pass id to ack', function(done) {
+ subscription.ack = function(id) {
+ assert.equal(id, expectedMessage.id);
+ done();
+ };
+ subscription.pull({}, assert.ifError);
+ });
+
+ it('should pass callback to ack', function(done) {
+ subscription.pull({}, done);
+ });
+
+ it('should invoke callback with error from ack', function(done) {
+ var error = new Error('Error.');
+ subscription.ack = function(id, callback) {
+ callback(error);
+ };
+ subscription.pull({}, function(err) {
+ assert.equal(err, error);
+ done();
+ });
+ });
+
+ it('should execute callback', function(done) {
+ subscription.pull({}, done);
+ });
+ });
+ });
+
+ describe('startPulling_', function() {
+ beforeEach(function() {
+ subscription.pull = util.noop;
+ });
+
+ it('should pull at specified interval', function(done) {
+ var INTERVAL = 5;
+ subscription.pull = function(options, callback) {
+ // After pull is called once, overwrite with `done`.
+ // This is to override the function passed to `setTimeout`, so we are
+ // sure it's the same pull function when we execute it.
+ subscription.pull = function() {
+ done();
+ };
+ callback();
+ };
+ var setTimeout = global.setTimeout;
+ global.setTimeout = function(fn, interval) {
+ global.setTimeout = setTimeout;
+ assert.equal(interval, INTERVAL);
+ // This should execute the `done` function from when we overrided it
+ // above.
+ fn();
+ };
+ subscription.interval = INTERVAL;
+ subscription.startPulling_();
+ });
+
+ it('should stop pulling if subscription is closed', function() {
+ var pulledCount = 0;
+ subscription.pull = function() {
+ if (++pulledCount === 3) {
+ subscription.pull = function() {
+ throw Error('Should have stopped pulling.');
+ };
+ subscription.close();
+ }
+ };
+ subscription.startPulling_();
+ });
+
+ it('should set returnImmediately to false when pulling', function(done) {
+ subscription.pull = function(options) {
+ assert.strictEqual(options.returnImmediately, false);
+ done();
+ };
+ subscription.startPulling_();
+ });
+
+ it('should emit an error event if one is encountered', function(done) {
+ var error = new Error('Error.');
+ subscription.pull = function(options, callback) {
+ subscription.pull = function() {};
+ setImmediate(function() {
+ callback(error);
+ });
+ };
+ subscription
+ .once('error', function(err) {
+ assert.equal(err, error);
+ done();
+ })
+ .startPulling_();
+ });
+
+ it('should emit a message event', function(done) {
+ subscription.pull = function(options, callback) {
+ callback(null, { hi: 'there' });
+ };
+ subscription
+ .once('message', function(msg) {
+ assert.deepEqual(msg, { hi: 'there' });
+ done();
+ });
+ });
+ });
+
+ describe('delete', function() {
+ it('should delete a subscription', function(done) {
+ subscription.makeReq_ = function(method, path) {
+ assert.equal(method, 'DELETE');
+ assert.equal(path, 'subscriptions/' + subscription.name);
+ done();
+ };
+ subscription.delete();
+ });
+
+ it('should close a subscription once deleted', function() {
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ subscription.closed = false;
+ subscription.delete();
+ assert.strictEqual(subscription.closed, true);
+ });
+
+ it('should remove all listeners', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ subscription.removeAllListeners = function() {
+ done();
+ };
+ subscription.delete();
+ });
+
+ it('should execute callback when deleted', function(done) {
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ subscription.delete(done);
+ });
+
+ it('should execute callback with an api error', function(done) {
+ var error = new Error('Error.');
+ subscription.makeReq_ = function(method, path, qs, body, callback) {
+ callback(error);
+ };
+ subscription.delete(function(err) {
+ assert.equal(err, error);
+ done();
+ });
+ });
+ });
+
+ describe('formatMessage_', function() {
+ it('should decode stringified JSON to object', function() {
+ var obj = { hi: 'there' };
+ var stringified = new Buffer(JSON.stringify(obj)).toString('base64');
+ var msg = Subscription.formatMessage_({
+ ackId: 3,
+ pubsubEvent: { message: { data: stringified } }
+ });
+ assert.deepEqual(msg, { id: 3, data: obj });
+ });
+
+ it('should decode buffer to string', function() {
+ var msg = Subscription.formatMessage_(messageObj);
+ assert.deepEqual(msg, expectedMessage);
+ });
+ });
+});
diff --git a/test/pubsub/topic.js b/test/pubsub/topic.js
new file mode 100644
index 00000000000..800076b0a52
--- /dev/null
+++ b/test/pubsub/topic.js
@@ -0,0 +1,341 @@
+/**
+ * Copyright 2014 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*global describe, it, beforeEach, afterEach */
+
+'use strict';
+
+var assert = require('assert');
+var util = require('../../lib/common/util.js');
+
+var SubscriptionCached = require('../../lib/pubsub/subscription.js');
+var formatName_Cached = SubscriptionCached.formatName_;
+var SubscriptionOverride;
+var formatName_Override;
+
+function Subscription(a, b) {
+ var OverrideFn = SubscriptionOverride || SubscriptionCached;
+ return new OverrideFn(a, b);
+}
+
+Subscription.formatName_ = function() {
+ var args = [].slice.apply(arguments);
+ return (formatName_Override || formatName_Cached).apply(null, args);
+};
+
+var Topic = require('sandboxed-module')
+ .require('../../lib/pubsub/topic.js', {
+ requires: {
+ './subscription.js': Subscription
+ }
+ });
+
+describe('Topic', function() {
+ var PROJECT_ID = 'test-project';
+ var TOPIC_NAME = 'test-topic';
+ var pubsubMock = {
+ projectId: PROJECT_ID,
+ makeReq_: util.noop
+ };
+ var topic;
+
+ beforeEach(function() {
+ topic = new Topic(pubsubMock, { name: TOPIC_NAME });
+ });
+
+ afterEach(function() {
+ SubscriptionOverride = null;
+ formatName_Override = null;
+ });
+
+ describe('initialization', function() {
+ it('should format name', function(done) {
+ var formatName_ = Topic.formatName_;
+ Topic.formatName_ = function() {
+ Topic.formatName_ = formatName_;
+ done();
+ };
+ new Topic(pubsubMock, { name: TOPIC_NAME });
+ });
+
+ it('should assign projectId to `this`', function() {
+ assert.equal(topic.projectId, PROJECT_ID);
+ });
+
+ it('should assign pubsub object to `this`', function() {
+ assert.deepEqual(topic.pubsub, pubsubMock);
+ });
+ });
+
+ describe('formatName_', function() {
+ var fullName = '/topics/' + PROJECT_ID + '/' + TOPIC_NAME;
+
+ it('should format name', function() {
+ var formattedName = Topic.formatName_(PROJECT_ID, TOPIC_NAME);
+ assert.equal(formattedName, fullName);
+ });
+
+ it('should format name when given a complete name', function() {
+ var formattedName = Topic.formatName_(PROJECT_ID, fullName);
+ assert.equal(formattedName, fullName);
+ });
+ });
+
+ describe('publishing', function() {
+ var message = 'howdy';
+ var messageBuffer = new Buffer(message);
+ var messageRaw = { data: messageBuffer.toString('base64') };
+ var messageObj = { test: 'object' };
+ var messageObjDecoded =
+ new Buffer(JSON.stringify(messageObj)).toString('base64');
+
+ describe('publish', function() {
+ it('should throw if no message is provided', function() {
+ assert.throws(function() {
+ topic.publish();
+ }, /empty message/);
+ });
+
+ it('should convert string to raw message', function(done) {
+ topic.publishRaw = function(msg) {
+ assert.deepEqual(msg, messageRaw);
+ done();
+ };
+ topic.publish(message, assert.ifError);
+ });
+
+ it('should convert buffer to raw message', function(done) {
+ topic.publishRaw = function(msg) {
+ assert.deepEqual(msg, messageRaw);
+ done();
+ };
+ topic.publish(messageBuffer, assert.ifError);
+ });
+
+ it('should stringify non-strings & non-buffers', function(done) {
+ topic.publishRaw = function(msg) {
+ assert.deepEqual(msg.data, messageObjDecoded);
+ done();
+ };
+ topic.publish(messageObj, assert.ifError);
+ });
+
+ it('should pass callback', function(done) {
+ topic.publishRaw = function(msg, callback) {
+ callback();
+ };
+ topic.publish(message, done);
+ });
+ });
+
+ describe('publishRaw', function() {
+ it('should throw if no message is provided', function() {
+ assert.throws(function() {
+ topic.publishRaw();
+ }, /empty message/);
+ });
+
+ it('should stringify non-strings & non-buffers', function(done) {
+ topic.makeReq_ = function(method, path, qs, body) {
+ assert.deepEqual(body.data, messageObjDecoded);
+ done();
+ };
+ topic.publishRaw({ data: messageObj }, assert.ifError);
+ });
+
+ it('should post raw messages to the api', function(done) {
+ topic.makeReq_ = function(method, path, qs, body) {
+ assert.equal(method, 'POST');
+ assert.equal(path, 'topics/publish');
+ assert.deepEqual(body.message, messageRaw.message);
+ done();
+ };
+ topic.publishRaw(messageRaw, assert.ifError);
+ });
+
+ it('should attach topic name to the request', function(done) {
+ topic.makeReq_ = function(method, path, qs, body) {
+ assert.equal(body.topic, topic.name);
+ done();
+ };
+ topic.publishRaw(messageRaw, assert.ifError);
+ });
+ });
+ });
+
+ describe('delete', function() {
+ it('should delete a topic', function(done) {
+ topic.makeReq_ = function(method, path) {
+ assert.equal(method, 'DELETE');
+ assert.equal(path, 'topics/' + topic.name);
+ done();
+ };
+ topic.delete();
+ });
+ });
+
+ describe('subscriptions', function() {
+ var SUB_NAME = 'new-sub-name';
+ var SUB_FULL_NAME = '/subscriptions/' + PROJECT_ID + '/' + SUB_NAME;
+ var CONFIG = { autoAck: true, interval: 90 };
+
+ describe('getSubscriptions', function() {
+ it('should call parent getSubscriptions', function(done) {
+ topic.pubsub.getSubscriptions = function() {
+ done();
+ };
+ topic.getSubscriptions(assert.ifError);
+ });
+
+ it('should pass query', function(done) {
+ var query = { pageToken: 1, maxResults: 3 };
+ topic.pubsub.getSubscriptions = function(q) {
+ assert.strictEqual(q.pageToken, query.pageToken);
+ assert.strictEqual(q.maxResults, query.maxResults);
+ done();
+ };
+ topic.getSubscriptions(query, assert.ifError);
+ });
+
+ it('should pass callback', function(done) {
+ topic.pubsub.getSubscriptions = function(q, callback) {
+ callback();
+ };
+ topic.getSubscriptions({}, done);
+ });
+
+ it('should attach scoped topic query to query object', function(done) {
+ topic.pubsub.getSubscriptions = function(q) {
+ assert.equal(
+ q.query, 'pubsub.googleapis.com/topic in (' + topic.name + ')');
+ done();
+ };
+ topic.getSubscriptions({}, assert.ifError);
+ });
+ });
+
+ describe('subscribe', function() {
+ it('should throw if no name is provided', function() {
+ assert.throws(function() {
+ topic.subscribe();
+ }, /name.*required/);
+ });
+
+ it('should not require configuration options', function(done) {
+ topic.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ topic.subscribe(SUB_NAME, done);
+ });
+
+ it('should format provided name', function(done) {
+ formatName_Override = function() {
+ done();
+ };
+ topic.subscribe(SUB_NAME, assert.ifError);
+ });
+
+ it('should send correct request', function(done) {
+ topic.makeReq_ = function(method, path, qs, body) {
+ assert.equal(method, 'POST');
+ assert.equal(path, 'subscriptions');
+ assert.equal(body.topic, topic.name);
+ assert.equal(body.name, SUB_FULL_NAME);
+ done();
+ };
+ topic.subscribe(SUB_NAME, assert.ifError);
+ });
+
+ it('should return an api error to the callback', function(done) {
+ var error = new Error('Error.');
+ topic.makeReq_ = function(method, path, qs, body, callback) {
+ callback(error);
+ };
+ topic.subscribe(SUB_NAME, function(err) {
+ assert.equal(err, error);
+ done();
+ });
+ });
+
+ it('should create a new subscription', function(done) {
+ topic.subscription = function(name) {
+ assert.equal(name, SUB_NAME);
+ done();
+ };
+ topic.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ topic.subscribe(SUB_NAME, assert.ifError);
+ });
+
+ it('should honor settings on the api request', function(done) {
+ var SEC = 90;
+ topic.makeReq_ = function(method, path, qs, body) {
+ assert.strictEqual(body.ackDeadlineSeconds, SEC);
+ done();
+ };
+ topic.subscribe(SUB_NAME, { ackDeadlineSeconds: SEC }, assert.ifError);
+ });
+
+ it('should honor settings on the subscription object', function(done) {
+ topic.subscription = function(name, options) {
+ assert.deepEqual(options, CONFIG);
+ done();
+ };
+ topic.makeReq_ = function(method, path, qs, body, callback) {
+ callback();
+ };
+ topic.subscribe(SUB_NAME, CONFIG, assert.ifError);
+ });
+ });
+
+ describe('subscription', function() {
+ it('should throw if no name is provided', function() {
+ assert.throws(function() {
+ topic.subscription();
+ }, /name.*required/);
+ });
+
+ it('should return a Subscription object', function() {
+ SubscriptionOverride = function() {};
+ var subscription = topic.subscription(SUB_NAME, {});
+ assert(subscription instanceof SubscriptionOverride);
+ });
+
+ it('should honor settings', function(done) {
+ SubscriptionOverride = function(pubsub, options) {
+ assert.deepEqual(options, CONFIG);
+ done();
+ };
+ topic.subscription(SUB_NAME, CONFIG);
+ });
+
+ it('should pass specified name to the Subscription', function(done) {
+ SubscriptionOverride = function(pubsub, options) {
+ assert.equal(options.name, SUB_NAME);
+ done();
+ };
+ topic.subscription(SUB_NAME, {});
+ });
+
+ it('should not require options', function() {
+ assert.doesNotThrow(function() {
+ topic.subscription(SUB_NAME);
+ });
+ });
+ });
+ });
+});