Skip to content

Commit fa31ec0

Browse files
authored
fix(bigquery/storage/managedwriter): resolve data races (#9360)
This PR adds a non-assertive test that helps expose data races by performing many concurrent write operations on a single ManagedStream instance. As a byproduct, it cleans up two possible races. In the first, a deferred function could read the connection's retained context after the lock had already been released; the deferred function now grabs a reference to the context while the lock is still held. In the second, the retry mechanism relies on math/rand, and retry processing could use the random number source concurrently; this PR adds a mutex to guard the source. Fixes: https://togithub.com/googleapis/google-cloud-go/issues/9301
1 parent b7bf165 commit fa31ec0

File tree

3 files changed

+96
-8
lines changed

3 files changed

+96
-8
lines changed

bigquery/storage/managedwriter/connection.go

+8-7
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
353353
return err
354354
}
355355

356-
var statsOnExit func()
356+
var statsOnExit func(ctx context.Context)
357357

358358
// critical section: Things that need to happen inside the critical section:
359359
//
@@ -362,9 +362,10 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
362362
// * add the pending write to the channel for the connection (ordering for the response)
363363
co.mu.Lock()
364364
defer func() {
365+
sCtx := co.ctx
365366
co.mu.Unlock()
366-
if statsOnExit != nil {
367-
statsOnExit()
367+
if statsOnExit != nil && sCtx != nil {
368+
statsOnExit(sCtx)
368369
}
369370
}()
370371

@@ -441,12 +442,12 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
441442
numRows = int64(len(pr.GetSerializedRows()))
442443
}
443444
}
444-
statsOnExit = func() {
445+
statsOnExit = func(ctx context.Context) {
445446
// these will get recorded once we exit the critical section.
446447
// TODO: resolve open questions around what labels should be attached (connection, streamID, etc)
447-
recordStat(co.ctx, AppendRequestRows, numRows)
448-
recordStat(co.ctx, AppendRequests, 1)
449-
recordStat(co.ctx, AppendRequestBytes, int64(pw.reqSize))
448+
recordStat(ctx, AppendRequestRows, numRows)
449+
recordStat(ctx, AppendRequests, 1)
450+
recordStat(ctx, AppendRequestBytes, int64(pw.reqSize))
450451
}
451452
ch <- pw
452453
return nil

bigquery/storage/managedwriter/managed_stream_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"errors"
2020
"io"
2121
"runtime"
22+
"sync"
2223
"testing"
2324
"time"
2425

@@ -643,3 +644,84 @@ func TestManagedStream_Closure(t *testing.T) {
643644
t.Errorf("expected writer ctx to be dead, is alive")
644645
}
645646
}
647+
648+
// This test exists to try to surface data races by sharing
649+
// a single writer with multiple goroutines. It doesn't assert
650+
// anything about the behavior of the system.
651+
func TestManagedStream_RaceFinder(t *testing.T) {
652+
ctx, cancel := context.WithCancel(context.Background())
653+
654+
var totalsMu sync.Mutex
655+
totalSends := 0
656+
totalRecvs := 0
657+
pool := &connectionPool{
658+
ctx: ctx,
659+
cancel: cancel,
660+
baseFlowController: newFlowController(0, 0),
661+
open: openTestArc(&testAppendRowsClient{},
662+
func(req *storagepb.AppendRowsRequest) error {
663+
totalsMu.Lock()
664+
totalSends = totalSends + 1
665+
curSends := totalSends
666+
totalsMu.Unlock()
667+
if curSends%25 == 0 {
668+
//time.Sleep(10 * time.Millisecond)
669+
return io.EOF
670+
}
671+
return nil
672+
},
673+
func() (*storagepb.AppendRowsResponse, error) {
674+
totalsMu.Lock()
675+
totalRecvs = totalRecvs + 1
676+
curRecvs := totalRecvs
677+
totalsMu.Unlock()
678+
if curRecvs%15 == 0 {
679+
return nil, io.EOF
680+
}
681+
return &storagepb.AppendRowsResponse{}, nil
682+
}),
683+
}
684+
router := newSimpleRouter("")
685+
if err := pool.activateRouter(router); err != nil {
686+
t.Errorf("activateRouter: %v", err)
687+
}
688+
689+
ms := &ManagedStream{
690+
id: "foo",
691+
streamSettings: defaultStreamSettings(),
692+
retry: newStatelessRetryer(),
693+
}
694+
ms.retry.maxAttempts = 4
695+
ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
696+
ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
697+
if err := pool.addWriter(ms); err != nil {
698+
t.Errorf("addWriter A: %v", err)
699+
}
700+
701+
if router.conn == nil {
702+
t.Errorf("expected non-nil connection")
703+
}
704+
705+
numWriters := 5
706+
numWrites := 50
707+
708+
var wg sync.WaitGroup
709+
wg.Add(numWriters)
710+
for i := 0; i < numWriters; i++ {
711+
go func() {
712+
for j := 0; j < numWrites; j++ {
713+
result, err := ms.AppendRows(ctx, [][]byte{[]byte("foo")})
714+
if err != nil {
715+
continue
716+
}
717+
_, err = result.GetResult(ctx)
718+
if err != nil {
719+
continue
720+
}
721+
}
722+
wg.Done()
723+
}()
724+
}
725+
wg.Wait()
726+
cancel()
727+
}

bigquery/storage/managedwriter/retry.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"io"
2020
"math/rand"
2121
"strings"
22+
"sync"
2223
"time"
2324

2425
"github.com/googleapis/gax-go/v2"
@@ -82,7 +83,9 @@ func (ur *unaryRetryer) Retry(err error) (time.Duration, bool) {
8283
// from the receive side of the bidi stream. An individual item in that process has a notion of an attempt
8384
// count, and we use maximum retries as a way of evicting bad items.
8485
type statelessRetryer struct {
85-
r *rand.Rand
86+
mu sync.Mutex // guards r
87+
r *rand.Rand
88+
8689
minBackoff time.Duration
8790
jitter time.Duration
8891
aggressiveFactor int
@@ -101,7 +104,9 @@ func newStatelessRetryer() *statelessRetryer {
101104
func (sr *statelessRetryer) pause(aggressiveBackoff bool) time.Duration {
102105
jitter := sr.jitter.Nanoseconds()
103106
if jitter > 0 {
107+
sr.mu.Lock()
104108
jitter = sr.r.Int63n(jitter)
109+
sr.mu.Unlock()
105110
}
106111
pause := sr.minBackoff.Nanoseconds() + jitter
107112
if aggressiveBackoff {

0 commit comments

Comments
 (0)