
Commit b29912f

feat(bigquery/storage/managedwriter): graceful connection drains (#11463)
This PR enables a more graceful shutdown when we're doing client-initiated reconnects, typically in response to things like schema changes. Rather than immediately cancelling the previous connection context, we now use a goroutine to defer that cancellation by a fixed interval, which gives our recv goroutine more time to process any incoming notifications from the service.

This can yield a net reduction in duplicate rows in some scenarios, particularly when EnableWriteRetries is enabled and we see sequential disconnects (e.g. a multiplexed connection observing a number of schema updates). It does, however, mean a possible short-term increase in resource usage, as we may now retain goroutines/channels/connections for a longer interval during these reconnect events. This PR updates the tests that look for leaking resources to reflect that new behavior.
1 parent d7ee725 commit b29912f
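
To make the mechanism concrete, here is a minimal, self-contained Go sketch of the deferred-cancellation idea described above. It is not the managedwriter implementation; names such as drainGrace and recvLoop are hypothetical, and the grace period is shortened so the example finishes quickly. The actual change is in the connection.go diff below.

package main

import (
    "context"
    "fmt"
    "time"
)

// drainGrace is a stand-in for gracefulReconnectDuration, shortened so the
// example runs quickly.
const drainGrace = 500 * time.Millisecond

// recvLoop mimics the connection's recv goroutine: it keeps processing
// incoming notifications until its context is cancelled.
func recvLoop(ctx context.Context, in <-chan string) {
    for {
        select {
        case msg := <-in:
            fmt.Println("processed:", msg)
        case <-ctx.Done():
            fmt.Println("receiver shut down")
            return
        }
    }
}

func main() {
    oldCtx, oldCancel := context.WithCancel(context.Background())
    in := make(chan string, 10)
    go recvLoop(oldCtx, in)

    // A "reconnect": instead of cancelling the old context immediately,
    // defer the cancellation so in-flight notifications can still drain.
    go func() {
        time.Sleep(drainGrace)
        oldCancel()
    }()

    // The replacement context is created right away; the old receiver keeps
    // draining in the background until the grace period elapses.
    newCtx, newCancel := context.WithCancel(context.Background())
    defer newCancel()
    _ = newCtx

    in <- "late notification"
    time.Sleep(2 * drainGrace) // wait long enough to see the old receiver exit
}

The trade-off mirrors the one called out in the commit message: the old receiver (and whatever it references) stays alive for the grace window, so resource usage can be temporarily higher after a reconnect.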

2 files changed (+25 -3 lines)
bigquery/storage/managedwriter/connection.go (+9 -1)

@@ -20,6 +20,7 @@ import (
     "fmt"
     "io"
     "sync"
+    "time"
 
     "cloud.google.com/go/bigquery/storage/apiv1/storagepb"
     "github.com/googleapis/gax-go/v2"
@@ -36,6 +37,8 @@ const (
 
 var (
     errNoRouterForPool = errors.New("no router for connection pool")
+    // TODO(https://github.com/googleapis/google-cloud-go/issues/11460): revisit if this should be user configurable
+    gracefulReconnectDuration = 20 * time.Second
 )
 
 // connectionPool represents a pooled set of connections.
@@ -489,7 +492,12 @@ func (co *connection) getStream(arc *storagepb.BigQueryWrite_AppendRowsClient, f
         close(co.pending)
     }
     if co.cancel != nil {
-        co.cancel()
+        // Delay cancellation to give queued writes a chance to drain normally.
+        oldCancel := co.cancel
+        go func() {
+            time.Sleep(gracefulReconnectDuration)
+            oldCancel()
+        }()
         co.ctx, co.cancel = context.WithCancel(co.pool.ctx)
     }

bigquery/storage/managedwriter/managed_stream_test.go (+16 -2)

@@ -519,11 +519,13 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
         []byte("foo"),
     }
 
+    appendCount := 15
+    // Give a small budget for additional goroutines to account for jitter.
     threshold := runtime.NumGoroutine() + 5
 
     // Send a bunch of appends that will trigger reconnects and monitor that
     // goroutine growth stays within bounded threshold.
-    for i := 0; i < 30; i++ {
+    for i := 0; i < appendCount; i++ {
         writeCtx := context.Background()
         r, err := ms.AppendRows(writeCtx, fakeData)
         if err != nil {
@@ -539,12 +541,24 @@ func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
         if testArc.openCount != i+2 {
             t.Errorf("should trigger a reconnect, but found openCount %d", testArc.openCount)
         }
-        if i%10 == 0 {
+        // Bump our threshold each append. We add 2 goroutines each reconnect due to a new processor
+        // and a graceful cancellation, but this should recover over time.
+        threshold = threshold + 2
+
+        if i%5 == 0 {
             if current := runtime.NumGoroutine(); current > threshold {
                 t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
             }
         }
     }
+    // Verify goroutine count drops after graceful shutdowns should have concluded.
+    time.Sleep(gracefulReconnectDuration)
+    threshold = threshold - (2 * appendCount)
+
+    if current := runtime.NumGoroutine(); current > threshold {
+        t.Errorf("potential goroutine leak after recovery: current %d, threshold %d", current, threshold)
+    }
+
 }
 
 func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
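
As an aside on the test change: the updated test uses a fixed time.Sleep(gracefulReconnectDuration) plus threshold arithmetic to confirm that goroutine counts recover. A common alternative is to poll runtime.NumGoroutine until it settles or a deadline passes, which tends to be less timing-sensitive. The helper below is a hypothetical sketch of that pattern; waitForGoroutines is not part of managedwriter.

package example

import (
    "runtime"
    "testing"
    "time"
)

// waitForGoroutines polls the goroutine count until it drops to at most max,
// or fails the test once the deadline passes. Hypothetical helper shown for
// illustration only; not part of the managedwriter package.
func waitForGoroutines(t *testing.T, max int, deadline time.Duration) {
    t.Helper()
    start := time.Now()
    for {
        current := runtime.NumGoroutine()
        if current <= max {
            return
        }
        if time.Since(start) > deadline {
            t.Fatalf("goroutine count did not settle below %d within %v (current %d)", max, deadline, current)
        }
        time.Sleep(50 * time.Millisecond)
    }
}

Under that assumption, the recovery check could call waitForGoroutines(t, threshold, 2*gracefulReconnectDuration) instead of sleeping for the full grace period.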
