Commit 9018b83

refactor: simplify options, instrumentation for multiplexing (#7257)
This PR consolidates where options apply in managedwriter by moving destinationTable inside streamSettings. It also changes how instrumentation works. Currently, we retain a long-lived context on the ManagedStream that is used both for connection management and for attaching tag context to instrumentation reporting (e.g. to tag metrics with the appropriate keys). With this change, connection-oriented metrics are no longer tagged with the writer tags (stream ID, data origin), and the stream's keys are removed from the default views the package exposes for these metrics. When we cut over to connections and pools, the context retained on the ManagedStream will only be used for metrics tagging, and thus will come from the parent context set up elsewhere (namely the pool context).
Towards: https://togithub.com/googleapis/google-cloud-go/issues/7103
1 parent edf3c24 commit 9018b83
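
For orientation, here is a minimal usage sketch of the per-writer options this change consolidates. The project, dataset, and table identifiers are placeholders, and a real writer would also need a schema descriptor (e.g. via WithSchemaDescriptor) before appending rows; treat this as a sketch of the option surface, not a complete program.

package main

import (
    "context"
    "log"

    "cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
    ctx := context.Background()

    client, err := managedwriter.NewClient(ctx, "my-project") // placeholder project ID
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // destinationTable and dataOrigin are writer-level settings; with this change
    // they are retained inside the writer's streamSettings rather than as a
    // separate field on the ManagedStream.
    ms, err := client.NewManagedStream(ctx,
        managedwriter.WithDestinationTable("projects/my-project/datasets/my_dataset/tables/my_table"),
        managedwriter.WithType(managedwriter.DefaultStream),
        managedwriter.WithDataOrigin("example-pipeline"),
    )
    if err != nil {
        log.Fatal(err)
    }
    defer ms.Close()

    log.Printf("created writer for stream %q", ms.StreamName())
}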

File tree

6 files changed, +87 -43 lines changed

bigquery/storage/managedwriter/client.go

+5-7
@@ -131,11 +131,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
 
     if ms.streamSettings.streamID == "" {
         // not instantiated with a stream, construct one.
-        streamName := fmt.Sprintf("%s/streams/_default", ms.destinationTable)
+        streamName := fmt.Sprintf("%s/streams/_default", ms.streamSettings.destinationTable)
         if ms.streamSettings.streamType != DefaultStream {
             // For everything but a default stream, we create a new stream on behalf of the user.
             req := &storagepb.CreateWriteStreamRequest{
-                Parent: ms.destinationTable,
+                Parent: ms.streamSettings.destinationTable,
                 WriteStream: &storagepb.WriteStream{
                     Type: streamTypeToEnum(ms.streamSettings.streamType),
                 }}
@@ -149,13 +149,11 @@ func (c *Client) buildManagedStream(ctx context.Context, streamFunc streamClient
         }
     }
     if ms.streamSettings != nil {
-        if ms.ctx != nil {
-            ms.ctx = keyContextWithTags(ms.ctx, ms.streamSettings.streamID, ms.streamSettings.dataOrigin)
-        }
         ms.fc = newFlowController(ms.streamSettings.MaxInflightRequests, ms.streamSettings.MaxInflightBytes)
     } else {
         ms.fc = newFlowController(0, 0)
     }
+    ms.ctx = setupWriterStatContext(ms)
     return ms, nil
 }
 
@@ -173,9 +171,9 @@ func (c *Client) validateOptions(ctx context.Context, ms *ManagedStream) error {
         }
         // update type and destination based on stream metadata
         ms.streamSettings.streamType = StreamType(info.Type.String())
-        ms.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
+        ms.streamSettings.destinationTable = TableParentFromStreamName(ms.streamSettings.streamID)
     }
-    if ms.destinationTable == "" {
+    if ms.streamSettings.destinationTable == "" {
         return fmt.Errorf("no destination table specified")
     }
     // we could auto-select DEFAULT here, but let's force users to be specific for now.
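
The destination-table call sites above rely on a simple naming convention: the default stream for a table is addressed as "<table>/streams/_default", and the exported TableParentFromStreamName helper recovers the table path from a stream name. A small illustrative sketch, with a placeholder table path:

package main

import (
    "fmt"

    "cloud.google.com/go/bigquery/storage/managedwriter"
)

func main() {
    // Placeholder table path in the form projects/{project}/datasets/{dataset}/tables/{table}.
    table := "projects/my-project/datasets/my_dataset/tables/my_table"

    // Same construction buildManagedStream uses for a default stream name.
    defaultStream := fmt.Sprintf("%s/streams/_default", table)
    fmt.Println(defaultStream)

    // The reverse direction, as validateOptions does when it backfills
    // streamSettings.destinationTable from an explicit stream ID.
    fmt.Println(managedwriter.TableParentFromStreamName(defaultStream))
}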

bigquery/storage/managedwriter/instrumentation.go

+31-20
@@ -16,8 +16,6 @@ package managedwriter
 
 import (
     "context"
-    "log"
-    "sync"
 
     "go.opencensus.io/stats"
     "go.opencensus.io/stats/view"
@@ -129,8 +127,8 @@
 )
 
 func init() {
-    AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount), keyStream, keyDataOrigin)
-    AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount), keyStream, keyDataOrigin)
+    AppendClientOpenView = createSumView(stats.Measure(AppendClientOpenCount))
+    AppendClientOpenRetryView = createSumView(stats.Measure(AppendClientOpenRetryCount))
 
     AppendRequestsView = createSumView(stats.Measure(AppendRequests), keyStream, keyDataOrigin)
     AppendRequestBytesView = createSumView(stats.Measure(AppendRequestBytes), keyStream, keyDataOrigin)
@@ -173,24 +171,37 @@ func createSumView(m stats.Measure, keys ...tag.Key) *view.View {
     return createView(m, view.Sum(), keys...)
 }
 
-var logTagStreamOnce sync.Once
-var logTagOriginOnce sync.Once
-
-// keyContextWithStreamID returns a new context modified with the instrumentation tags.
-func keyContextWithTags(ctx context.Context, streamID, dataOrigin string) context.Context {
-    ctx, err := tag.New(ctx, tag.Upsert(keyStream, streamID))
-    if err != nil {
-        logTagStreamOnce.Do(func() {
-            log.Printf("managedwriter: error creating tag map for 'streamID' key: %v", err)
-        })
+// setupWriterStatContext returns a new context modified with the instrumentation tags.
+// This will panic if no managedstream is provided
+func setupWriterStatContext(ms *ManagedStream) context.Context {
+    if ms == nil {
+        panic("no ManagedStream provided")
+    }
+    kCtx := ms.ctx
+    if ms.streamSettings == nil {
+        return kCtx
+    }
+    if ms.streamSettings.streamID != "" {
+        ctx, err := tag.New(kCtx, tag.Upsert(keyStream, ms.streamSettings.streamID))
+        if err != nil {
+            return kCtx // failed to add a tag, return the original context.
+        }
+        kCtx = ctx
     }
-    ctx, err = tag.New(ctx, tag.Upsert(keyDataOrigin, dataOrigin))
-    if err != nil {
-        logTagOriginOnce.Do(func() {
-            log.Printf("managedwriter: error creating tag map for 'dataOrigin' key: %v", err)
-        })
+    if ms.streamSettings.dataOrigin != "" {
+        ctx, err := tag.New(kCtx, tag.Upsert(keyDataOrigin, ms.streamSettings.dataOrigin))
+        if err != nil {
+            return kCtx
+        }
+        kCtx = ctx
     }
-    return ctx
+    return kCtx
+}
+
+// recordWriterStat records a measure which may optionally contain writer-related tags like stream ID
+// or data origin.
+func recordWriterStat(ms *ManagedStream, m *stats.Int64Measure, n int64) {
+    stats.Record(ms.ctx, m.M(n))
 }
 
 func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) {
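
With stream_open_count and stream_open_retry_count no longer keyed by streamID/dataOrigin, consumers of the default views see fewer tag keys on the connection-oriented metrics. A minimal sketch of registering and inspecting the package's views, assuming the exported DefaultOpenCensusViews slice:

package main

import (
    "log"

    "cloud.google.com/go/bigquery/storage/managedwriter"
    "go.opencensus.io/stats/view"
)

func main() {
    // Register the library's default views so recorded measures are aggregated.
    if err := view.Register(managedwriter.DefaultOpenCensusViews...); err != nil {
        log.Fatalf("failed to register views: %v", err)
    }
    defer view.Unregister(managedwriter.DefaultOpenCensusViews...)

    // After this change the open/open-retry views carry no writer tag keys,
    // while the append-oriented views still carry streamID and dataOrigin.
    for _, v := range managedwriter.DefaultOpenCensusViews {
        log.Printf("view %s: %d tag keys", v.Name, len(v.TagKeys))
    }
}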

bigquery/storage/managedwriter/integration_test.go

+29-2
@@ -871,6 +871,16 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
     // to report.
     time.Sleep(time.Second)
 
+    // metric to key tag names
+    wantTags := map[string][]string{
+        "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_count": nil,
+        "cloud.google.com/go/bigquery/storage/managedwriter/stream_open_retry_count": nil,
+        "cloud.google.com/go/bigquery/storage/managedwriter/append_requests": []string{"streamID"},
+        "cloud.google.com/go/bigquery/storage/managedwriter/append_request_bytes": []string{"streamID"},
+        "cloud.google.com/go/bigquery/storage/managedwriter/append_request_errors": []string{"streamID"},
+        "cloud.google.com/go/bigquery/storage/managedwriter/append_rows": []string{"streamID"},
+    }
+
     for _, tv := range testedViews {
         // Attempt to further improve race failures by retrying metrics retrieval.
         metricData, err := func() ([]*view.Row, error) {
@@ -894,8 +904,25 @@ func testInstrumentation(ctx context.Context, t *testing.T, mwClient *Client, bq
             t.Errorf("%q: expected 1 row of metrics, got %d", tv.Name, mlen)
             continue
         }
-        if len(metricData[0].Tags) != 1 {
-            t.Errorf("%q: only expected 1 tag, got %d", tv.Name, len(metricData[0].Tags))
+        if wantKeys, ok := wantTags[tv.Name]; ok {
+            if wantKeys == nil {
+                if n := len(tv.TagKeys); n != 0 {
+                    t.Errorf("expected view %q to have no keys, but %d present", tv.Name, n)
+                }
+            } else {
+                for _, wk := range wantKeys {
+                    var found bool
+                    for _, gk := range tv.TagKeys {
+                        if gk.Name() == wk {
+                            found = true
+                            break
+                        }
+                    }
+                    if !found {
+                        t.Errorf("expected view %q to have key %q, but wasn't present", tv.Name, wk)
+                    }
+                }
+            }
         }
         entry := metricData[0].Data
         sum, ok := entry.(*view.SumData)
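
The tag-key assertions above inspect the view definitions (tv.TagKeys); the recorded rows themselves come from view.RetrieveData, roughly as sketched below. The view name is one of the metrics listed in wantTags, and the view must already be registered (for example via DefaultOpenCensusViews) for rows to exist:

package main

import (
    "fmt"
    "log"

    "go.opencensus.io/stats/view"
)

func main() {
    // Rows for a registered view are fetched by its full name; each row pairs
    // the aggregated data with the tag values it was recorded under.
    rows, err := view.RetrieveData("cloud.google.com/go/bigquery/storage/managedwriter/append_requests")
    if err != nil {
        log.Fatalf("RetrieveData: %v", err)
    }
    for _, r := range rows {
        if sum, ok := r.Data.(*view.SumData); ok {
            fmt.Printf("tags=%v sum=%v\n", r.Tags, sum.Value)
        }
    }
}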

bigquery/storage/managedwriter/managed_stream.go

+19-11
@@ -80,7 +80,6 @@ type ManagedStream struct {
 
     streamSettings   *streamSettings
     schemaDescriptor *descriptorpb.DescriptorProto
-    destinationTable string
     c                *Client
     fc               *flowController
     retry            *statelessRetryer
@@ -127,6 +126,9 @@ type streamSettings struct {
     // dataOrigin can be set for classifying metrics generated
    // by a stream.
     dataOrigin string
+
+    // retains reference to the target table when resolving settings
+    destinationTable string
 }
 
 func defaultStreamSettings() *streamSettings {
@@ -166,7 +168,7 @@ func (ms *ManagedStream) FlushRows(ctx context.Context, offset int64, opts ...ga
         },
     }
     resp, err := ms.c.rawClient.FlushRows(ctx, req, opts...)
-    recordStat(ms.ctx, FlushRequests, 1)
+    recordWriterStat(ms, FlushRequests, 1)
     if err != nil {
         return 0, err
     }
@@ -338,9 +340,9 @@ func (ms *ManagedStream) lockingAppend(pw *pendingWrite) error {
     numRows := int64(len(pw.request.GetProtoRows().Rows.GetSerializedRows()))
     statsOnExit = func() {
         // these will get recorded once we exit the critical section.
-        recordStat(ms.ctx, AppendRequestRows, numRows)
-        recordStat(ms.ctx, AppendRequests, 1)
-        recordStat(ms.ctx, AppendRequestBytes, int64(pw.reqSize))
+        recordWriterStat(ms, AppendRequestRows, numRows)
+        recordWriterStat(ms, AppendRequests, 1)
+        recordWriterStat(ms, AppendRequestBytes, int64(pw.reqSize))
     }
     ch <- pw
     return nil
@@ -362,8 +364,11 @@ func (ms *ManagedStream) appendWithRetry(pw *pendingWrite, opts ...gax.CallOptio
         // Append yielded an error. Retry by continuing or return.
         status := grpcstatus.Convert(appendErr)
         if status != nil {
-            ctx, _ := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String()))
-            recordStat(ctx, AppendRequestErrors, 1)
+            recordCtx := ms.ctx
+            if ctx, err := tag.New(ms.ctx, tag.Insert(keyError, status.Code().String())); err == nil {
+                recordCtx = ctx
+            }
+            recordStat(recordCtx, AppendRequestErrors, 1)
         }
         bo, shouldRetry := ms.statelessRetryer().Retry(appendErr, pw.attemptCount)
         if shouldRetry {
@@ -471,6 +476,7 @@ func (ms *ManagedStream) AppendRows(ctx context.Context, data [][]byte, opts ...
 //
 // The ManagedStream reference is used for performing re-enqueing of failed writes.
 func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClient, ch <-chan *pendingWrite) {
+
     for {
         select {
         case <-ms.ctx.Done():
@@ -498,14 +504,16 @@ func recvProcessor(ms *ManagedStream, arc storagepb.BigQueryWrite_AppendRowsClie
             continue
         }
         // Record that we did in fact get a response from the backend.
-        recordStat(ms.ctx, AppendResponses, 1)
+        recordWriterStat(ms, AppendResponses, 1)
 
         if status := resp.GetError(); status != nil {
            // The response from the backend embedded a status error. We record that the error
            // occurred, and tag it based on the response code of the status.
-            if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr == nil {
-                recordStat(tagCtx, AppendResponseErrors, 1)
+            recordCtx := ms.ctx
+            if tagCtx, tagErr := tag.New(ms.ctx, tag.Insert(keyError, codes.Code(status.GetCode()).String())); tagErr != nil {
+                recordCtx = tagCtx
             }
+            recordStat(recordCtx, AppendResponseErrors, 1)
             respErr := grpcstatus.ErrorProto(status)
             if _, shouldRetry := ms.statelessRetryer().Retry(respErr, nextWrite.attemptCount); shouldRetry {
                 // We use the status error to evaluate and possible re-enqueue the write.
@@ -540,7 +548,7 @@ func (ms *ManagedStream) processRetry(pw *pendingWrite, appendResp *storagepb.Ap
         }
         // Break out of the loop, we were successful and the write has been
         // re-inserted.
-        recordStat(ms.ctx, AppendRetryCount, 1)
+        recordWriterStat(ms, AppendRetryCount, 1)
         break
     }
 }
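
The appendWithRetry hunk above replaces a fire-and-forget tag.New call with an explicit fallback: if attaching the error-code tag fails, the stat is still recorded against the untagged context. A standalone sketch of that OpenCensus pattern, using a hypothetical measure and tag key in place of the package's AppendRequestErrors and keyError:

package main

import (
    "context"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/tag"
)

// Hypothetical measure and tag key, standing in for the package's
// AppendRequestErrors measure and keyError tag key.
var (
    errCount = stats.Int64("example.com/writer/append_request_errors", "append errors", stats.UnitDimensionless)
    keyError = tag.MustNewKey("error")
)

func main() {
    base := context.Background()

    // Try to attach an error-code tag, but fall back to the untagged context
    // if tag.New fails, so the measurement is never silently dropped.
    recordCtx := base
    if ctx, err := tag.New(base, tag.Insert(keyError, "DeadlineExceeded")); err == nil {
        recordCtx = ctx
    }
    stats.Record(recordCtx, errCount.M(1))
    log.Println("recorded one (possibly tagged) error measurement")
}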

bigquery/storage/managedwriter/options.go

+1-1
@@ -48,7 +48,7 @@ func WithStreamName(name string) WriterOption {
 // projects/{projectid}/datasets/{dataset}/tables/{table}
 func WithDestinationTable(destTable string) WriterOption {
     return func(ms *ManagedStream) {
-        ms.destinationTable = destTable
+        ms.streamSettings.destinationTable = destTable
     }
 }

bigquery/storage/managedwriter/options_test.go

+2-2
@@ -93,9 +93,9 @@ func TestWriterOptions(t *testing.T) {
             options: []WriterOption{WithDestinationTable("foo")},
             want: func() *ManagedStream {
                 ms := &ManagedStream{
-                    streamSettings:   defaultStreamSettings(),
-                    destinationTable: "foo",
+                    streamSettings: defaultStreamSettings(),
                 }
+                ms.streamSettings.destinationTable = "foo"
                 return ms
             }(),
         },

0 commit comments
