-
Notifications
You must be signed in to change notification settings - Fork 616
pubsub: api redesign #2380
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: api redesign #2380
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good overall; a couple concerns with what is there so far.
packages/pubsub/src/message.js
Outdated
* | ||
*/ | ||
Message.prototype.nack = function() { | ||
|
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
} | ||
}); | ||
|
||
this.settings = extend(true, { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
*/ | ||
Publisher.prototype.publish = function(data, attrs, callback) { | ||
if (!(data instanceof Buffer)) { | ||
throw new Error('Data must be in the form of a Buffer.'); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
}) | ||
}; | ||
|
||
this.inventory.inFlight = messages; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
@@ -0,0 +1,220 @@ | |||
/*! | |||
* Copyright 2014 Google Inc. All Rights Reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
var prop = require('propprop'); | ||
|
||
/** | ||
* Semi-generic Batch object used to queue up multiple requests while limiting |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
* request. | ||
* @param {number} options.maxMilliseconds - Max time to wait before sending | ||
* data. | ||
* @param {number} options.maxRequests - Max number of concurrent requests |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
* var messages = batch.getNextMessageBatch(); | ||
* batch.send(messages); | ||
*/ | ||
Batch.prototype.getNextMessageBatch = function() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
callback: callback | ||
}); | ||
|
||
var reachedMaxMessages = this.inventory.queued.size >= this.maxMessages; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
this.intervalHandle_ = setInterval(function() { | ||
self.send(self.getNextMessageBatch()); | ||
|
||
if (self.inventory.queued.size === 0) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
var message; | ||
|
||
while (message = queued.next().value) { | ||
var isOverSizeLimit = (size + message.size) > this.maxBytes; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
* @example | ||
* batch.beginSending(); | ||
*/ | ||
Batch.prototype.beginSending = function() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/batch.js
Outdated
Batch.prototype.send = function(messages) { | ||
if (this.activeRequests_ >= this.maxRequests) { | ||
this.scheduleSend(); | ||
return; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/message.js
Outdated
ackDeadlineSeconds: seconds | ||
}; | ||
|
||
this.subscription.request({ |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/message.js
Outdated
@@ -0,0 +1,98 @@ | |||
/*! | |||
* Copyright 2014 Google Inc. All Rights Reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
@@ -0,0 +1,110 @@ | |||
/*! | |||
* Copyright 2014 Google Inc. All Rights Reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
this.api = topic.api; | ||
|
||
var batchOptions = extend(options.batching, { | ||
send: this.publish_.bind(this) |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
// times out around 90 seconds. Allow an extra couple of seconds to give the | ||
// API a chance to respond on its own before terminating the connection. | ||
this.timeout = PUBSUB_API_TIMEOUT + 2000; | ||
if (options.topic) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
commonGrpc.Service.call(this, config, options); | ||
this.api = {}; | ||
this.auth = googleAuth(this.options); | ||
this.projectId = this.options.projectId || '{{projectId}}'; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/index.js
Outdated
* used when converting a message's data to a string. (default: 'utf-8') | ||
* @param {number|date} options.messageRetentionDuration - Set this to override | ||
* the default duration of 7 days. This value is expected in seconds. | ||
* Acceptable values are in the range of 10 minutes and 7 days. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/publisher.js
Outdated
/** | ||
* This should never be called directly. | ||
*/ | ||
Publisher.prototype.publish_ = function(messages, done) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
gaxOpts: gaxOpts | ||
}, function(err, resp) { | ||
if (!err) { | ||
self.closeConnection_(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
if (err) { | ||
callback(err, resp); | ||
self.emit('error', err); | ||
return; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
self.emit('error', err); | ||
}); | ||
|
||
self.connection.on('data', function(data) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
@@ -678,63 +560,40 @@ Subscription.prototype.delete = function(callback) { | |||
*/ | |||
Subscription.prototype.pull = function(options, callback) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
|
||
callback(null, resp); | ||
self.connection.on('error', function(err) { | ||
var retryCodes = [2, 4, 14]; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
packages/pubsub/src/subscription.js
Outdated
|
||
setTimeout(self.startPulling_.bind(self), self.interval); | ||
}); | ||
return this.pubsub.snapshot.call(this, name); | ||
}; | ||
|
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
fyi, gRPC apparently has a default receiver limit on message size of 4 MB, but we need to set the limit to 20 MB + 1 byte. |
docs demo
TODO: