Skip to content

Commit 93bddee

Browse files
VihasMakwanaatoulme
authored andcommitted
[testbed] - Add scenarios to handle large files (open-telemetry#34417)
**Description:** Add test cases covering large files to existing testbed. This PR adds following scenarios: Scenario 1: Ensure that all logs are captured for files that reach a size of 2GB. Scenario 2: Ensure that all logs are captured for files that reach a size of 6GB Scenario 3: Ensure that all logs are captured for a file of approximately 1.5GB that contains prewritten data. **Link to tracking Issue:** open-telemetry#34288 **Testing:** Added --------- Co-authored-by: Antoine Toulme <[email protected]>
1 parent c7f879a commit 93bddee

File tree

4 files changed

+262
-0
lines changed

4 files changed

+262
-0
lines changed

.chloggen/add-large-file-tests.yaml

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: testbed
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add test case scenarios to handle large files to existing testbed.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [34288]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

testbed/testbed/validator.go

+72
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ import (
88
"log"
99
"reflect"
1010
"sort"
11+
"strconv"
1112
"strings"
1213
"time"
1314

1415
"github.com/stretchr/testify/assert"
16+
"github.com/stretchr/testify/require"
1517
"go.opentelemetry.io/collector/pdata/pcommon"
1618
"go.opentelemetry.io/collector/pdata/ptrace"
1719
)
@@ -561,3 +563,73 @@ func populateSpansMap(spansMap map[string]ptrace.Span, tds []ptrace.Traces) {
561563
func traceIDAndSpanIDToString(traceID pcommon.TraceID, spanID pcommon.SpanID) string {
562564
return fmt.Sprintf("%s-%s", traceID, spanID)
563565
}
566+
567+
type CorrectnessLogTestValidator struct {
568+
dataProvider DataProvider
569+
}
570+
571+
func NewCorrectnessLogTestValidator(provider DataProvider) *CorrectnessLogTestValidator {
572+
return &CorrectnessLogTestValidator{
573+
dataProvider: provider,
574+
}
575+
}
576+
577+
func (c *CorrectnessLogTestValidator) Validate(tc *TestCase) {
578+
if dataProvider, ok := c.dataProvider.(*perfTestDataProvider); ok {
579+
logsReceived := tc.MockBackend.ReceivedLogs
580+
581+
idsSent := make([][2]string, 0)
582+
idsReceived := make([][2]string, 0)
583+
584+
for batch := 0; batch < int(dataProvider.traceIDSequence.Load()); batch++ {
585+
for idx := 0; idx < dataProvider.options.ItemsPerBatch; idx++ {
586+
idsSent = append(idsSent, [2]string{"batch_" + strconv.Itoa(batch), "item_" + strconv.Itoa(idx)})
587+
}
588+
}
589+
for _, log := range logsReceived {
590+
for i := 0; i < log.ResourceLogs().Len(); i++ {
591+
for j := 0; j < log.ResourceLogs().At(i).ScopeLogs().Len(); j++ {
592+
s := log.ResourceLogs().At(i).ScopeLogs().At(j)
593+
for k := 0; k < s.LogRecords().Len(); k++ {
594+
logRecord := s.LogRecords().At(k)
595+
batchIndex, ok := logRecord.Attributes().Get("batch_index")
596+
require.True(tc.t, ok, "batch_index missing from attributes; use perfDataProvider")
597+
itemIndex, ok := logRecord.Attributes().Get("item_index")
598+
require.True(tc.t, ok, "item_index missing from attributes; use perfDataProvider")
599+
600+
idsReceived = append(idsReceived, [2]string{batchIndex.Str(), itemIndex.Str()})
601+
}
602+
}
603+
}
604+
}
605+
606+
assert.ElementsMatch(tc.t, idsSent, idsReceived)
607+
}
608+
}
609+
610+
func (c *CorrectnessLogTestValidator) RecordResults(tc *TestCase) {
611+
rc := tc.agentProc.GetTotalConsumption()
612+
613+
var result string
614+
if tc.t.Failed() {
615+
result = "FAIL"
616+
} else {
617+
result = "PASS"
618+
}
619+
620+
// Remove "Test" prefix from test name.
621+
testName := tc.t.Name()[4:]
622+
623+
tc.resultsSummary.Add(tc.t.Name(), &PerformanceTestResult{
624+
testName: testName,
625+
result: result,
626+
receivedSpanCount: tc.MockBackend.DataItemsReceived(),
627+
sentSpanCount: tc.LoadGenerator.DataItemsSent(),
628+
duration: time.Since(tc.startTime),
629+
cpuPercentageAvg: rc.CPUPercentAvg,
630+
cpuPercentageMax: rc.CPUPercentMax,
631+
ramMibAvg: rc.RAMMiBAvg,
632+
ramMibMax: rc.RAMMiBMax,
633+
errorCause: tc.errorCause,
634+
})
635+
}

testbed/tests/log_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@
77
package tests
88

99
import (
10+
"context"
11+
"path"
12+
"path/filepath"
13+
"sync/atomic"
1014
"testing"
15+
"time"
16+
17+
"github.com/stretchr/testify/require"
1118

1219
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil"
1320
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datareceivers"
@@ -233,3 +240,113 @@ func TestLogOtlpSendingQueue(t *testing.T) {
233240
})
234241

235242
}
243+
244+
func TestLogLargeFiles(t *testing.T) {
245+
tests := []struct {
246+
name string
247+
sender testbed.DataSender
248+
receiver testbed.DataReceiver
249+
loadOptions testbed.LoadOptions
250+
sleepSeconds int
251+
}{
252+
{
253+
/*
254+
* The FileLogWriter generates strings almost 100 bytes each.
255+
* With a rate of 200,000 lines per second over a duration of 100 seconds,
256+
* this results in a file size of approximately 2GB over its lifetime.
257+
*/
258+
name: "filelog-largefiles-2Gb-lifetime",
259+
sender: datasenders.NewFileLogWriter(),
260+
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
261+
loadOptions: testbed.LoadOptions{
262+
DataItemsPerSecond: 200000,
263+
ItemsPerBatch: 1,
264+
Parallel: 100,
265+
},
266+
sleepSeconds: 100,
267+
},
268+
{
269+
/*
270+
* The FileLogWriter generates strings almost 100 bytes each.
271+
* With a rate of 330,000 lines per second over a duration of 200 seconds,
272+
* this results in a file size of approximately 6GB over its lifetime.
273+
*/
274+
name: "filelog-largefiles-6GB-lifetime",
275+
sender: datasenders.NewFileLogWriter(),
276+
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
277+
loadOptions: testbed.LoadOptions{
278+
DataItemsPerSecond: 330000,
279+
ItemsPerBatch: 10,
280+
Parallel: 10,
281+
},
282+
sleepSeconds: 200,
283+
},
284+
}
285+
processors := map[string]string{
286+
"batch": `
287+
batch:
288+
`}
289+
for _, test := range tests {
290+
t.Run(test.name, func(t *testing.T) {
291+
ScenarioLong(
292+
t,
293+
test.sender,
294+
test.receiver,
295+
test.loadOptions,
296+
performanceResultsSummary,
297+
test.sleepSeconds,
298+
processors,
299+
)
300+
})
301+
}
302+
}
303+
304+
func TestLargeFileOnce(t *testing.T) {
305+
processors := map[string]string{
306+
"batch": `
307+
batch:
308+
`,
309+
}
310+
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
311+
require.NoError(t, err)
312+
sender := datasenders.NewFileLogWriter()
313+
receiver := testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t))
314+
loadOptions := testbed.LoadOptions{
315+
DataItemsPerSecond: 1,
316+
ItemsPerBatch: 10000000,
317+
Parallel: 1,
318+
}
319+
320+
// Write data at once, before starting up the collector
321+
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
322+
dataItemsGenerated := atomic.Uint64{}
323+
dataProvider.SetLoadGeneratorCounters(&dataItemsGenerated)
324+
ld, _ := dataProvider.GenerateLogs()
325+
326+
require.NoError(t, sender.ConsumeLogs(context.Background(), ld))
327+
agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))
328+
329+
configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
330+
configCleanup, err := agentProc.PrepareConfig(configStr)
331+
require.NoError(t, err)
332+
defer configCleanup()
333+
334+
tc := testbed.NewTestCase(
335+
t,
336+
dataProvider,
337+
sender,
338+
receiver,
339+
agentProc,
340+
&testbed.CorrectnessLogTestValidator{},
341+
performanceResultsSummary,
342+
)
343+
t.Cleanup(tc.Stop)
344+
345+
tc.StartBackend()
346+
tc.StartAgent()
347+
348+
tc.WaitForN(func() bool { return dataItemsGenerated.Load() == tc.MockBackend.DataItemsReceived() }, 20*time.Second, "all logs received")
349+
350+
tc.StopAgent()
351+
tc.ValidateData()
352+
}

testbed/tests/scenarios.go

+46
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,52 @@ func ScenarioSendingQueuesNotFull(
501501
tc.ValidateData()
502502
}
503503

504+
func ScenarioLong(
505+
t *testing.T,
506+
sender testbed.DataSender,
507+
receiver testbed.DataReceiver,
508+
loadOptions testbed.LoadOptions,
509+
resultsSummary testbed.TestResultsSummary,
510+
sleepTime int,
511+
processors map[string]string,
512+
) {
513+
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
514+
require.NoError(t, err)
515+
516+
agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))
517+
518+
configStr := createConfigYaml(t, sender, receiver, resultDir, processors, nil)
519+
configCleanup, err := agentProc.PrepareConfig(configStr)
520+
require.NoError(t, err)
521+
defer configCleanup()
522+
dataProvider := testbed.NewPerfTestDataProvider(loadOptions)
523+
tc := testbed.NewTestCase(
524+
t,
525+
dataProvider,
526+
sender,
527+
receiver,
528+
agentProc,
529+
&testbed.CorrectnessLogTestValidator{},
530+
resultsSummary,
531+
)
532+
t.Cleanup(tc.Stop)
533+
534+
tc.StartBackend()
535+
tc.StartAgent()
536+
537+
tc.StartLoad(loadOptions)
538+
539+
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() > 0 }, "load generator started")
540+
541+
tc.Sleep(time.Second * time.Duration(sleepTime))
542+
543+
tc.StopLoad()
544+
545+
tc.WaitFor(func() bool { return tc.LoadGenerator.DataItemsSent() == tc.MockBackend.DataItemsReceived() }, "all logs received")
546+
547+
tc.ValidateData()
548+
}
549+
504550
func constructLoadOptions(test TestCase) testbed.LoadOptions {
505551
options := testbed.LoadOptions{DataItemsPerSecond: 1000, ItemsPerBatch: 10}
506552
options.Attributes = make(map[string]string)

0 commit comments

Comments
 (0)