Skip to content

pubsub: api redesign #2380

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 67 commits into from
Aug 24, 2017
Merged

pubsub: api redesign #2380

merged 67 commits into from
Aug 24, 2017

Conversation

callmehiphop
Copy link
Contributor

@callmehiphop callmehiphop commented Jun 12, 2017

docs demo

TODO:

  • Unit Tests
  • StreamingPull System Tests
  • Docs
  • README

@callmehiphop callmehiphop added api: pubsub Issues related to the Pub/Sub API. status: do not merge labels Jun 12, 2017
@googlebot googlebot added the cla: yes This human has signed the Contributor License Agreement. label Jun 12, 2017
Copy link
Contributor

@lukesneeringer lukesneeringer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good overall; a couple concerns with what is there so far.

*
*/
Message.prototype.nack = function() {

This comment was marked as spam.

}
});

this.settings = extend(true, {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

*/
Publisher.prototype.publish = function(data, attrs, callback) {
if (!(data instanceof Buffer)) {
throw new Error('Data must be in the form of a Buffer.');

This comment was marked as spam.

})
};

this.inventory.inFlight = messages;

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -0,0 +1,220 @@
/*!
* Copyright 2014 Google Inc. All Rights Reserved.

This comment was marked as spam.

var prop = require('propprop');

/**
* Semi-generic Batch object used to queue up multiple requests while limiting

This comment was marked as spam.

This comment was marked as spam.

* request.
* @param {number} options.maxMilliseconds - Max time to wait before sending
* data.
* @param {number} options.maxRequests - Max number of concurrent requests

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* var messages = batch.getNextMessageBatch();
* batch.send(messages);
*/
Batch.prototype.getNextMessageBatch = function() {

This comment was marked as spam.

callback: callback
});

var reachedMaxMessages = this.inventory.queued.size >= this.maxMessages;

This comment was marked as spam.

this.intervalHandle_ = setInterval(function() {
self.send(self.getNextMessageBatch());

if (self.inventory.queued.size === 0) {

This comment was marked as spam.

var message;

while (message = queued.next().value) {
var isOverSizeLimit = (size + message.size) > this.maxBytes;

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* @example
* batch.beginSending();
*/
Batch.prototype.beginSending = function() {

This comment was marked as spam.

Batch.prototype.send = function(messages) {
if (this.activeRequests_ >= this.maxRequests) {
this.scheduleSend();
return;

This comment was marked as spam.

ackDeadlineSeconds: seconds
};

this.subscription.request({

This comment was marked as spam.

This comment was marked as spam.

@@ -0,0 +1,98 @@
/*!
* Copyright 2014 Google Inc. All Rights Reserved.

This comment was marked as spam.

This comment was marked as spam.

@@ -0,0 +1,110 @@
/*!
* Copyright 2014 Google Inc. All Rights Reserved.

This comment was marked as spam.

this.api = topic.api;

var batchOptions = extend(options.batching, {
send: this.publish_.bind(this)

This comment was marked as spam.

This comment was marked as spam.

// times out around 90 seconds. Allow an extra couple of seconds to give the
// API a chance to respond on its own before terminating the connection.
this.timeout = PUBSUB_API_TIMEOUT + 2000;
if (options.topic) {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

commonGrpc.Service.call(this, config, options);
this.api = {};
this.auth = googleAuth(this.options);
this.projectId = this.options.projectId || '{{projectId}}';

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

* used when converting a message's data to a string. (default: 'utf-8')
* @param {number|date} options.messageRetentionDuration - Set this to override
* the default duration of 7 days. This value is expected in seconds.
* Acceptable values are in the range of 10 minutes and 7 days.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

/**
* This should never be called directly.
*/
Publisher.prototype.publish_ = function(messages, done) {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

gaxOpts: gaxOpts
}, function(err, resp) {
if (!err) {
self.closeConnection_();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

if (err) {
callback(err, resp);
self.emit('error', err);
return;

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

self.emit('error', err);
});

self.connection.on('data', function(data) {

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@@ -678,63 +560,40 @@ Subscription.prototype.delete = function(callback) {
*/
Subscription.prototype.pull = function(options, callback) {

This comment was marked as spam.


callback(null, resp);
self.connection.on('error', function(err) {
var retryCodes = [2, 4, 14];

This comment was marked as spam.


setTimeout(self.startPulling_.bind(self), self.interval);
});
return this.pubsub.snapshot.call(this, name);
};

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

@jganetsk
Copy link

fyi, gRPC apparently has a default receiver limit on message size of 4 MB, but we need to set the limit to 20 MB + 1 byte.

@coveralls
Copy link

Coverage Status

Coverage remained the same at 100.0% when pulling f758d07 on callmehiphop:dg--pubsub-redesign into f243e26 on GoogleCloudPlatform:master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the Pub/Sub API. cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants