diff --git a/.chloggen/fileexporter_lifecycle.yaml b/.chloggen/fileexporter_lifecycle.yaml new file mode 100755 index 000000000000..5eb92b7b5bb6 --- /dev/null +++ b/.chloggen/fileexporter_lifecycle.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: fileexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Scope the behavior of the fileexporter to its lifecycle, so it is safe to shut it down or restart it. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [27489] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/fileexporter/error_component.go b/exporter/fileexporter/error_component.go deleted file mode 100644 index 16d4675f6ed5..000000000000 --- a/exporter/fileexporter/error_component.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" - -import ( - "context" - - "go.opentelemetry.io/collector/component" -) - -// errorComponent is used to return error from a factory method. SharedComponent does -// not handle errors, so wrapping the error into a component is necessary. -type errorComponent struct { - err error -} - -// Start will return the cached error. -func (e *errorComponent) Start(context.Context, component.Host) error { - return e.err -} - -// Shutdown will return the cached error. -func (e *errorComponent) Shutdown(context.Context) error { - return e.err -} diff --git a/exporter/fileexporter/factory.go b/exporter/fileexporter/factory.go index 5f2f29e29ab6..b75a0c0eb528 100644 --- a/exporter/fileexporter/factory.go +++ b/exporter/fileexporter/factory.go @@ -63,10 +63,7 @@ func createTracesExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Traces, error) { - fe, err := getOrCreateFileExporter(cfg) - if err != nil { - return nil, err - } + fe := getOrCreateFileExporter(cfg) return exporterhelper.NewTracesExporter( ctx, set, @@ -83,10 +80,7 @@ func createMetricsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Metrics, error) { - fe, err := getOrCreateFileExporter(cfg) - if err != nil { - return nil, err - } + fe := getOrCreateFileExporter(cfg) return exporterhelper.NewMetricsExporter( ctx, set, @@ -103,10 +97,7 @@ func createLogsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Logs, error) { - fe, err := getOrCreateFileExporter(cfg) - if err != nil { - return nil, err - } + fe := getOrCreateFileExporter(cfg) return exporterhelper.NewLogsExporter( ctx, set, @@ -122,45 +113,20 @@ func createLogsExporter( // or returns the already cached one. Caching is required because the factory is asked trace and // metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver() // but they must not create separate objects, they must use one Exporter object per configuration. -func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) { +func getOrCreateFileExporter(cfg component.Config) FileExporter { conf := cfg.(*Config) fe := exporters.GetOrAdd(cfg, func() component.Component { - e, err := newFileExporter(conf) - if err != nil { - return &errorComponent{err: err} - } - - return e + return newFileExporter(conf) }) - component := fe.Unwrap() - if errComponent, ok := component.(*errorComponent); ok { - return nil, errComponent.err - } - - return component.(FileExporter), nil + c := fe.Unwrap() + return c.(FileExporter) } -func newFileExporter(conf *Config) (FileExporter, error) { - marshaller := &marshaller{ - formatType: conf.FormatType, - tracesMarshaler: tracesMarshalers[conf.FormatType], - metricsMarshaler: metricsMarshalers[conf.FormatType], - logsMarshaler: logsMarshalers[conf.FormatType], - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - } - export := buildExportFunc(conf) - - writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export) - if err != nil { - return nil, err - } - +func newFileExporter(conf *Config) FileExporter { return &fileExporter{ - marshaller: marshaller, - writer: writer, - }, nil + conf: conf, + } } func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) { diff --git a/exporter/fileexporter/factory_test.go b/exporter/fileexporter/factory_test.go index 9d155e653d34..1f8a99c79163 100644 --- a/exporter/fileexporter/factory_test.go +++ b/exporter/fileexporter/factory_test.go @@ -26,10 +26,12 @@ func TestCreateMetricsExporterError(t *testing.T) { cfg := &Config{ FormatType: formatTypeJSON, } - _, err := createMetricsExporter( + e, err := createMetricsExporter( context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = e.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) } @@ -65,10 +67,12 @@ func TestCreateTracesExporterError(t *testing.T) { cfg := &Config{ FormatType: formatTypeJSON, } - _, err := createTracesExporter( + e, err := createTracesExporter( context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = e.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) } @@ -90,10 +94,12 @@ func TestCreateLogsExporterError(t *testing.T) { cfg := &Config{ FormatType: formatTypeJSON, } - _, err := createLogsExporter( + e, err := createLogsExporter( context.Background(), exportertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + err = e.Start(context.Background(), componenttest.NewNopHost()) assert.Error(t, err) } diff --git a/exporter/fileexporter/file_exporter.go b/exporter/fileexporter/file_exporter.go index c610c9fa6445..18cf7aa58637 100644 --- a/exporter/fileexporter/file_exporter.go +++ b/exporter/fileexporter/file_exporter.go @@ -14,6 +14,7 @@ import ( // fileExporter is the implementation of file exporter that writes telemetry data to a file type fileExporter struct { + conf *Config marshaller *marshaller writer *fileWriter } @@ -43,12 +44,33 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error { } // Start starts the flush timer if set. -func (e *fileExporter) Start(ctx context.Context, _ component.Host) error { - return e.writer.start(ctx) +func (e *fileExporter) Start(_ context.Context, _ component.Host) error { + e.marshaller = &marshaller{ + formatType: e.conf.FormatType, + tracesMarshaler: tracesMarshalers[e.conf.FormatType], + metricsMarshaler: metricsMarshalers[e.conf.FormatType], + logsMarshaler: logsMarshalers[e.conf.FormatType], + compression: e.conf.Compression, + compressor: buildCompressor(e.conf.Compression), + } + export := buildExportFunc(e.conf) + + var err error + e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export) + if err != nil { + return err + } + e.writer.start() + return nil } // Shutdown stops the exporter and is invoked during shutdown. // It stops the flush ticker if set. func (e *fileExporter) Shutdown(context.Context) error { - return e.writer.shutdown() + if e.writer == nil { + return nil + } + w := e.writer + e.writer = nil + return w.shutdown() } diff --git a/exporter/fileexporter/file_exporter_test.go b/exporter/fileexporter/file_exporter_test.go index 7b1747df7a0b..5cff81001f62 100644 --- a/exporter/fileexporter/file_exporter_test.go +++ b/exporter/fileexporter/file_exporter_test.go @@ -126,8 +126,7 @@ func TestFileTracesExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { conf := tt.args.conf - feI, err := newFileExporter(conf) - assert.NoError(t, err) + feI := newFileExporter(conf) require.IsType(t, &fileExporter{}, feI) fe := feI.(*fileExporter) @@ -135,7 +134,9 @@ func TestFileTracesExporter(t *testing.T) { assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, fe.consumeTraces(context.Background(), td)) assert.NoError(t, fe.consumeTraces(context.Background(), td)) - assert.NoError(t, fe.Shutdown(context.Background())) + defer func() { + assert.NoError(t, fe.Shutdown(context.Background())) + }() fi, err := os.Open(fe.writer.path) assert.NoError(t, err) @@ -256,16 +257,8 @@ func TestFileMetricsExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { conf := tt.args.conf - writer, err := buildFileWriter(conf) - assert.NoError(t, err) fe := &fileExporter{ - marshaller: &marshaller{ - formatType: conf.FormatType, - metricsMarshaler: metricsMarshalers[conf.FormatType], - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - }, - writer: writer, + conf: conf, } require.NotNil(t, fe) @@ -273,7 +266,9 @@ func TestFileMetricsExporter(t *testing.T) { assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, fe.consumeMetrics(context.Background(), md)) assert.NoError(t, fe.consumeMetrics(context.Background(), md)) - assert.NoError(t, fe.Shutdown(context.Background())) + defer func() { + assert.NoError(t, fe.Shutdown(context.Background())) + }() fi, err := os.Open(fe.writer.path) assert.NoError(t, err) @@ -396,16 +391,8 @@ func TestFileLogsExporter(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { conf := tt.args.conf - writer, err := buildFileWriter(conf) - assert.NoError(t, err) fe := &fileExporter{ - marshaller: &marshaller{ - formatType: conf.FormatType, - logsMarshaler: logsMarshalers[conf.FormatType], - compression: conf.Compression, - compressor: buildCompressor(conf.Compression), - }, - writer: writer, + conf: conf, } require.NotNil(t, fe) @@ -413,7 +400,9 @@ func TestFileLogsExporter(t *testing.T) { assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost())) assert.NoError(t, fe.consumeLogs(context.Background(), ld)) assert.NoError(t, fe.consumeLogs(context.Background(), ld)) - assert.NoError(t, fe.Shutdown(context.Background())) + defer func() { + assert.NoError(t, fe.Shutdown(context.Background())) + }() fi, err := os.Open(fe.writer.path) assert.NoError(t, err) @@ -631,11 +620,6 @@ func safeFileExporterWrite(e *fileExporter, d []byte) (int, error) { return e.writer.file.Write(d) } -func buildFileWriter(conf *Config) (*fileWriter, error) { - export := buildExportFunc(conf) - return newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export) -} - func TestFlushing(t *testing.T) { cfg := &Config{ Path: tempFileName(t), @@ -648,16 +632,28 @@ func TestFlushing(t *testing.T) { // Wrap the buffer with the buffered writer closer that implements flush() method. bwc := newBufferedWriteCloser(buf) // Create a file exporter with flushing enabled. - feI, err := newFileExporter(cfg) - assert.NoError(t, err) + feI := newFileExporter(cfg) assert.IsType(t, &fileExporter{}, feI) fe := feI.(*fileExporter) - fe.writer.file.Close() - fe.writer.file = bwc // Start the flusher. ctx := context.Background() - assert.NoError(t, fe.Start(ctx, nil)) + fe.marshaller = &marshaller{ + formatType: fe.conf.FormatType, + tracesMarshaler: tracesMarshalers[fe.conf.FormatType], + metricsMarshaler: metricsMarshalers[fe.conf.FormatType], + logsMarshaler: logsMarshalers[fe.conf.FormatType], + compression: fe.conf.Compression, + compressor: buildCompressor(fe.conf.Compression), + } + export := buildExportFunc(fe.conf) + var err error + fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export) + assert.NoError(t, err) + err = fe.writer.file.Close() + assert.NoError(t, err) + fe.writer.file = bwc + fe.writer.start() // Write 10 bytes. b := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} diff --git a/exporter/fileexporter/file_writer.go b/exporter/fileexporter/file_writer.go index 46dde6d82ed6..fc4c34e25425 100644 --- a/exporter/fileexporter/file_writer.go +++ b/exporter/fileexporter/file_writer.go @@ -4,7 +4,6 @@ package fileexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/fileexporter" import ( - "context" "encoding/binary" "io" "sync" @@ -78,6 +77,8 @@ func (w *fileWriter) startFlusher() { ff.flush() w.mutex.Unlock() case <-w.stopTicker: + w.flushTicker.Stop() + w.flushTicker = nil return } } @@ -85,23 +86,22 @@ func (w *fileWriter) startFlusher() { } // Start starts the flush timer if set. -func (w *fileWriter) start(context.Context) error { +func (w *fileWriter) start() { if w.flushInterval > 0 { w.startFlusher() } - return nil } // Shutdown stops the exporter and is invoked during shutdown. // It stops the flush ticker if set. func (w *fileWriter) shutdown() error { - w.mutex.Lock() - defer w.mutex.Unlock() + // Stop the flush ticker. if w.flushTicker != nil { - w.flushTicker.Stop() // Stop the go routine. + w.mutex.Lock() close(w.stopTicker) + w.mutex.Unlock() } return w.file.Close() } diff --git a/exporter/fileexporter/testdata/.gitignore b/exporter/fileexporter/testdata/.gitignore new file mode 100644 index 000000000000..71aba6f8f5a7 --- /dev/null +++ b/exporter/fileexporter/testdata/.gitignore @@ -0,0 +1,2 @@ +# File generated by lifecycle tests +log.json \ No newline at end of file