Skip to content

Commit 58d06e7

Browse files
author
Burcu Dogan
committed
Merge pull request #35 from rakyll/eventemitter-subs
WIP: EventEmitter Subscription
2 parents 61db807 + 2461ac4 commit 58d06e7

File tree

6 files changed

+468
-49
lines changed

6 files changed

+468
-49
lines changed

lib/pubsub/index.js

Lines changed: 154 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
* limitations under the License.
1515
*/
1616

17+
var events = require('events'),
18+
nodeutil = require('util');
19+
1720
var conn = require('../common/connection.js'),
1821
util = require('../common/util.js');
1922

@@ -31,15 +34,18 @@ var SCOPES = [
3134
'https://www.googleapis.com/auth/cloud-platform'
3235
];
3336

34-
// TODO(jbd): Emit message and error events if in polled-mode.
35-
// sub.on('meessage', console.log)
36-
// sub.on('error')
37-
3837
function Subscription(conn, name) {
38+
var that = this;
3939
this.conn = conn;
4040
this.name = name;
41+
42+
this.autoAck = false;
43+
this.pullIntervalInMs = 10;
44+
this.closed = false;
4145
}
4246

47+
nodeutil.inherits(Subscription, events.EventEmitter);
48+
4349
/**
4450
* Acknowledges the backend that message is retrieved.
4551
* @param {Array<String>} ids A list of message IDs.
@@ -56,44 +62,104 @@ Subscription.prototype.ack = function(ids, callback) {
5662

5763
/**
5864
* Pulls from the subscribed topic.
59-
* @param {Boolean} opts.returnImmediately If set, the system will respond immediately.
60-
* Otherwise, wait until new messages are
61-
* available. Returns if timeout is reached.
62-
* @param {Boolean} opts.autoAck Automatically acknowledges the
63-
* message once it's pulled.
64-
* @param {Function} callback Callback function.
65+
* @param {Boolean} opts.returnImmediately If set, the system will respond immediately.
66+
* Otherwise, wait until new messages are
67+
* available. Returns if timeout is reached.
68+
* @param {Function} callback Callback.
6569
*/
6670
Subscription.prototype.pull = function(opts, callback) {
67-
// TODO(jbd): Make opts optional.
6871
var that = this;
69-
var autoAck = !!opts.autoAck;
72+
// TODO(jbd): Should not be racing with other pull.
73+
// TOOD(jbd): Make opts optional.
7074
var body = {
7175
subscription: this.name,
7276
returnImmediately: !!opts.returnImmediately
7377
};
7478
this.conn.makeReq('POST', 'subscriptions/pull', null, body, function(err, message) {
75-
if (err) { return callback(err); }
76-
if (!autoAck) {
77-
return callback(null, message);
79+
// TODO(jbd): Fix API to return a list of messages.
80+
if (err) {
81+
callback(err);
82+
return;
83+
}
84+
if (!that.autoAck) {
85+
that.emitMessage_(message);
86+
callback();
87+
return;
7888
}
7989
that.ack(message.ackId, function(err) {
80-
if (err) { return callback(err); }
81-
callback(null, message);
90+
if (err) {
91+
callback(err);
92+
return;
93+
}
94+
that.emitMessage_(message);
95+
callback();
8296
});
8397
});
8498
};
8599

86100
/**
87-
* Unsubscribes the current subscription. Pull requests from the current
101+
* Polls the backend for new messages.
102+
*/
103+
Subscription.prototype.startPulling_ = function() {
104+
var that = this;
105+
var pullFn = function() {
106+
if (that.closed) {
107+
return;
108+
}
109+
that.pull({ returnImmediately: false }, function(err) {
110+
// TODO(jbd): Fix API to return a more explicit error code or message.
111+
if (err && err.message.indexOf('has no more messages') < 0) {
112+
that.emitError_(err);
113+
}
114+
setTimeout(function() {
115+
pullFn();
116+
}, that.pullIntervalInMs);
117+
});
118+
};
119+
pullFn();
120+
}
121+
122+
/**
123+
* Deletes the current subscription. Pull requests from the current
88124
* subscription will be errored once unsubscription is done.
89125
* @param {Function} opt_callback Optional callback.
90126
*/
91-
Subscription.prototype.unsubscribe = function(opt_callback) {
127+
Subscription.prototype.del = function(opt_callback) {
128+
var that = this;
92129
cb = opt_callback || util.noop;
93130
var path = util.format('subscriptions/{fullName}', {
94131
fullName: this.name
95132
});
96-
this.conn.makeReq('DELETE', path, null, true, cb);
133+
this.conn.makeReq('DELETE', path, null, true, function(err) {
134+
if (err) return cb(err);
135+
that.closed = true;
136+
cb(err);
137+
});
138+
};
139+
140+
/**
141+
* Closes the subscription.
142+
*/
143+
Subscription.prototype.close = function() {
144+
this.closed = true;
145+
};
146+
147+
/**
148+
* Emits a 'message' event with the provided message.
149+
*/
150+
Subscription.prototype.emitMessage_ = function(msg) {
151+
if (msg.pubsubEvent && msg.pubsubEvent.message) {
152+
var data = msg.pubsubEvent.message.data;
153+
msg.pubsubEvent.message.data = new Buffer(data, 'base64').toString('utf-8');
154+
}
155+
this.emit('message', msg);
156+
};
157+
158+
/**
159+
* Emits an error with the provided error.
160+
*/
161+
Subscription.prototype.emitError_ = function(err) {
162+
this.emit('error', err);
97163
};
98164

99165
/**
@@ -112,7 +178,12 @@ function Topic(conn, name) {
112178
* @param {Function} opt_callback Optional callback.
113179
*/
114180
Topic.prototype.publish = function(data, opt_callback) {
115-
this.publishMessage({ topic: this.name, data: data }, opt_callback);
181+
this.publishMessage({
182+
topic: this.name,
183+
message: {
184+
data: new Buffer(data).toString('base64')
185+
}
186+
}, opt_callback);
116187
};
117188

118189
/**
@@ -132,7 +203,7 @@ Topic.prototype.publishMessage = function(message, opt_callback) {
132203
*/
133204
Topic.prototype.del = function(opt_callback) {
134205
var path = 'topics/' + this.name;
135-
this.conn.makeReq('DELETE', path, null, true, cb);
206+
this.conn.makeReq('DELETE', path, null, true, opt_callback);
136207
};
137208

138209
/**
@@ -192,25 +263,62 @@ Connection.prototype.listSubscriptions = function(query, callback) {
192263
};
193264

194265
/**
195-
* Subscribe with the provided options.
196-
* @param {string} opts.name Name of the subscription.
197-
* @param {string} opts.topicName Name of the topic to subscribe.
198-
* @param {Function} opt_callback Optional callback.
266+
* Gets a subscription.
267+
* @param {string} name Name of the subscription.
268+
* @param {Function} callback Callback.
199269
*/
200-
Connection.prototype.subscribe = function(opts, opt_callback) {
270+
Connection.prototype.getSubscription = function(name, callback) {
201271
var that = this;
202-
var cb = opt_callback || util.noop;
203-
var body = {
204-
topic:'/topics/' + this.id + '/' + opts.topicName,
272+
var fullName = '/subscriptions/' + this.id + '/' + name;
273+
this.makeReq('GET', 'subscriptions/' + fullName, null, true, function(err) {
274+
if (err) {
275+
callback(err);
276+
return;
277+
}
278+
callback(null, new Subscription(that, fullName));
279+
});
280+
};
281+
282+
Connection.prototype.createSubscription = function(opts, callback) {
283+
var that = this;
284+
var subscription = {
285+
topic:'/topics/' + this.id + '/' + opts.topic,
205286
name: '/subscriptions/' + this.id + '/' + opts.name,
206287
ackDeadlineSeconds: opts.ackDeadlineSeconds
207288
};
208-
this.makeReq('POST', 'subscriptions', null, body, function(err, item) {
209-
// TODO(jbd): maybe init a subscription instance if http 200 or 409.
210-
cb(err, new Subscription(that, body.name));
289+
this.makeReq('POST', 'subscriptions', null, subscription, function(err) {
290+
if (err) {
291+
callback(err);
292+
return;
293+
}
294+
callback(null, new Subscription(that, subscription.name));
211295
});
212296
};
213297

298+
/**
299+
* Subscribe with the provided options.
300+
* @param {string} name Name of the subscription.
301+
* @param {Boolean} opts.autoAck Automatically acknowledges the
302+
* message once it's pulled.
303+
* @return {Subscription}
304+
*/
305+
Connection.prototype.subscribe = function(name, opts) {
306+
opts = opts || {};
307+
308+
var fullName = '/subscriptions/' + this.id + '/' + name;
309+
var sub = new Subscription(this, fullName);
310+
sub.autoAck = !!opts.autoAck;
311+
this.getSubscription(name, function(err) {
312+
if (err) {
313+
sub.emitError_(err);
314+
return;
315+
}
316+
sub.emit('ready');
317+
sub.startPulling_();
318+
});
319+
return sub;
320+
};
321+
214322
/**
215323
* Lists topics.
216324
* @param {string} query.pageToken Page token.
@@ -306,16 +414,26 @@ Connection.prototype.makeReq = function(method, path, q, body, callback) {
306414
}
307415
this.conn.req(reqOpts, function(err, res, body) {
308416
if (body && body.error) {
309-
return callback(body.error);
417+
callback(new util.ApiError(body.error)); return;
310418
}
311419
if (res && (res.statusCode < 200 || res.statusCode > 299)) {
312-
return callback(new Error('error during request, statusCode: ' + res.statusCode));
420+
callback(new Error('error during request, statusCode: ' + res.statusCode)); return;
313421
}
314-
callback(err, body);
422+
callback(null, body);
315423
});
316424
};
317425

318426
/**
319427
* Exports Connection.
320428
*/
321429
module.exports.Connection = Connection;
430+
431+
/**
432+
* Exports Topic.
433+
*/
434+
module.exports.Topic = Topic;
435+
436+
/**
437+
* Exports Subscription.
438+
*/
439+
module.exports.Subscription = Subscription;

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
},
3838
"scripts": {
3939
"test": "node_modules/mocha/bin/_mocha --reporter spec",
40-
"regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 4000 regression/*",
41-
"cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha"
40+
"regression-test": "node_modules/mocha/bin/_mocha --reporter spec --timeout 10000 regression/*",
41+
"cover": "node_modules/istanbul/lib/cli.js cover node_modules/mocha/bin/_mocha -- --timeout 10000 test/* regression/*"
4242
},
4343
"license": "Apache 2"
4444
}

regression/datastore.js

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,10 @@
1414
* limitations under the License.
1515
*/
1616

17-
if (!process.env.GCLOUD_TESTS_PROJECT_ID &&
18-
!process.env.GCLOUD_TESTS_SERVICE_ACCOUNT &&
19-
!process.env.GCLOUD_TESTS_PEM_KEY) {
20-
var error = ['To run the regression tests, you need to set the value of some environment variables.',
21-
'Please check the README for instructions.'
22-
].join('\n');
23-
throw error;
24-
}
25-
var projectId = process.env.GCLOUD_TESTS_PROJECT_ID,
26-
email = process.env.GCLOUD_TESTS_SERVICE_ACCOUNT,
27-
pemFilePath = process.env.GCLOUD_TESTS_PEM_KEY;
17+
var env = require('./env.js'),
18+
projectId = env.projectId,
19+
email = env.serviceAccount,
20+
pemFilePath = env.pemKey;
2821

2922
var assert = require('assert'),
3023
datastore = require('../lib/datastore'),

regression/env.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2014 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
if (!process.env.GCLOUD_TESTS_PROJECT_ID &&
18+
!process.env.GCLOUD_TESTS_SERVICE_ACCOUNT &&
19+
!process.env.GCLOUD_TESTS_PEM_KEY) {
20+
var error = ['To run the regression tests, you need to set the value of some environment variables.',
21+
'Please check the README for instructions.'
22+
].join('\n');
23+
throw error;
24+
}
25+
26+
module.exports = {
27+
projectId: process.env.GCLOUD_TESTS_PROJECT_ID,
28+
serviceAccount: process.env.GCLOUD_TESTS_SERVICE_ACCOUNT,
29+
pemKey: process.env.GCLOUD_TESTS_PEM_KEY
30+
};

0 commit comments

Comments
 (0)