deadlock in testStreamAckDeadlineUpdate #1577


Closed
pongad opened this issue Jan 30, 2017 · 3 comments

@pongad
Contributor

pongad commented Jan 30, 2017

@garrettjonesgoogle @davidtorres @kir-titievsky

I tracked down another deadlock, but this one I cannot fix without changing the surface.

In the happy case, the test

  • receives a message and creates a future to respond
  • advances time by 20s
  • sets the future result to ACK
    • this updates the latency distribution
    • the next time we configure the stream latency, we'll set it to 20s
  • advances time by 60s
    • we configure stream latency every minute
    • so this tells the server to change our stream latency to 20s
  • YAY!

The mechanism we use to update the latency is to add a callback to the ListenableFuture returned by MessageReceiver::receiveMessage. The idea is that once the user finishes the work, the latency distribution is automatically updated.
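Roughly, the subscriber side looks something like this (an illustrative sketch only; clock, receiver, and latencyDistribution are placeholder names, not the actual internals):

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

// Subscriber side: hand the message to the user, then attach the
// latency-recording callback to the returned future.
final long receivedAtMillis = clock.millis();
ListenableFuture<AckReply> reply = receiver.receiveMessage(message);
Futures.addCallback(reply, new FutureCallback<AckReply>() {
  @Override
  public void onSuccess(AckReply ack) {
    // Records how long the user took; this feeds the next stream
    // ack-deadline update. If the future completed before this
    // callback was attached, the recorded latency is inflated.
    latencyDistribution.record(clock.millis() - receivedAtMillis);
  }

  @Override
  public void onFailure(Throwable t) {
    // illustrative: treat failures like a NACK
  }
}, MoreExecutors.directExecutor());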

In the bad case:

  • the gRPC thread calls MessageReceiver::receiveMessage
    • the test implementation creates a future and puts it in a queue to be retrieved by the test code
  • the test advances time by 20s
  • the test thread takes the future above out of the queue and sets the future result
  • the test advances time by 60s
  • then, the gRPC thread finally adds the callback (let's call this step S1)
    • since the future is already completed, we update the latency distribution to 80s
  • the minute for configuring stream latency has also passed (let's call this step S2)
    • change stream latency to 80s

In an even worse case, S1 and S2 can switch order. Since there are no entries in the distribution, we don't update the server with anything. The latency on the server stays at 10s (the default).

The test code expects the latency value on the server to eventually be set to 20s. In either bad case, the test deadlocks.

I don't think this test can be fixed with the current surface. Since the Future is returned by MessageReceiver, it stands to reason that other threads can do arbitrary things to the future before the callback can be added. Note that this problem affects production code as well: the synchronization is still technically incorrect, though the consequence is small enough that it probably will never matter.

I can think of a couple of ways to fix it:
Current

// Definition
ListenableFuture<AckReply> receiveMessage(PubsubMessage message);

// Sample implementation (executor is a ListeningExecutorService)
ListenableFuture<AckReply> receiveMessage(final PubsubMessage message) {
  return executor.submit(new Callable<AckReply>() {
    @Override
    public AckReply call() {
      if (someLongComputation(message)) {
        return AckReply.ACK;
      }
      return AckReply.NACK;
    }
  });
}

Option 1

// Definition
void receiveMessage(PubsubMessage message, SettableFuture<AckReply> response);

// Sample implementation (executor is a ListeningExecutorService)
void receiveMessage(final PubsubMessage message, final SettableFuture<AckReply> response) {
  ListenableFuture<AckReply> future = executor.submit(new Callable<AckReply>() {
    @Override
    public AckReply call() {
      if (someLongComputation(message)) {
        return AckReply.ACK;
      }
      return AckReply.NACK;
    }
  });
  response.setFuture(future);
}

This way, we create the future for the user and can make sure the callback is attached before the user can do anything with it. Even if the user's work takes a long time, job scheduling is still left to the user: e.g., if the user wants to process many messages in a thread pool, the user must create the thread pool themselves.
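For illustration, the library side under option 1 could look roughly like this (same placeholder names as above, not the actual implementation); the callback is attached before the user ever sees the future, so no completion can race ahead of it:

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

// Library side: create the future, attach the callback, then hand it out.
final long receivedAtMillis = clock.millis();
SettableFuture<AckReply> response = SettableFuture.create();
Futures.addCallback(response, new FutureCallback<AckReply>() {
  @Override
  public void onSuccess(AckReply ack) {
    latencyDistribution.record(clock.millis() - receivedAtMillis);
  }

  @Override
  public void onFailure(Throwable t) {
    // illustrative: treat failures like a NACK
  }
}, MoreExecutors.directExecutor());
receiver.receiveMessage(message, response);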

Option 2

// Definition
AckReply receiveMessage(PubsubMessage message);
// This is essentially the same as Function<PubsubMessage, AckReply>

// Sample implementation
AckReply receiveMessage(PubsubMessage message) {
  return someLongComputation(message);
}

We call the function from a thread pool ourselves, so the function must be thread safe. We can increase the number of threads in the pool to cope with extra load, and the user can always set the number of threads to suit their workload.
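A rough sketch of the library side under option 2 (the thread-pool size and the clock/latencyDistribution names are placeholders):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Library side: the subscriber owns the pool and measures latency
// around the synchronous call.
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
pool.submit(new Runnable() {
  @Override
  public void run() {
    final long start = clock.millis();
    AckReply reply = receiver.receiveMessage(message); // must be thread safe
    latencyDistribution.record(clock.millis() - start);
    // ... send the ack/nack back on the stream ...
  }
});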

@pongad pongad self-assigned this Jan 30, 2017
@garrettjonesgoogle
Member

@pongad I think it would help the evaluation of these options if you showed what the user code would need to look like to implement them.

@davidtorres

I agree with your assessment. Unfortunately, even forcing the test to wait until the expected messages have been received (by using TestReceiver.waitForExpectedMessages) does not guarantee that the callback has been attached, hence the race; thanks for catching this. And as you say, this race is much less important in production, since in the next cycle (60s) the correct latency will still be reported. I wish we could control the gRPC threads, but I couldn't find a way to swap their executor myself.

I much prefer option 1, because option 2 forces synchronous execution.

Though the sample code could be simplified as follows (no need to synthesize another future):

void receiveMessage(final PubsubMessage message, final SettableFuture<AckReply> response) {
  executor.submit(new Runnable() {
    @Override
    public void run() {
      if (someLongComputation(message)) {
        response.set(AckReply.ACK);
      } else {
        response.set(AckReply.NACK);
      }
    }
  });
}

pongad added a commit that referenced this issue Feb 2, 2017
* fix deadlock in testStreamAckDeadlineUpdate

Fixes #1577. The detailed description of the bug is available in the
issue.

This commit fixes the bug by ensuring that the Future received by
MessageReceiver cannot be completed before a callback is added to it.
@pongad
Contributor Author

pongad commented Feb 2, 2017

Closed by #1581

@pongad pongad closed this as completed Feb 2, 2017