Skip to content

Commit a07bf1d

Browse files
authored
fix(bigquery/storage/managedwriter): fix flowcontrol refund on error (#9649)
Previously, connection's `lockingAppend` did not properly refund the connection's flow controller if the send response errored. This PR addresses that issue, and includes a test to ensure the correct behavior. Fixes: #9540
1 parent 1f7fb5c commit a07bf1d

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

bigquery/storage/managedwriter/connection.go

+2
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,8 @@ func (co *connection) lockingAppend(pw *pendingWrite) error {
419419
err = (*arc).Send(pw.constructFullRequest(true))
420420
}
421421
if err != nil {
422+
// Refund the flow controller immediately, as there's nothing to refund on the receiver.
423+
co.fc.release(pw.reqSize)
422424
if shouldReconnect(err) {
423425
metricCtx := co.ctx // start with the ctx that must be present
424426
if pw.writer != nil {

bigquery/storage/managedwriter/connection_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,58 @@ func TestConnection_OpenWithRetry(t *testing.T) {
103103
}
104104
}
105105

106+
// Ensure we properly refund the flow control during send failures.
107+
// https://github.com/googleapis/google-cloud-go/issues/9540
108+
func TestConnection_LockingAppendFlowRelease(t *testing.T) {
109+
ctx := context.Background()
110+
111+
pool := &connectionPool{
112+
ctx: ctx,
113+
baseFlowController: newFlowController(10, 0),
114+
open: openTestArc(&testAppendRowsClient{},
115+
func(req *storagepb.AppendRowsRequest) error {
116+
// Append always reports EOF on send.
117+
return io.EOF
118+
}, nil),
119+
}
120+
router := newSimpleRouter("")
121+
if err := pool.activateRouter(router); err != nil {
122+
t.Errorf("activateRouter: %v", err)
123+
}
124+
125+
writer := &ManagedStream{id: "foo", ctx: ctx}
126+
if err := pool.addWriter(writer); err != nil {
127+
t.Errorf("addWriter: %v", err)
128+
}
129+
130+
pw := newPendingWrite(ctx, writer, &storagepb.AppendRowsRequest{WriteStream: "somestream"}, newVersionedTemplate(), "", "")
131+
for i := 0; i < 5; i++ {
132+
conn, err := router.pool.selectConn(pw)
133+
if err != nil {
134+
t.Errorf("selectConn: %v", err)
135+
}
136+
137+
// Ensure FC is empty before lockingAppend
138+
if got := conn.fc.count(); got != 0 {
139+
t.Errorf("attempt %d expected empty flow count, got %d", i, got)
140+
}
141+
if got := conn.fc.bytes(); got != 0 {
142+
t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
143+
}
144+
// invoke lockingAppend, which fails
145+
if err := conn.lockingAppend(pw); err != io.EOF {
146+
t.Errorf("lockingAppend attempt %d: expected io.EOF, got %v", i, err)
147+
}
148+
// Ensure we're refunded due to failure
149+
if got := conn.fc.count(); got != 0 {
150+
t.Errorf("attempt %d expected empty flow count, got %d", i, got)
151+
}
152+
if got := conn.fc.bytes(); got != 0 {
153+
t.Errorf("attempt %d expected empty flow bytes, got %d", i, got)
154+
}
155+
}
156+
}
157+
106158
// Ensures we don't lose track of channels/connections during reconnects.
107159
// https://github.com/googleapis/google-cloud-go/issues/6766
108160
func TestConnection_LeakingReconnect(t *testing.T) {

0 commit comments

Comments
 (0)