-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Pubsub hp #1483
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Pubsub hp #1483
Conversation
We found a Contributor License Agreement for you (the sender of this pull request), but were unable to find agreements for the commit author(s). If you authored these, maybe you used a different email address in the git commits than was used to sign the CLA (login here to double check)? If these were authored by someone else, then they will need to sign a CLA as well, and confirm that they're okay with these being contributed to Google. |
@pongad please take a look, this adds support for falling back to pull and ack when no streaming pull support and fixes the flakiness on the tests |
Changes Unknown when pulling 68d79ad on davidtorres:pubsub-hp into ** on GoogleCloudPlatform:pubsub-hp**. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Travis and AppVeyor seems to still be flaking but we are moving in the right direction. Thank you for working on this! I have a few comments, mostly stylistic.
@@ -142,6 +99,9 @@ | |||
void extendExpiration() { | |||
expiration = new Instant(clock.millis()).plus(Duration.standardSeconds(nextExtensionSeconds)); | |||
nextExtensionSeconds = 2 * nextExtensionSeconds; | |||
if (nextExtensionSeconds > MAX_ACK_DEADLINE_EXTENSION_SECS) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
Instant now = new Instant(clock.millis()); | ||
int totalByteCount = 0; | ||
final List<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size()); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
List<List<String>> ackChunks = Lists.partition(acksToSend, MAX_PER_REQUEST_CHANGES); | ||
Iterator<List<String>> ackChunksIt = ackChunks.iterator(); | ||
while (ackChunksIt.hasNext()) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
clock); | ||
} | ||
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numChannels); | ||
pollingSubscriberConnections = new ArrayList<PollingSubscriberConnection>(numChannels); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
ImmutableList<String> receivedAcksCopy = ImmutableList.copyOf(acks); | ||
acks.clear(); | ||
List<String> receivedAcksCopy = ImmutableList.copyOf(acks.subList(0, expectedCount)); | ||
acks.removeAll(receivedAcksCopy); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
modAckDeadlines.clear(); | ||
List<ModifyAckDeadline> modAckDeadlinesCopy = | ||
ImmutableList.copyOf(modAckDeadlines.subList(0, expectedCount)); | ||
modAckDeadlines.removeAll(modAckDeadlinesCopy); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@davidtorres Could you change the commit email? The one you're using, while undoubtedly yours, seems to be running into cla troubles. |
Also cc @garrettjonesgoogle |
68d79ad
to
0d82897
Compare
CLAs look good, thanks! |
@pongad comments addressed please take a look |
@pongad Not sure about the flakyness, can you point me to where you see this? |
Changes Unknown when pulling 0d82897 on davidtorres:pubsub-hp into ** on GoogleCloudPlatform:pubsub-hp**. |
final StreamObserver<StreamingPullResponse> responseObserver) { | ||
final Stream stream = new Stream(); | ||
stream.requestObserver = | ||
new StreamObserver<StreamingPullRequest>() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@Override | ||
void initialize() { | ||
final SettableFuture<Void> errorFuture = SettableFuture.create(); | ||
final ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> responseObserver = |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* Abstract base implementation class of a subscriber connection in charge of receiving subscription | ||
* messages. | ||
*/ | ||
abstract class AbstractSubscriberConnection extends AbstractService { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@davidtorres I'll approve the change from my end, please also address Garrett's comments |
@davidtorres The flake I thought I saw seems to have disappeared 😄 |
…ss to use containment in favor of inheritance
Changes Unknown when pulling 6ef9df7 on davidtorres:pubsub-hp into ** on GoogleCloudPlatform:pubsub-hp**. |
} | ||
|
||
@Override | ||
protected void doStop() { | ||
protected void stop() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* Abstract base implementation class of a subscriber connection in charge of receiving subscription | ||
* messages. | ||
*/ | ||
abstract class AbstractSubscriberConnection extends AbstractService { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
LGTM, I don't want to block on more refactoring right now. |
Changes Unknown when pulling f8c8dc6 on davidtorres:pubsub-hp into ** on GoogleCloudPlatform:pubsub-hp**. |
No description provided.