Skip to content

Commit ccc3f95

Browse files
authored
Add performance test for filelog based kubernetes container logs receiver (#2564)
Add performance test for filelog based kubernetes container logs receiver This PR contains one performance test (for kubernetes containerd logs with format autodetection) **Link to tracking Issue:** #2266
1 parent d58b47e commit ccc3f95

File tree

3 files changed

+358
-1
lines changed

3 files changed

+358
-1
lines changed

internal/stanza/register.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@
1515
package stanza
1616

1717
import (
18-
// Register parsers for stanza-based log receivers
18+
// Register parsers and transformers for stanza-based log receivers
1919
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/json"
2020
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/regex"
2121
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/severity"
2222
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/parser/time"
23+
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/metadata"
24+
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/restructure"
25+
_ "github.com/open-telemetry/opentelemetry-log-collection/operator/builtin/transformer/router"
2326
)

testbed/datasenders/k8s.go

Lines changed: 318 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,318 @@
1+
// Copyright The OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package datasenders
16+
17+
import (
18+
"context"
19+
"fmt"
20+
"io/ioutil"
21+
"os"
22+
"strconv"
23+
"strings"
24+
"time"
25+
26+
"go.opentelemetry.io/collector/consumer/pdata"
27+
"go.opentelemetry.io/collector/testbed/testbed"
28+
)
29+
30+
// FileLogK8sWriter represents abstract container k8s writer
31+
type FileLogK8sWriter struct {
32+
testbed.DataSenderBase
33+
file *os.File
34+
config string
35+
}
36+
37+
// Ensure FileLogK8sWriter implements LogDataSender.
38+
var _ testbed.LogDataSender = (*FileLogK8sWriter)(nil)
39+
40+
// NewFileLogK8sWriter creates a new data sender that will write kubernetes containerd
41+
// log entries to a file, to be tailed by FileLogReceiver and sent to the collector.
42+
//
43+
// config is an Otelcol config appended to the receivers section after executing fmt.Sprintf on it.
44+
// This implies few things:
45+
// - it should contain `%s` which will be replaced with the filename
46+
// - all `%` should be represented as `%%`
47+
// - indentation style matters. Spaces have to be used for indentation
48+
// and it should start with two spaces indentation
49+
//
50+
// Example config:
51+
// |`
52+
// | filelog:
53+
// | include: [ %s ]
54+
// | start_at: beginning
55+
// | operators:
56+
// | type: regex_parser
57+
// | regex: ^(?P<log>.*)$
58+
// | `
59+
func NewFileLogK8sWriter(config string) *FileLogK8sWriter {
60+
dir, err := ioutil.TempDir("", "namespace-*_test-pod_000011112222333344445555666677778888")
61+
if err != nil {
62+
panic("failed to create temp dir")
63+
}
64+
dir, err = ioutil.TempDir(dir, "*")
65+
if err != nil {
66+
panic("failed to create temp dir")
67+
}
68+
69+
file, err := ioutil.TempFile(dir, "*.log")
70+
if err != nil {
71+
panic("failed to create temp file")
72+
}
73+
74+
f := &FileLogK8sWriter{
75+
file: file,
76+
config: config,
77+
}
78+
79+
return f
80+
}
81+
82+
func (f *FileLogK8sWriter) Start() error {
83+
return nil
84+
}
85+
86+
func (f *FileLogK8sWriter) ConsumeLogs(_ context.Context, logs pdata.Logs) error {
87+
for i := 0; i < logs.ResourceLogs().Len(); i++ {
88+
for j := 0; j < logs.ResourceLogs().At(i).InstrumentationLibraryLogs().Len(); j++ {
89+
ills := logs.ResourceLogs().At(i).InstrumentationLibraryLogs().At(j)
90+
for k := 0; k < ills.Logs().Len(); k++ {
91+
_, err := f.file.Write(append(f.convertLogToTextLine(ills.Logs().At(k)), '\n'))
92+
if err != nil {
93+
return err
94+
}
95+
}
96+
}
97+
}
98+
return nil
99+
}
100+
101+
func (f *FileLogK8sWriter) convertLogToTextLine(lr pdata.LogRecord) []byte {
102+
sb := strings.Builder{}
103+
104+
// Timestamp
105+
sb.WriteString(time.Unix(0, int64(lr.Timestamp())).Format("2006-01-02T15:04:05.000000000Z"))
106+
107+
// Severity
108+
sb.WriteString(" stderr F ")
109+
sb.WriteString(lr.SeverityText())
110+
sb.WriteString(" ")
111+
112+
if lr.Body().Type() == pdata.AttributeValueSTRING {
113+
sb.WriteString(lr.Body().StringVal())
114+
}
115+
116+
lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
117+
sb.WriteString(" ")
118+
sb.WriteString(k)
119+
sb.WriteString("=")
120+
switch v.Type() {
121+
case pdata.AttributeValueSTRING:
122+
sb.WriteString(v.StringVal())
123+
case pdata.AttributeValueINT:
124+
sb.WriteString(strconv.FormatInt(v.IntVal(), 10))
125+
case pdata.AttributeValueDOUBLE:
126+
sb.WriteString(strconv.FormatFloat(v.DoubleVal(), 'f', -1, 64))
127+
case pdata.AttributeValueBOOL:
128+
sb.WriteString(strconv.FormatBool(v.BoolVal()))
129+
default:
130+
panic("missing case")
131+
}
132+
})
133+
134+
return []byte(sb.String())
135+
}
136+
137+
func (f *FileLogK8sWriter) Flush() {
138+
_ = f.file.Sync()
139+
}
140+
141+
func (f *FileLogK8sWriter) GenConfigYAMLStr() string {
142+
// Note that this generates a receiver config for agent.
143+
// We are testing filelog receiver here.
144+
145+
return fmt.Sprintf(f.config, f.file.Name())
146+
}
147+
148+
func (f *FileLogK8sWriter) ProtocolName() string {
149+
return "filelog"
150+
}
151+
152+
func (f *FileLogK8sWriter) GetEndpoint() string {
153+
return ""
154+
}
155+
156+
// NewKubernetesContainerWriter returns FileLogK8sWriter with configuration
157+
// to recognize and parse kubernetes container logs
158+
func NewKubernetesContainerWriter() *FileLogK8sWriter {
159+
return NewFileLogK8sWriter(`
160+
filelog:
161+
include: [ %s ]
162+
start_at: beginning
163+
include_file_path: true
164+
include_file_name: false
165+
operators:
166+
# Find out which format is used by kubernetes
167+
- type: router
168+
id: get-format
169+
routes:
170+
- output: parser-docker
171+
expr: '$$record matches "^\\{"'
172+
- output: parser-crio
173+
expr: '$$record matches "^[^ Z]+ "'
174+
- output: parser-containerd
175+
expr: '$$record matches "^[^ Z]+Z"'
176+
# Parse CRI-O format
177+
- type: regex_parser
178+
id: parser-crio
179+
regex: '^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
180+
output: extract_metadata_from_filepath
181+
timestamp:
182+
parse_from: time
183+
layout_type: gotime
184+
layout: '2006-01-02T15:04:05.000000000-07:00'
185+
# Parse CRI-Containerd format
186+
- type: regex_parser
187+
id: parser-containerd
188+
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
189+
output: extract_metadata_from_filepath
190+
timestamp:
191+
parse_from: time
192+
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
193+
# Parse Docker format
194+
- type: json_parser
195+
id: parser-docker
196+
output: extract_metadata_from_filepath
197+
timestamp:
198+
parse_from: time
199+
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
200+
# Extract metadata from file path
201+
- type: regex_parser
202+
id: extract_metadata_from_filepath
203+
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
204+
parse_from: $$attributes.file_path
205+
# Move out attributes to Attributes
206+
- type: metadata
207+
attributes:
208+
stream: 'EXPR($.stream)'
209+
k8s.container.name: 'EXPR($.container_name)'
210+
k8s.namespace.name: 'EXPR($.namespace)'
211+
k8s.pod.name: 'EXPR($.pod_name)'
212+
run_id: 'EXPR($.run_id)'
213+
k8s.pod.uid: 'EXPR($.uid)'
214+
# Clean up log record
215+
- type: restructure
216+
id: clean-up-log-record
217+
ops:
218+
- remove: logtag
219+
- remove: stream
220+
- remove: container_name
221+
- remove: namespace
222+
- remove: pod_name
223+
- remove: run_id
224+
- remove: uid
225+
`)
226+
}
227+
228+
// NewKubernetesCRIContainerdWriter returns FileLogK8sWriter with configuration
229+
// to parse only CRI-Containerd kubernetes logs
230+
func NewKubernetesCRIContainerdWriter() *FileLogK8sWriter {
231+
return NewFileLogK8sWriter(`
232+
filelog:
233+
include: [ %s ]
234+
start_at: beginning
235+
include_file_path: true
236+
include_file_name: false
237+
operators:
238+
# Parse CRI-Containerd format
239+
- type: regex_parser
240+
id: parser-containerd
241+
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
242+
output: extract_metadata_from_filepath
243+
timestamp:
244+
parse_from: time
245+
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
246+
# Extract metadata from file path
247+
- type: regex_parser
248+
id: extract_metadata_from_filepath
249+
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
250+
parse_from: $$attributes.file_path
251+
# Move out attributes to Attributes
252+
- type: metadata
253+
attributes:
254+
stream: 'EXPR($.stream)'
255+
k8s.container.name: 'EXPR($.container_name)'
256+
k8s.namespace.name: 'EXPR($.namespace)'
257+
k8s.pod.name: 'EXPR($.pod_name)'
258+
run_id: 'EXPR($.run_id)'
259+
k8s.pod.uid: 'EXPR($.uid)'
260+
# Clean up log record
261+
- type: restructure
262+
id: clean-up-log-record
263+
ops:
264+
- remove: logtag
265+
- remove: stream
266+
- remove: container_name
267+
- remove: namespace
268+
- remove: pod_name
269+
- remove: run_id
270+
- remove: uid
271+
`)
272+
}
273+
274+
// NewKubernetesCRIContainerdNoAttributesOpsWriter returns FileLogK8sWriter with configuration
275+
// to parse only CRI-Containerd kubernetes logs without reformatting attributes
276+
func NewKubernetesCRIContainerdNoAttributesOpsWriter() *FileLogK8sWriter {
277+
return NewFileLogK8sWriter(`
278+
filelog:
279+
include: [ %s ]
280+
start_at: beginning
281+
include_file_path: true
282+
include_file_name: false
283+
operators:
284+
# Parse CRI-Containerd format
285+
- type: regex_parser
286+
id: parser-containerd
287+
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
288+
output: extract_metadata_from_filepath
289+
timestamp:
290+
parse_from: time
291+
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
292+
# Extract metadata from file path
293+
- type: regex_parser
294+
id: extract_metadata_from_filepath
295+
regex: '^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]{36})\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$'
296+
parse_from: $$attributes.file_path
297+
`)
298+
}
299+
300+
// NewCRIContainerdWriter returns FileLogK8sWriter with configuration
301+
// to parse only CRI-Containerd logs (no extracting metadata from filename)
302+
func NewCRIContainerdWriter() *FileLogK8sWriter {
303+
return NewFileLogK8sWriter(`
304+
filelog:
305+
include: [ %s ]
306+
start_at: beginning
307+
include_file_path: true
308+
include_file_name: false
309+
operators:
310+
# Parse CRI-Containerd format
311+
- type: regex_parser
312+
id: parser-containerd
313+
regex: '^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*) (?P<log>.*)$'
314+
timestamp:
315+
parse_from: time
316+
layout: '%%Y-%%m-%%dT%%H:%%M:%%S.%%LZ'
317+
`)
318+
}

testbed/tests/log_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,42 @@ func TestLog10kDPS(t *testing.T) {
5252
ExpectedMaxRAM: 85,
5353
},
5454
},
55+
{
56+
name: "kubernetes containers",
57+
sender: datasenders.NewKubernetesContainerWriter(),
58+
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
59+
resourceSpec: testbed.ResourceSpec{
60+
ExpectedMaxCPU: 100,
61+
ExpectedMaxRAM: 150,
62+
},
63+
},
64+
{
65+
name: "k8s CRI-Containerd",
66+
sender: datasenders.NewKubernetesCRIContainerdWriter(),
67+
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
68+
resourceSpec: testbed.ResourceSpec{
69+
ExpectedMaxCPU: 100,
70+
ExpectedMaxRAM: 150,
71+
},
72+
},
73+
{
74+
name: "k8s CRI-Containerd no attr ops",
75+
sender: datasenders.NewKubernetesCRIContainerdNoAttributesOpsWriter(),
76+
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
77+
resourceSpec: testbed.ResourceSpec{
78+
ExpectedMaxCPU: 100,
79+
ExpectedMaxRAM: 150,
80+
},
81+
},
82+
{
83+
name: "CRI-Containerd",
84+
sender: datasenders.NewCRIContainerdWriter(),
85+
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
86+
resourceSpec: testbed.ResourceSpec{
87+
ExpectedMaxCPU: 100,
88+
ExpectedMaxRAM: 150,
89+
},
90+
},
5591
}
5692

5793
processors := map[string]string{

0 commit comments

Comments
 (0)