Skip to content

add WithShutdownWait option to PeriodicReader #6678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/metric/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const (
envInterval = "OTEL_METRIC_EXPORT_INTERVAL"
// Maximum allowed time (in milliseconds) to export data.
envTimeout = "OTEL_METRIC_EXPORT_TIMEOUT"
// The time (in milliseconds) to wait export remaining metrics in shutdown.
envShutdownWait = "OTEL_METRIC_EXPORT_SHUTDOWN_WAIT"
)

// envDuration returns an environment variable's value as duration in milliseconds if it is exists,
Expand Down
60 changes: 43 additions & 17 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ import (

// Default periodic reader timing.
const (
defaultTimeout = time.Millisecond * 30000
defaultInterval = time.Millisecond * 60000
defaultTimeout = time.Millisecond * 30000
defaultInterval = time.Millisecond * 60000
defaultShutdownWait = time.Duration(0)
)

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
producers []Producer
interval time.Duration
timeout time.Duration
shutdownWait time.Duration
producers []Producer
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
interval: envDuration(envInterval, defaultInterval),
timeout: envDuration(envTimeout, defaultTimeout),
interval: envDuration(envInterval, defaultInterval),
timeout: envDuration(envTimeout, defaultTimeout),
shutdownWait: envDuration(envShutdownWait, defaultShutdownWait),
}
for _, o := range options {
c = o.applyPeriodic(c)
Expand Down Expand Up @@ -94,6 +97,24 @@ func WithInterval(d time.Duration) PeriodicReaderOption {
})
}

// WithShutdownWait configures the wait time to export remainning metrics in shutdown for a
// PeriodicReader.
//
// This option overrides any value set for the
// OTEL_METRIC_EXPORT_SHUTDOWN_WAIT environment variable.
//
// If this option is not used or d is less than or equal to zero, 0 seconds
// is used as the default.
func WithShutdownWait(d time.Duration) PeriodicReaderOption {
return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
if d <= 0 {
return conf
}
conf.shutdownWait = d
return conf
})
}

// NewPeriodicReader returns a Reader that collects and exports metric data to
// the exporter at a defined interval. By default, the returned Reader will
// collect and export data every 60 seconds, and will cancel any attempts that
Expand All @@ -107,12 +128,13 @@ func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *Peri
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &PeriodicReader{
interval: conf.interval,
timeout: conf.timeout,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
interval: conf.interval,
timeout: conf.timeout,
shutdownWait: conf.shutdownWait,
exporter: exporter,
flushCh: make(chan chan error),
cancel: cancel,
done: make(chan struct{}),
rmPool: sync.Pool{
New: func() interface{} {
return &metricdata.ResourceMetrics{}
Expand All @@ -138,10 +160,11 @@ type PeriodicReader struct {
isShutdown bool
externalProducers atomic.Value

interval time.Duration
timeout time.Duration
exporter Exporter
flushCh chan chan error
interval time.Duration
timeout time.Duration
shutdownWait time.Duration
exporter Exporter
flushCh chan chan error

done chan struct{}
cancel context.CancelFunc
Expand Down Expand Up @@ -329,6 +352,9 @@ func (r *PeriodicReader) Shutdown(ctx context.Context) error {
m := r.rmPool.Get().(*metricdata.ResourceMetrics)
err = r.collect(ctx, ph, m)
if err == nil {
if r.shutdownWait > 0 {
time.Sleep(r.shutdownWait)
}
err = r.export(ctx, m)
}
r.rmPool.Put(m)
Expand Down
83 changes: 83 additions & 0 deletions sdk/metric/periodic_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,66 @@ func TestIntervalEnvAndOption(t *testing.T) {
assert.Equal(t, want, got, "option should have precedence over env var")
}

func TestWithShutdownWait(t *testing.T) {
test := func(d time.Duration) time.Duration {
opts := []PeriodicReaderOption{WithShutdownWait(d)}
return newPeriodicReaderConfig(opts).shutdownWait
}

assert.Equal(t, testDur, test(testDur))
assert.Equal(t, defaultShutdownWait, newPeriodicReaderConfig(nil).shutdownWait)
assert.Equal(t, defaultShutdownWait, test(time.Duration(0)), "invalid shutdownWait should use default")
assert.Equal(t, defaultShutdownWait, test(time.Duration(-1)), "invalid shutdownWait should use default")
}

func TestShutdownWaitEnvVar(t *testing.T) {
testCases := []struct {
v string
want time.Duration
}{
{
// empty value
"",
defaultShutdownWait,
},
{
// positive value
"1",
time.Millisecond,
},
{
// non-positive value
"0",
defaultShutdownWait,
},
{
// value with unit (not supported)
"1ms",
defaultShutdownWait,
},
{
// NaN
"abc",
defaultShutdownWait,
},
}
for _, tc := range testCases {
t.Run(tc.v, func(t *testing.T) {
t.Setenv(envShutdownWait, tc.v)
got := newPeriodicReaderConfig(nil).shutdownWait
assert.Equal(t, tc.want, got)
})
}
}

func TestShutdownWaitEnvAndOption(t *testing.T) {
want := 5 * time.Millisecond
t.Setenv(envShutdownWait, "999")
opts := []PeriodicReaderOption{WithShutdownWait(want)}
got := newPeriodicReaderConfig(opts).shutdownWait
assert.Equal(t, want, got, "option should have precedence over env var")
}

type fnExporter struct {
temporalityFunc TemporalitySelector
aggregationFunc AggregationSelector
Expand Down Expand Up @@ -370,6 +430,29 @@ func TestPeriodicReaderFlushesPending(t *testing.T) {
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
})

t.Run("Shutdown with wait", func(t *testing.T) {
shutdownWait := 1 * time.Millisecond
expFunc := func(t *testing.T) (exp Exporter, called *bool, calledTime *time.Time) {
called = new(bool)
calledTime = new(time.Time)
return &fnExporter{
exportFunc: func(_ context.Context, m *metricdata.ResourceMetrics) error {
// The testSDKProducer produces testResourceMetricsA.
assert.Equal(t, testResourceMetricsAB, *m)
*called = true
*calledTime = time.Now()
return assert.AnError
},
}, called, calledTime
}
exp, called, calledTime := expFunc(t)
r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithShutdownWait(shutdownWait))
r.register(testSDKProducer{})
now := time.Now()
assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned")
assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed")
assert.GreaterOrEqual(t, calledTime.Sub(now), shutdownWait, "shutdown with wait not waited")
})
t.Run("Shutdown timeout on producer", func(t *testing.T) {
exp, called := expFunc(t)
timeout := time.Millisecond
Expand Down
Loading