Skip to content

Commit 89c6207

Browse files
authored
[exporter/file] Refactor to keep the file exporter inside start/stop lifecycle (#31495)
**Description:** Scope the behavior of the fileexporter to its lifecycle, so it is safe to shut it down or restart it. **Link to tracking Issue:** Fixes #31494
1 parent ce6e401 commit 89c6207

File tree

8 files changed

+108
-115
lines changed

8 files changed

+108
-115
lines changed

.chloggen/fileexporter_lifecycle.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: fileexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Scope the behavior of the fileexporter to its lifecycle, so it is safe to shut it down or restart it.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [27489]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

exporter/fileexporter/error_component.go

-26
This file was deleted.

exporter/fileexporter/factory.go

+10-44
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,7 @@ func createTracesExporter(
6363
set exporter.CreateSettings,
6464
cfg component.Config,
6565
) (exporter.Traces, error) {
66-
fe, err := getOrCreateFileExporter(cfg)
67-
if err != nil {
68-
return nil, err
69-
}
66+
fe := getOrCreateFileExporter(cfg)
7067
return exporterhelper.NewTracesExporter(
7168
ctx,
7269
set,
@@ -83,10 +80,7 @@ func createMetricsExporter(
8380
set exporter.CreateSettings,
8481
cfg component.Config,
8582
) (exporter.Metrics, error) {
86-
fe, err := getOrCreateFileExporter(cfg)
87-
if err != nil {
88-
return nil, err
89-
}
83+
fe := getOrCreateFileExporter(cfg)
9084
return exporterhelper.NewMetricsExporter(
9185
ctx,
9286
set,
@@ -103,10 +97,7 @@ func createLogsExporter(
10397
set exporter.CreateSettings,
10498
cfg component.Config,
10599
) (exporter.Logs, error) {
106-
fe, err := getOrCreateFileExporter(cfg)
107-
if err != nil {
108-
return nil, err
109-
}
100+
fe := getOrCreateFileExporter(cfg)
110101
return exporterhelper.NewLogsExporter(
111102
ctx,
112103
set,
@@ -122,45 +113,20 @@ func createLogsExporter(
122113
// or returns the already cached one. Caching is required because the factory is asked trace and
123114
// metric receivers separately when it gets CreateTracesReceiver() and CreateMetricsReceiver()
124115
// but they must not create separate objects, they must use one Exporter object per configuration.
125-
func getOrCreateFileExporter(cfg component.Config) (FileExporter, error) {
116+
func getOrCreateFileExporter(cfg component.Config) FileExporter {
126117
conf := cfg.(*Config)
127118
fe := exporters.GetOrAdd(cfg, func() component.Component {
128-
e, err := newFileExporter(conf)
129-
if err != nil {
130-
return &errorComponent{err: err}
131-
}
132-
133-
return e
119+
return newFileExporter(conf)
134120
})
135121

136-
component := fe.Unwrap()
137-
if errComponent, ok := component.(*errorComponent); ok {
138-
return nil, errComponent.err
139-
}
140-
141-
return component.(FileExporter), nil
122+
c := fe.Unwrap()
123+
return c.(FileExporter)
142124
}
143125

144-
func newFileExporter(conf *Config) (FileExporter, error) {
145-
marshaller := &marshaller{
146-
formatType: conf.FormatType,
147-
tracesMarshaler: tracesMarshalers[conf.FormatType],
148-
metricsMarshaler: metricsMarshalers[conf.FormatType],
149-
logsMarshaler: logsMarshalers[conf.FormatType],
150-
compression: conf.Compression,
151-
compressor: buildCompressor(conf.Compression),
152-
}
153-
export := buildExportFunc(conf)
154-
155-
writer, err := newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
156-
if err != nil {
157-
return nil, err
158-
}
159-
126+
func newFileExporter(conf *Config) FileExporter {
160127
return &fileExporter{
161-
marshaller: marshaller,
162-
writer: writer,
163-
}, nil
128+
conf: conf,
129+
}
164130
}
165131

166132
func newFileWriter(path string, rotation *Rotation, flushInterval time.Duration, export exportFunc) (*fileWriter, error) {

exporter/fileexporter/factory_test.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ func TestCreateMetricsExporterError(t *testing.T) {
2626
cfg := &Config{
2727
FormatType: formatTypeJSON,
2828
}
29-
_, err := createMetricsExporter(
29+
e, err := createMetricsExporter(
3030
context.Background(),
3131
exportertest.NewNopCreateSettings(),
3232
cfg)
33+
require.NoError(t, err)
34+
err = e.Start(context.Background(), componenttest.NewNopHost())
3335
assert.Error(t, err)
3436
}
3537

@@ -65,10 +67,12 @@ func TestCreateTracesExporterError(t *testing.T) {
6567
cfg := &Config{
6668
FormatType: formatTypeJSON,
6769
}
68-
_, err := createTracesExporter(
70+
e, err := createTracesExporter(
6971
context.Background(),
7072
exportertest.NewNopCreateSettings(),
7173
cfg)
74+
require.NoError(t, err)
75+
err = e.Start(context.Background(), componenttest.NewNopHost())
7276
assert.Error(t, err)
7377
}
7478

@@ -90,10 +94,12 @@ func TestCreateLogsExporterError(t *testing.T) {
9094
cfg := &Config{
9195
FormatType: formatTypeJSON,
9296
}
93-
_, err := createLogsExporter(
97+
e, err := createLogsExporter(
9498
context.Background(),
9599
exportertest.NewNopCreateSettings(),
96100
cfg)
101+
require.NoError(t, err)
102+
err = e.Start(context.Background(), componenttest.NewNopHost())
97103
assert.Error(t, err)
98104
}
99105

exporter/fileexporter/file_exporter.go

+25-3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
// fileExporter is the implementation of file exporter that writes telemetry data to a file
1616
type fileExporter struct {
17+
conf *Config
1718
marshaller *marshaller
1819
writer *fileWriter
1920
}
@@ -43,12 +44,33 @@ func (e *fileExporter) consumeLogs(_ context.Context, ld plog.Logs) error {
4344
}
4445

4546
// Start starts the flush timer if set.
46-
func (e *fileExporter) Start(ctx context.Context, _ component.Host) error {
47-
return e.writer.start(ctx)
47+
func (e *fileExporter) Start(_ context.Context, _ component.Host) error {
48+
e.marshaller = &marshaller{
49+
formatType: e.conf.FormatType,
50+
tracesMarshaler: tracesMarshalers[e.conf.FormatType],
51+
metricsMarshaler: metricsMarshalers[e.conf.FormatType],
52+
logsMarshaler: logsMarshalers[e.conf.FormatType],
53+
compression: e.conf.Compression,
54+
compressor: buildCompressor(e.conf.Compression),
55+
}
56+
export := buildExportFunc(e.conf)
57+
58+
var err error
59+
e.writer, err = newFileWriter(e.conf.Path, e.conf.Rotation, e.conf.FlushInterval, export)
60+
if err != nil {
61+
return err
62+
}
63+
e.writer.start()
64+
return nil
4865
}
4966

5067
// Shutdown stops the exporter and is invoked during shutdown.
5168
// It stops the flush ticker if set.
5269
func (e *fileExporter) Shutdown(context.Context) error {
53-
return e.writer.shutdown()
70+
if e.writer == nil {
71+
return nil
72+
}
73+
w := e.writer
74+
e.writer = nil
75+
return w.shutdown()
5476
}

exporter/fileexporter/file_exporter_test.go

+29-33
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,17 @@ func TestFileTracesExporter(t *testing.T) {
126126
for _, tt := range tests {
127127
t.Run(tt.name, func(t *testing.T) {
128128
conf := tt.args.conf
129-
feI, err := newFileExporter(conf)
130-
assert.NoError(t, err)
129+
feI := newFileExporter(conf)
131130
require.IsType(t, &fileExporter{}, feI)
132131
fe := feI.(*fileExporter)
133132

134133
td := testdata.GenerateTracesTwoSpansSameResource()
135134
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
136135
assert.NoError(t, fe.consumeTraces(context.Background(), td))
137136
assert.NoError(t, fe.consumeTraces(context.Background(), td))
138-
assert.NoError(t, fe.Shutdown(context.Background()))
137+
defer func() {
138+
assert.NoError(t, fe.Shutdown(context.Background()))
139+
}()
139140

140141
fi, err := os.Open(fe.writer.path)
141142
assert.NoError(t, err)
@@ -256,24 +257,18 @@ func TestFileMetricsExporter(t *testing.T) {
256257
for _, tt := range tests {
257258
t.Run(tt.name, func(t *testing.T) {
258259
conf := tt.args.conf
259-
writer, err := buildFileWriter(conf)
260-
assert.NoError(t, err)
261260
fe := &fileExporter{
262-
marshaller: &marshaller{
263-
formatType: conf.FormatType,
264-
metricsMarshaler: metricsMarshalers[conf.FormatType],
265-
compression: conf.Compression,
266-
compressor: buildCompressor(conf.Compression),
267-
},
268-
writer: writer,
261+
conf: conf,
269262
}
270263
require.NotNil(t, fe)
271264

272265
md := testdata.GenerateMetricsTwoMetrics()
273266
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
274267
assert.NoError(t, fe.consumeMetrics(context.Background(), md))
275268
assert.NoError(t, fe.consumeMetrics(context.Background(), md))
276-
assert.NoError(t, fe.Shutdown(context.Background()))
269+
defer func() {
270+
assert.NoError(t, fe.Shutdown(context.Background()))
271+
}()
277272

278273
fi, err := os.Open(fe.writer.path)
279274
assert.NoError(t, err)
@@ -396,24 +391,18 @@ func TestFileLogsExporter(t *testing.T) {
396391
for _, tt := range tests {
397392
t.Run(tt.name, func(t *testing.T) {
398393
conf := tt.args.conf
399-
writer, err := buildFileWriter(conf)
400-
assert.NoError(t, err)
401394
fe := &fileExporter{
402-
marshaller: &marshaller{
403-
formatType: conf.FormatType,
404-
logsMarshaler: logsMarshalers[conf.FormatType],
405-
compression: conf.Compression,
406-
compressor: buildCompressor(conf.Compression),
407-
},
408-
writer: writer,
395+
conf: conf,
409396
}
410397
require.NotNil(t, fe)
411398

412399
ld := testdata.GenerateLogsTwoLogRecordsSameResource()
413400
assert.NoError(t, fe.Start(context.Background(), componenttest.NewNopHost()))
414401
assert.NoError(t, fe.consumeLogs(context.Background(), ld))
415402
assert.NoError(t, fe.consumeLogs(context.Background(), ld))
416-
assert.NoError(t, fe.Shutdown(context.Background()))
403+
defer func() {
404+
assert.NoError(t, fe.Shutdown(context.Background()))
405+
}()
417406

418407
fi, err := os.Open(fe.writer.path)
419408
assert.NoError(t, err)
@@ -631,11 +620,6 @@ func safeFileExporterWrite(e *fileExporter, d []byte) (int, error) {
631620
return e.writer.file.Write(d)
632621
}
633622

634-
func buildFileWriter(conf *Config) (*fileWriter, error) {
635-
export := buildExportFunc(conf)
636-
return newFileWriter(conf.Path, conf.Rotation, conf.FlushInterval, export)
637-
}
638-
639623
func TestFlushing(t *testing.T) {
640624
cfg := &Config{
641625
Path: tempFileName(t),
@@ -648,16 +632,28 @@ func TestFlushing(t *testing.T) {
648632
// Wrap the buffer with the buffered writer closer that implements flush() method.
649633
bwc := newBufferedWriteCloser(buf)
650634
// Create a file exporter with flushing enabled.
651-
feI, err := newFileExporter(cfg)
652-
assert.NoError(t, err)
635+
feI := newFileExporter(cfg)
653636
assert.IsType(t, &fileExporter{}, feI)
654637
fe := feI.(*fileExporter)
655-
fe.writer.file.Close()
656-
fe.writer.file = bwc
657638

658639
// Start the flusher.
659640
ctx := context.Background()
660-
assert.NoError(t, fe.Start(ctx, nil))
641+
fe.marshaller = &marshaller{
642+
formatType: fe.conf.FormatType,
643+
tracesMarshaler: tracesMarshalers[fe.conf.FormatType],
644+
metricsMarshaler: metricsMarshalers[fe.conf.FormatType],
645+
logsMarshaler: logsMarshalers[fe.conf.FormatType],
646+
compression: fe.conf.Compression,
647+
compressor: buildCompressor(fe.conf.Compression),
648+
}
649+
export := buildExportFunc(fe.conf)
650+
var err error
651+
fe.writer, err = newFileWriter(fe.conf.Path, fe.conf.Rotation, fe.conf.FlushInterval, export)
652+
assert.NoError(t, err)
653+
err = fe.writer.file.Close()
654+
assert.NoError(t, err)
655+
fe.writer.file = bwc
656+
fe.writer.start()
661657

662658
// Write 10 bytes.
663659
b := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

0 commit comments

Comments
 (0)