Skip to content

Commit 2b58a15

Browse files
authored
[pkg/stanza] Ensure time parsing happens before entry is sent downwards (#36213)
#### Description This issue was caught in #35758. This PR ensures that time parsing happens before the entry is sent to the next operator in the pipeline. #### Link to tracking issue Fixes ~ #### Testing Added a unit test (`TestProcessWithDockerTime`) that checks the timestamp is already set on the entry received by the downstream operator. #### Documentation ~ Signed-off-by: ChrsMark <[email protected]>
1 parent 5032867 commit 2b58a15

File tree

3 files changed

+104
-19
lines changed

3 files changed

+104
-19
lines changed
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: pkg/stanza
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Ensure that time parsing happens before the entry is sent to downstream operators
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [36213]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

pkg/stanza/operator/parser/container/parser.go

+14-19
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,11 @@ type Parser struct {
6363
asyncConsumerStarted bool
6464
criConsumerStartOnce sync.Once
6565
criConsumers *sync.WaitGroup
66+
timeLayout string
6667
}
6768

6869
// Process will parse an entry of Container logs
6970
func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
70-
var timeLayout string
71-
7271
format := p.format
7372
if format == "" {
7473
format, err = p.detectFormat(entry)
@@ -79,15 +78,11 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
7978

8079
switch format {
8180
case dockerFormat:
82-
err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleAttributeMappings)
81+
p.timeLayout = goTimeLayout
82+
err = p.ParserOperator.ProcessWithCallback(ctx, entry, p.parseDocker, p.handleTimeAndAttributeMappings)
8383
if err != nil {
8484
return fmt.Errorf("failed to process the docker log: %w", err)
8585
}
86-
timeLayout = goTimeLayout
87-
err = parseTime(entry, timeLayout)
88-
if err != nil {
89-
return fmt.Errorf("failed to parse time: %w", err)
90-
}
9186
case containerdFormat, crioFormat:
9287
p.criConsumerStartOnce.Do(func() {
9388
err = p.criLogEmitter.Start(nil)
@@ -119,22 +114,17 @@ func (p *Parser) Process(ctx context.Context, entry *entry.Entry) (err error) {
119114
if err != nil {
120115
return fmt.Errorf("failed to parse containerd log: %w", err)
121116
}
122-
timeLayout = goTimeLayout
117+
p.timeLayout = goTimeLayout
123118
} else {
124119
// parse the message
125120
err = p.ParserOperator.ParseWith(ctx, entry, p.parseCRIO)
126121
if err != nil {
127122
return fmt.Errorf("failed to parse crio log: %w", err)
128123
}
129-
timeLayout = crioTimeLayout
124+
p.timeLayout = crioTimeLayout
130125
}
131126

132-
err = parseTime(entry, timeLayout)
133-
if err != nil {
134-
return fmt.Errorf("failed to parse time: %w", err)
135-
}
136-
137-
err = p.handleAttributeMappings(entry)
127+
err = p.handleTimeAndAttributeMappings(entry)
138128
if err != nil {
139129
return fmt.Errorf("failed to handle attribute mappings: %w", err)
140130
}
@@ -251,9 +241,14 @@ func (p *Parser) parseDocker(value any) (any, error) {
251241
return parsedValue, nil
252242
}
253243

254-
// handleAttributeMappings handles fields' mappings and k8s meta extraction
255-
func (p *Parser) handleAttributeMappings(e *entry.Entry) error {
256-
err := p.handleMoveAttributes(e)
244+
// handleTimeAndAttributeMappings parses the entry's time using p.timeLayout, then handles fields' mappings and k8s meta extraction
245+
func (p *Parser) handleTimeAndAttributeMappings(e *entry.Entry) error {
246+
err := parseTime(e, p.timeLayout)
247+
if err != nil {
248+
return fmt.Errorf("failed to parse time: %w", err)
249+
}
250+
251+
err = p.handleMoveAttributes(e)
257252
if err != nil {
258253
return err
259254
}

pkg/stanza/operator/parser/container/parser_test.go

+63
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,69 @@ func TestRecombineProcess(t *testing.T) {
408408
}
409409
}
410410

411+
func TestProcessWithDockerTime(t *testing.T) {
412+
cases := []struct {
413+
name string
414+
op func() (operator.Operator, error)
415+
input *entry.Entry
416+
expectedOutput *entry.Entry
417+
}{
418+
{
419+
"docker",
420+
func() (operator.Operator, error) {
421+
cfg := NewConfigWithID("test_id")
422+
cfg.AddMetadataFromFilePath = true
423+
set := componenttest.NewNopTelemetrySettings()
424+
return cfg.Build(set)
425+
},
426+
&entry.Entry{
427+
Body: `{"log":"INFO: log line here","stream":"stdout","time":"2029-03-30T08:31:20.545192187Z"}`,
428+
Attributes: map[string]any{
429+
"log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
430+
},
431+
},
432+
&entry.Entry{
433+
Attributes: map[string]any{
434+
"log.iostream": "stdout",
435+
"log.file.path": "/var/log/pods/some_kube-scheduler-kind-control-plane_49cc7c1fd3702c40b2686ea7486091d3/kube-scheduler44/1.log",
436+
},
437+
Body: "INFO: log line here",
438+
Resource: map[string]any{
439+
"k8s.pod.name": "kube-scheduler-kind-control-plane",
440+
"k8s.pod.uid": "49cc7c1fd3702c40b2686ea7486091d3",
441+
"k8s.container.name": "kube-scheduler44",
442+
"k8s.container.restart_count": "1",
443+
"k8s.namespace.name": "some",
444+
},
445+
Timestamp: time.Date(2029, time.March, 30, 8, 31, 20, 545192187, time.UTC),
446+
},
447+
},
448+
}
449+
450+
for _, tc := range cases {
451+
t.Run(tc.name, func(t *testing.T) {
452+
ctx := context.Background()
453+
op, err := tc.op()
454+
require.NoError(t, err)
455+
defer func() { require.NoError(t, op.Stop()) }()
456+
r := op.(*Parser)
457+
458+
fake := testutil.NewFakeOutput(t)
459+
r.OutputOperators = ([]operator.Operator{fake})
460+
461+
require.NoError(t, r.Process(ctx, tc.input))
462+
463+
fake.ExpectEntry(t, tc.expectedOutput)
464+
465+
select {
466+
case e := <-fake.Received:
467+
require.FailNow(t, "Received unexpected entry: ", e)
468+
default:
469+
}
470+
})
471+
}
472+
}
473+
411474
func TestCRIRecombineProcessWithFailedDownstreamOperator(t *testing.T) {
412475
cases := []struct {
413476
name string

0 commit comments

Comments
 (0)