Skip to content

Commit 9f66e50

Browse files
committed
add WithShutdownWait option to PeriodicReader
1 parent fdf7bec commit 9f66e50

File tree

3 files changed

+128
-17
lines changed

3 files changed

+128
-17
lines changed

sdk/metric/env.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const (
1717
envInterval = "OTEL_METRIC_EXPORT_INTERVAL"
1818
// Maximum allowed time (in milliseconds) to export data.
1919
envTimeout = "OTEL_METRIC_EXPORT_TIMEOUT"
20+
// The time (in milliseconds) to wait export remaining metrics in shutdown.
21+
envShutdownWait = "OTEL_METRIC_EXPORT_SHUTDOWN_WAIT"
2022
)
2123

2224
// envDuration returns an environment variable's value as duration in milliseconds if it is exists,

sdk/metric/periodic_reader.go

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,26 @@ import (
1818

1919
// Default periodic reader timing.
2020
const (
21-
defaultTimeout = time.Millisecond * 30000
22-
defaultInterval = time.Millisecond * 60000
21+
defaultTimeout = time.Millisecond * 30000
22+
defaultInterval = time.Millisecond * 60000
23+
defaultShutdownWait = time.Duration(0)
2324
)
2425

2526
// periodicReaderConfig contains configuration options for a PeriodicReader.
2627
type periodicReaderConfig struct {
27-
interval time.Duration
28-
timeout time.Duration
29-
producers []Producer
28+
interval time.Duration
29+
timeout time.Duration
30+
shutdownWait time.Duration
31+
producers []Producer
3032
}
3133

3234
// newPeriodicReaderConfig returns a periodicReaderConfig configured with
3335
// options.
3436
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
3537
c := periodicReaderConfig{
36-
interval: envDuration(envInterval, defaultInterval),
37-
timeout: envDuration(envTimeout, defaultTimeout),
38+
interval: envDuration(envInterval, defaultInterval),
39+
timeout: envDuration(envTimeout, defaultTimeout),
40+
shutdownWait: envDuration(envShutdownWait, defaultShutdownWait),
3841
}
3942
for _, o := range options {
4043
c = o.applyPeriodic(c)
@@ -94,6 +97,24 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
9497
})
9598
}
9699

100+
// WithShutdownWait configures the wait time to export remainning metrics in shutdown for a
101+
// PeriodicReader.
102+
//
103+
// This option overrides any value set for the
104+
// OTEL_METRIC_EXPORT_SHUTDOWN_WAIT environment variable.
105+
//
106+
// If this option is not used or d is less than or equal to zero, 0 seconds
107+
// is used as the default.
108+
func WithShutdownWait(d time.Duration) PeriodicReaderOption {
109+
return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
110+
if d <= 0 {
111+
return conf
112+
}
113+
conf.shutdownWait = d
114+
return conf
115+
})
116+
}
117+
97118
// NewPeriodicReader returns a Reader that collects and exports metric data to
98119
// the exporter at a defined interval. By default, the returned Reader will
99120
// collect and export data every 60 seconds, and will cancel any attempts that
@@ -107,12 +128,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
107128
conf := newPeriodicReaderConfig(options)
108129
ctx, cancel := context.WithCancel(context.Background())
109130
r := &PeriodicReader{
110-
interval: conf.interval,
111-
timeout: conf.timeout,
112-
exporter: exporter,
113-
flushCh: make(chan chan error),
114-
cancel: cancel,
115-
done: make(chan struct{}),
131+
interval: conf.interval,
132+
timeout: conf.timeout,
133+
shutdownWait: conf.shutdownWait,
134+
exporter: exporter,
135+
flushCh: make(chan chan error),
136+
cancel: cancel,
137+
done: make(chan struct{}),
116138
rmPool: sync.Pool{
117139
New: func() interface{} {
118140
return &metricdata.ResourceMetrics{}
@@ -138,10 +160,11 @@ type PeriodicReader struct {
138160
isShutdown bool
139161
externalProducers atomic.Value
140162

141-
interval time.Duration
142-
timeout time.Duration
143-
exporter Exporter
144-
flushCh chan chan error
163+
interval time.Duration
164+
timeout time.Duration
165+
shutdownWait time.Duration
166+
exporter Exporter
167+
flushCh chan chan error
145168

146169
done chan struct{}
147170
cancel context.CancelFunc
@@ -329,6 +352,9 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
329352
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
330353
err = r.collect(ctx, ph, m)
331354
if err == nil {
355+
if r.shutdownWait > 0 {
356+
time.Sleep(r.shutdownWait)
357+
}
332358
err = r.export(ctx, m)
333359
}
334360
r.rmPool.Put(m)

sdk/metric/periodic_reader_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,66 @@ func TestIntervalEnvAndOption(t *testing.T) {
139139
assert.Equal(t, want, got, "option should have precedence over env var")
140140
}
141141

142+
func TestWithShutdownWait(t *testing.T) {
143+
test := func(d time.Duration) time.Duration {
144+
opts := []PeriodicReaderOption{WithShutdownWait(d)}
145+
return newPeriodicReaderConfig(opts).shutdownWait
146+
}
147+
148+
assert.Equal(t, testDur, test(testDur))
149+
assert.Equal(t, defaultShutdownWait, newPeriodicReaderConfig(nil).shutdownWait)
150+
assert.Equal(t, defaultShutdownWait, test(time.Duration(0)), "invalid shutdownWait should use default")
151+
assert.Equal(t, defaultShutdownWait, test(time.Duration(-1)), "invalid shutdownWait should use default")
152+
}
153+
154+
func TestShutdownWaitEnvVar(t *testing.T) {
155+
testCases := []struct {
156+
v string
157+
want time.Duration
158+
}{
159+
{
160+
// empty value
161+
"",
162+
defaultShutdownWait,
163+
},
164+
{
165+
// positive value
166+
"1",
167+
time.Millisecond,
168+
},
169+
{
170+
// non-positive value
171+
"0",
172+
defaultShutdownWait,
173+
},
174+
{
175+
// value with unit (not supported)
176+
"1ms",
177+
defaultShutdownWait,
178+
},
179+
{
180+
// NaN
181+
"abc",
182+
defaultShutdownWait,
183+
},
184+
}
185+
for _, tc := range testCases {
186+
t.Run(tc.v, func(t *testing.T) {
187+
t.Setenv(envShutdownWait, tc.v)
188+
got := newPeriodicReaderConfig(nil).shutdownWait
189+
assert.Equal(t, tc.want, got)
190+
})
191+
}
192+
}
193+
194+
func TestShutdownWaitEnvAndOption(t *testing.T) {
195+
want := 5 * time.Millisecond
196+
t.Setenv(envShutdownWait, "999")
197+
opts := []PeriodicReaderOption{WithShutdownWait(want)}
198+
got := newPeriodicReaderConfig(opts).shutdownWait
199+
assert.Equal(t, want, got, "option should have precedence over env var")
200+
}
201+
142202
type fnExporter struct {
143203
temporalityFunc TemporalitySelector
144204
aggregationFunc AggregationSelector
@@ -370,6 +430,29 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
370430
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
371431
})
372432

433+
t.Run("Shutdown with wait", func(t *testing.T) {
434+
shutdownWait := 1 * time.Millisecond
435+
expFunc := func(t *testing.T) (exp Exporter, called *bool, calledTime *time.Time) {
436+
called = new(bool)
437+
calledTime = new(time.Time)
438+
return &fnExporter{
439+
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
440+
// The testSDKProducer produces testResourceMetricsA.
441+
assert.Equal(t, testResourceMetricsAB, *m)
442+
*called = true
443+
*calledTime = time.Now()
444+
return assert.AnError
445+
},
446+
}, called, calledTime
447+
}
448+
exp, called, calledTime := expFunc(t)
449+
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithShutdownWait(shutdownWait))
450+
r.register(testSDKProducer{})
451+
now := time.Now()
452+
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
453+
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
454+
assert.GreaterOrEqual(t, calledTime.Sub(now), shutdownWait, "shutdown with wait not waited")
455+
})
373456
t.Run("Shutdown timeout on producer", func(t *testing.T) {
374457
exp, called := expFunc(t)
375458
timeout := time.Millisecond

0 commit comments

Comments
 (0)