Skip to content

Commit 5b7397b

Browse files
frankyntritone
andauthored
feat(storage): add grpc metrics experimental options (#10984)
* feat(storage): add experimental options * use internal for experimental option introduction * reduce to minimal changes * remove comments from unexported options * fix issues committed on merging main.. * address review feedback * revert changes to go.work.sum * vet.sh and update comments * address feedback * allow any non-zero interval * note monitoring interval --------- Co-authored-by: Chris Cotter <[email protected]>
1 parent 5e363a3 commit 5b7397b

File tree

6 files changed

+142
-37
lines changed

6 files changed

+142
-37
lines changed

storage/experimental/experimental.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,26 @@ import (
2525
"time"
2626

2727
"cloud.google.com/go/storage/internal"
28+
"go.opentelemetry.io/otel/sdk/metric"
2829
"google.golang.org/api/option"
2930
)
3031

31-
// WithReadStallTimeout provides a [ClientOption] that may be passed to [storage.NewClient].
32+
// WithMetricInterval provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
33+
// It sets how often to emit metrics [metric.WithInterval] when using
34+
// [metric.NewPeriodicReader]
35+
// When using Cloud Monitoring interval must be at minimum 1 [time.Minute].
36+
func WithMetricInterval(metricInterval time.Duration) option.ClientOption {
37+
return internal.WithMetricInterval.(func(time.Duration) option.ClientOption)(metricInterval)
38+
}
39+
40+
// WithMetricExporter provides a [option.ClientOption] that may be passed to [storage.NewGRPCClient].
41+
// Set an alternate client-side metric Exporter to emit metrics through.
42+
// Must implement [metric.Exporter]
43+
func WithMetricExporter(ex *metric.Exporter) option.ClientOption {
44+
return internal.WithMetricExporter.(func(*metric.Exporter) option.ClientOption)(ex)
45+
}
46+
47+
// WithReadStallTimeout provides a [option.ClientOption] that may be passed to [storage.NewClient].
3248
// It enables the client to retry stalled requests when starting a download from
3349
// Cloud Storage. If the timeout elapses with no response from the server, the request
3450
// is automatically retried.

storage/grpc_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ func newGRPCStorageClient(ctx context.Context, opts ...storageOption) (storageCl
129129

130130
if !config.disableClientMetrics {
131131
// Do not fail client creation if enabling metrics fails.
132-
if metricsContext, err := enableClientMetrics(ctx, s); err == nil {
132+
if metricsContext, err := enableClientMetrics(ctx, s, config); err == nil {
133133
s.metricsContext = metricsContext
134134
s.clientOption = append(s.clientOption, metricsContext.clientOpts...)
135135
} else {

storage/grpc_metrics.go

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,6 @@ func newPreparedResource(ctx context.Context, project string, resourceOptions []
134134
}
135135

136136
type metricsContext struct {
137-
// project used by exporter
138-
project string
139137
// client options passed to gRPC channels
140138
clientOpts []option.ClientOption
141139
// instance of metric reader used by gRPC client-side metrics
@@ -154,29 +152,36 @@ func createHistogramView(name string, boundaries []float64) metric.View {
154152
})
155153
}
156154

157-
func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext, error) {
158-
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
159-
if err != nil {
160-
return nil, err
161-
}
162-
// Implementation requires a project, if one is not determined possibly user
163-
// credentials. Then we will fail stating gRPC Metrics require a project-id.
164-
if project == "" && preparedResource.projectToUse != "" {
165-
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
166-
}
167-
// If projectTouse isn't the same as project provided to Storage client, then
168-
// emit a log stating which project is being used to emit metrics to.
169-
if project != preparedResource.projectToUse {
170-
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
171-
}
172-
meOpts := []mexporter.Option{
173-
mexporter.WithProjectID(preparedResource.projectToUse),
174-
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
175-
mexporter.WithCreateServiceTimeSeries(),
176-
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
177-
exporter, err := mexporter.New(meOpts...)
178-
if err != nil {
179-
return nil, err
155+
func newGRPCMetricContext(ctx context.Context, project string, config storageConfig) (*metricsContext, error) {
156+
var exporter metric.Exporter
157+
meterOpts := []metric.Option{}
158+
if config.metricExporter != nil {
159+
exporter = *config.metricExporter
160+
} else {
161+
preparedResource, err := newPreparedResource(ctx, project, []resource.Option{resource.WithDetectors(gcp.NewDetector())})
162+
if err != nil {
163+
return nil, err
164+
}
165+
meterOpts = append(meterOpts, metric.WithResource(preparedResource.resource))
166+
// Implementation requires a project, if one is not determined possibly user
167+
// credentials. Then we will fail stating gRPC Metrics require a project-id.
168+
if project == "" && preparedResource.projectToUse == "" {
169+
return nil, fmt.Errorf("google cloud project is required to start client-side metrics")
170+
}
171+
// If projectTouse isn't the same as project provided to Storage client, then
172+
// emit a log stating which project is being used to emit metrics to.
173+
if project != preparedResource.projectToUse {
174+
log.Printf("The Project ID configured for metrics is %s, but the Project ID of the storage client is %s. Make sure that the service account in use has the required metric writing role (roles/monitoring.metricWriter) in the project projectIdToUse or metrics will not be written.", preparedResource.projectToUse, project)
175+
}
176+
meOpts := []mexporter.Option{
177+
mexporter.WithProjectID(preparedResource.projectToUse),
178+
mexporter.WithMetricDescriptorTypeFormatter(metricFormatter),
179+
mexporter.WithCreateServiceTimeSeries(),
180+
mexporter.WithMonitoredResourceDescription(monitoredResourceName, []string{"project_id", "location", "cloud_platform", "host_id", "instance_id", "api"})}
181+
exporter, err = mexporter.New(meOpts...)
182+
if err != nil {
183+
return nil, err
184+
}
180185
}
181186
// Metric views update histogram boundaries to be relevant to GCS
182187
// otherwise default OTel histogram boundaries are used.
@@ -185,11 +190,13 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
185190
createHistogramView("grpc.client.attempt.rcvd_total_compressed_message_size", sizeHistogramBoundaries()),
186191
createHistogramView("grpc.client.attempt.sent_total_compressed_message_size", sizeHistogramBoundaries()),
187192
}
188-
provider := metric.NewMeterProvider(
189-
metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(time.Minute))),
190-
metric.WithResource(preparedResource.resource),
191-
metric.WithView(metricViews...),
192-
)
193+
interval := time.Minute
194+
if config.metricInterval > 0 {
195+
interval = config.metricInterval
196+
}
197+
meterOpts = append(meterOpts, metric.WithReader(metric.NewPeriodicReader(&exporterLogSuppressor{exporter: exporter}, metric.WithInterval(interval))),
198+
metric.WithView(metricViews...))
199+
provider := metric.NewMeterProvider(meterOpts...)
193200
mo := opentelemetry.MetricsOptions{
194201
MeterProvider: provider,
195202
Metrics: opentelemetry.DefaultMetrics().Add(
@@ -209,22 +216,21 @@ func newGRPCMetricContext(ctx context.Context, project string) (*metricsContext,
209216
option.WithGRPCDialOption(grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
210217
}
211218
context := &metricsContext{
212-
project: preparedResource.projectToUse,
213219
clientOpts: opts,
214220
provider: provider,
215221
close: createShutdown(ctx, provider),
216222
}
217223
return context, nil
218224
}
219225

220-
func enableClientMetrics(ctx context.Context, s *settings) (*metricsContext, error) {
226+
func enableClientMetrics(ctx context.Context, s *settings, config storageConfig) (*metricsContext, error) {
221227
var project string
222228
c, err := transport.Creds(ctx, s.clientOption...)
223229
if err == nil {
224230
project = c.ProjectID
225231
}
226232
// Enable client-side metrics for gRPC
227-
metricsContext, err := newGRPCMetricContext(ctx, project)
233+
metricsContext, err := newGRPCMetricContext(ctx, project, config)
228234
if err != nil {
229235
return nil, fmt.Errorf("gRPC Metrics: %w", err)
230236
}

storage/internal/experimental.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
package internal
1818

1919
var (
20+
// WithMetricInterval is a function which is implemented by storage package.
21+
// It sets how often to emit metrics when using NewPeriodicReader and must be
22+
// greater than 1 minute.
23+
WithMetricInterval any // func (*time.Duration) option.ClientOption
24+
25+
// WithMetricExporter is a function which is implemented by storage package.
26+
// Set an alternate client-side metric Exporter to emit metrics through.
27+
WithMetricExporter any // func (*metric.Exporter) option.ClientOption
28+
2029
// WithReadStallTimeout is a function which is implemented by storage package.
2130
// It takes ReadStallTimeoutConfig as inputs and returns a option.ClientOption.
2231
WithReadStallTimeout any // func (*ReadStallTimeoutConfig) option.ClientOption

storage/option.go

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"cloud.google.com/go/storage/experimental"
2323
storageinternal "cloud.google.com/go/storage/internal"
24+
"go.opentelemetry.io/otel/sdk/metric"
2425
"google.golang.org/api/option"
2526
"google.golang.org/api/option/internaloption"
2627
)
@@ -35,7 +36,9 @@ const (
3536
)
3637

3738
func init() {
38-
// initialize experimental option.
39+
// initialize experimental options
40+
storageinternal.WithMetricExporter = withMetricExporter
41+
storageinternal.WithMetricInterval = withMetricInterval
3942
storageinternal.WithReadStallTimeout = withReadStallTimeout
4043
}
4144

@@ -69,12 +72,13 @@ func getDynamicReadReqInitialTimeoutSecFromEnv(defaultVal time.Duration) time.Du
6972
return val
7073
}
7174

72-
// storageConfig contains the Storage client option configuration that can be
7375
// set through storageClientOptions.
7476
type storageConfig struct {
7577
useJSONforReads bool
7678
readAPIWasSet bool
7779
disableClientMetrics bool
80+
metricExporter *metric.Exporter
81+
metricInterval time.Duration
7882
readStallTimeoutConfig *experimental.ReadStallTimeoutConfig
7983
}
8084

@@ -160,6 +164,34 @@ func (w *withDisabledClientMetrics) ApplyStorageOpt(c *storageConfig) {
160164
c.disableClientMetrics = w.disabledClientMetrics
161165
}
162166

167+
type withMeterOptions struct {
168+
internaloption.EmbeddableAdapter
169+
// set sampling interval
170+
interval time.Duration
171+
}
172+
173+
func withMetricInterval(interval time.Duration) option.ClientOption {
174+
return &withMeterOptions{interval: interval}
175+
}
176+
177+
func (w *withMeterOptions) ApplyStorageOpt(c *storageConfig) {
178+
c.metricInterval = w.interval
179+
}
180+
181+
type withMetricExporterConfig struct {
182+
internaloption.EmbeddableAdapter
183+
// exporter override
184+
metricExporter *metric.Exporter
185+
}
186+
187+
func withMetricExporter(ex *metric.Exporter) option.ClientOption {
188+
return &withMetricExporterConfig{metricExporter: ex}
189+
}
190+
191+
func (w *withMetricExporterConfig) ApplyStorageOpt(c *storageConfig) {
192+
c.metricExporter = w.metricExporter
193+
}
194+
163195
// WithReadStallTimeout is an option that may be passed to [NewClient].
164196
// It enables the client to retry the stalled read request, happens as part of
165197
// storage.Reader creation. As the name suggest, timeout is adjusted dynamically

storage/option_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"time"
2121

2222
"cloud.google.com/go/storage/experimental"
23+
mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
2324
"github.com/google/go-cmp/cmp"
2425
"google.golang.org/api/option"
2526
)
@@ -37,6 +38,8 @@ func TestApplyStorageOpt(t *testing.T) {
3738
useJSONforReads: true,
3839
readAPIWasSet: true,
3940
disableClientMetrics: false,
41+
metricInterval: 0,
42+
metricExporter: nil,
4043
},
4144
},
4245
{
@@ -46,6 +49,8 @@ func TestApplyStorageOpt(t *testing.T) {
4649
useJSONforReads: false,
4750
readAPIWasSet: true,
4851
disableClientMetrics: false,
52+
metricInterval: 0,
53+
metricExporter: nil,
4954
},
5055
},
5156
{
@@ -55,6 +60,8 @@ func TestApplyStorageOpt(t *testing.T) {
5560
useJSONforReads: false,
5661
readAPIWasSet: true,
5762
disableClientMetrics: false,
63+
metricInterval: 0,
64+
metricExporter: nil,
5865
},
5966
},
6067
{
@@ -64,6 +71,8 @@ func TestApplyStorageOpt(t *testing.T) {
6471
useJSONforReads: false,
6572
readAPIWasSet: false,
6673
disableClientMetrics: false,
74+
metricInterval: 0,
75+
metricExporter: nil,
6776
},
6877
},
6978
{
@@ -73,6 +82,8 @@ func TestApplyStorageOpt(t *testing.T) {
7382
useJSONforReads: false,
7483
readAPIWasSet: false,
7584
disableClientMetrics: false,
85+
metricInterval: 0,
86+
metricExporter: nil,
7687
},
7788
},
7889
{
@@ -82,6 +93,19 @@ func TestApplyStorageOpt(t *testing.T) {
8293
useJSONforReads: false,
8394
readAPIWasSet: false,
8495
disableClientMetrics: true,
96+
metricInterval: 0,
97+
metricExporter: nil,
98+
},
99+
},
100+
{
101+
desc: "set metrics interval",
102+
opts: []option.ClientOption{experimental.WithMetricInterval(time.Minute * 5)},
103+
want: storageConfig{
104+
useJSONforReads: false,
105+
readAPIWasSet: false,
106+
disableClientMetrics: false,
107+
metricInterval: time.Minute * 5,
108+
metricExporter: nil,
85109
},
86110
},
87111
{
@@ -128,6 +152,24 @@ func TestApplyStorageOpt(t *testing.T) {
128152
}
129153
}
130154

155+
func TestSetCustomExporter(t *testing.T) {
156+
exporter, err := mexporter.New()
157+
if err != nil {
158+
t.Errorf("TestSetCustomExporter: %v", err)
159+
}
160+
want := storageConfig{
161+
metricExporter: &exporter,
162+
}
163+
var got storageConfig
164+
opt := experimental.WithMetricExporter(&exporter)
165+
if storageOpt, ok := opt.(storageClientOption); ok {
166+
storageOpt.ApplyStorageOpt(&got)
167+
}
168+
if got.metricExporter != want.metricExporter {
169+
t.Errorf("TestSetCustomExpoerter: metricExporter want=%v, got=%v", want.metricExporter, got.metricExporter)
170+
}
171+
}
172+
131173
func TestGetDynamicReadReqInitialTimeoutSecFromEnv(t *testing.T) {
132174
defaultValue := 10 * time.Second
133175

0 commit comments

Comments
 (0)