Commit 48a9258

fix(bigquery/storage/managedwriter): retry improvements (#9642)
This PR makes two changes to retry behavior in managedwriter. First, it expands the set of conditions that trigger a reconnect when sending the initial request to the backend. Second, it adds additional handling for context cancellations when reading responses back from the service. In cases like reconnection we establish a new Connection, each of which has its own associated context. When draining remaining writes from a connection that is being shut down, we now pass the write into a retryer with a status-based error rather than a raw context.Canceled, so we can recover more cleanly if the user is leveraging write retries. Related internal issue: b/326242484
1 parent a7abf56 commit 48a9258
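
For context, here is a minimal sketch (not part of this commit) of how a caller might opt in to the write retries that this change makes safer across reconnects. The project, dataset, and table identifiers are placeholders.

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
    ctx := context.Background()

    client, err := managedwriter.NewClient(ctx, "my-project")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // With write retries enabled, appends drained off a connection that is being
    // torn down during reconnection can be re-enqueued by the retryer instead of
    // failing with a bare context.Canceled.
    ms, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.EnableWriteRetries(true),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer ms.Close()

    // AppendRows calls with serialized row data and a schema descriptor would follow here.
}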

File tree

3 files changed: +56 -14 lines changed

bigquery/storage/managedwriter/connection.go

+21 -6
@@ -498,24 +498,39 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
 // enables testing
 type streamClientFunc func(context.Context, ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error)

+var errConnectionCanceled = grpcstatus.Error(codes.Canceled, "client connection context was canceled")
+
 // connRecvProcessor is used to propagate append responses back up with the originating write requests. It
 // It runs as a goroutine. A connection object allows for reconnection, and each reconnection establishes a new
-// processing gorouting and backing channel.
+// context, processing goroutine and backing channel.
 func connRecvProcessor(ctx context.Context, co *connection, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
     for {
         select {
         case <-ctx.Done():
-            // Context is done, so we're not going to get further updates. Mark all work left in the channel
-            // with the context error. We don't attempt to re-enqueue in this case.
+            // Channel context is done, which means we're not getting further updates on in flight appends and should
+            // process everything left in the existing channel/connection.
+            doneErr := ctx.Err()
+            if doneErr == context.Canceled {
+                // This is a special case. Connection recovery ends up cancelling a context as part of a reconnection, and with
+                // request retrying enabled we can possibly re-enqueue writes. To allow graceful retry for this behavior, we
+                // we translate this to an rpc status error to avoid doing things like introducing context errors as part of the retry predicate.
+                //
+                // The tradeoff here is that write retries may roundtrip multiple times for something like a pool shutdown, even though the final
+                // outcome would result in an error.
+                doneErr = errConnectionCanceled
+            }
             for {
                 pw, ok := <-ch
                 if !ok {
                     return
                 }
-                // It's unlikely this connection will recover here, but for correctness keep the flow controller
-                // state correct by releasing.
+                // This connection will not recover, but still attempt to keep flow controller state consistent.
                 co.release(pw)
-                pw.markDone(nil, ctx.Err())
+
+                // TODO: Determine if/how we should report this case, as we have no viable context for propagating.
+
+                // Because we can't tell locally if this write is done, we pass it back to the retrier for possible re-enqueue.
+                pw.writer.processRetry(pw, co, nil, doneErr)
             }
         case nextWrite, ok := <-ch:
             if !ok {
bigquery/storage/managedwriter/retry.go

+16 -6
@@ -130,13 +130,23 @@ func (sr *statelessRetryer) Retry(err error, attemptCount int) (time.Duration, b
 // our bidi stream to close/reopen based on the responses error. Errors here signal that no
 // further appends will succeed.
 func shouldReconnect(err error) bool {
-    var knownErrors = []error{
-        io.EOF,
-        status.Error(codes.Unavailable, "the connection is draining"), // errStreamDrain in gRPC transport
+
+    // io.EOF is the typical not connected signal.
+    if errors.Is(err, io.EOF) {
+        return true
+    }
+    // Backend responses that trigger reconnection on send.
+    reconnectCodes := []codes.Code{
+        codes.Aborted,
+        codes.Canceled,
+        codes.Unavailable,
+        codes.DeadlineExceeded,
     }
-    for _, ke := range knownErrors {
-        if errors.Is(err, ke) {
-            return true
+    if s, ok := status.FromError(err); ok {
+        for _, c := range reconnectCodes {
+            if s.Code() == c {
+                return true
+            }
         }
     }
     return false
bigquery/storage/managedwriter/retry_test.go

+19 -2
@@ -15,6 +15,7 @@
 package managedwriter

 import (
+    "context"
     "fmt"
     "io"
     "testing"
@@ -60,6 +61,10 @@ func TestManagedStream_AppendErrorRetries(t *testing.T) {
             err:  status.Error(codes.ResourceExhausted, "Exceeds 'AppendRows throughput' quota for some reason"),
             want: true,
         },
+        {
+            err:  context.Canceled,
+            want: false,
+        },
     }

     retry := newStatelessRetryer()
@@ -86,11 +91,23 @@ func TestManagedStream_ShouldReconnect(t *testing.T) {
             want: true,
         },
         {
-            err:  status.Error(codes.Unavailable, "nope"),
+            err:  status.Error(codes.Unavailable, "the connection is draining"),
+            want: true,
+        },
+        {
+            err:  status.Error(codes.ResourceExhausted, "oof"), // may just be pushback
             want: false,
         },
         {
-            err:  status.Error(codes.Unavailable, "the connection is draining"),
+            err:  status.Error(codes.Canceled, "blah"),
+            want: true,
+        },
+        {
+            err:  status.Error(codes.Aborted, "connection has been idle too long"),
+            want: true,
+        },
+        {
+            err:  status.Error(codes.DeadlineExceeded, "blah"), // possibly bad backend, reconnect to speed recovery.
             want: true,
         },
         {