Skip to content

Commit f87a55c

Browse files
Merge pull request #988 from roberttod/pubsub-encoding-option
Add pubsub encoding option
2 parents 1ac48c3 + 485f02d commit f87a55c

File tree

2 files changed

+62
-4
lines changed

2 files changed

+62
-4
lines changed

lib/pubsub/subscription.js

+7-3
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ function Subscription(pubsub, options) {
273273
is.number(options.maxInProgress) ? options.maxInProgress : Infinity;
274274
this.messageListeners = 0;
275275
this.paused = false;
276+
this.encoding = is.string(options.encoding) ? options.encoding : 'utf-8';
276277

277278
this.listenForEvents_();
278279
}
@@ -285,7 +286,7 @@ modelo.inherits(Subscription, ServiceObject, events.EventEmitter);
285286
*
286287
* @private
287288
*/
288-
Subscription.formatMessage_ = function(msg) {
289+
Subscription.formatMessage_ = function(msg, encoding) {
289290
var innerMessage = msg.message;
290291
var message = {
291292
ackId: msg.ackId
@@ -295,7 +296,7 @@ Subscription.formatMessage_ = function(msg) {
295296
message.id = innerMessage.messageId;
296297

297298
if (innerMessage.data) {
298-
message.data = new Buffer(innerMessage.data, 'base64').toString('utf-8');
299+
message.data = new Buffer(innerMessage.data, 'base64').toString(encoding);
299300

300301
try {
301302
message.data = JSON.parse(message.data);
@@ -482,6 +483,7 @@ Subscription.prototype.delete = function(callback) {
482483
Subscription.prototype.pull = function(options, callback) {
483484
var self = this;
484485
var MAX_EVENTS_LIMIT = 1000;
486+
var encoding = this.encoding;
485487

486488
if (!callback) {
487489
callback = options;
@@ -509,7 +511,9 @@ Subscription.prototype.pull = function(options, callback) {
509511
}
510512

511513
var messages = arrify(response.receivedMessages)
512-
.map(Subscription.formatMessage_)
514+
.map(function(msg) {
515+
return Subscription.formatMessage_(msg, encoding);
516+
})
513517
.map(self.decorateMessage_.bind(self));
514518

515519
self.refreshPausedStatus_();

test/pubsub/subscription.js

+55-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ describe('Subscription', function() {
4747
};
4848
var message = 'howdy';
4949
var messageBuffer = new Buffer(message).toString('base64');
50+
var messageBinary = new Buffer(message).toString('binary');
5051
var messageObj = {
5152
receivedMessages: [{
5253
ackId: 3,
@@ -61,6 +62,11 @@ describe('Subscription', function() {
6162
data: message,
6263
id: 7
6364
};
65+
var expectedMessageAsBinary = {
66+
ackId: 3,
67+
data: messageBinary,
68+
id: 7
69+
};
6470

6571
before(function() {
6672
mockery.registerMock('../common/service-object.js', FakeServiceObject);
@@ -98,12 +104,14 @@ describe('Subscription', function() {
98104
name: SUB_NAME,
99105
autoAck: true,
100106
interval: 100,
101-
maxInProgress: 3
107+
maxInProgress: 3,
108+
encoding: 'binary'
102109
};
103110
var sub = new Subscription(PUBSUB, CONFIG);
104111
assert.strictEqual(sub.autoAck, CONFIG.autoAck);
105112
assert.strictEqual(sub.interval, CONFIG.interval);
106113
assert.strictEqual(sub.maxInProgress, 3);
114+
assert.strictEqual(sub.encoding, CONFIG.encoding);
107115
});
108116

109117
it('should be closed', function() {
@@ -134,6 +142,10 @@ describe('Subscription', function() {
134142
assert.strictEqual(subscription.paused, false);
135143
});
136144

145+
it('should default encoding to utf-8 if not specified', function() {
146+
assert.strictEqual(subscription.encoding, 'utf-8');
147+
});
148+
137149
it('should create an iam object', function() {
138150
assert.deepEqual(subscription.iam.calledWith_, [
139151
PUBSUB,
@@ -219,6 +231,12 @@ describe('Subscription', function() {
219231
var msg = Subscription.formatMessage_(messageObj.receivedMessages[0]);
220232
assert.deepEqual(msg, expectedMessage);
221233
});
234+
235+
it('should decode buffer to specified encoding', function() {
236+
var msg = Subscription
237+
.formatMessage_(messageObj.receivedMessages[0], 'binary');
238+
assert.deepEqual(msg, expectedMessageAsBinary);
239+
});
222240
});
223241

224242
describe('formatName_', function() {
@@ -473,6 +491,42 @@ describe('Subscription', function() {
473491
subscription.pull({}, assert.ifError);
474492
});
475493

494+
describe('with encoding option', function() {
495+
var formatMessageMemo, encodingMemo, onFormatMessage;
496+
beforeEach(function() {
497+
encodingMemo = subscription.encoding;
498+
formatMessageMemo = Subscription.formatMessage_;
499+
Subscription.formatMessage_ = function(msg, encoding) {
500+
onFormatMessage(msg, encoding);
501+
return formatMessageMemo(msg, encoding);
502+
};
503+
});
504+
afterEach(function() {
505+
Subscription.formatMessage_ = formatMessageMemo;
506+
subscription.encoding = encodingMemo;
507+
});
508+
509+
it('should call formatMessage_ with binary', function(done) {
510+
onFormatMessage = function(msg, encoding) {
511+
assert.equal(encoding, 'binary');
512+
done();
513+
};
514+
subscription.encoding = 'binary';
515+
516+
subscription.pull({}, assert.ifError);
517+
});
518+
519+
it('should call formatMessage_ with utf-8', function(done) {
520+
onFormatMessage = function(msg, encoding) {
521+
assert.equal(encoding, 'utf-8');
522+
done();
523+
};
524+
subscription.encoding = 'utf-8';
525+
526+
subscription.pull({}, assert.ifError);
527+
});
528+
});
529+
476530
describe('autoAck false', function() {
477531
beforeEach(function() {
478532
subscription.autoAck = false;

0 commit comments

Comments
 (0)