
[receiver/awsfirehose] Unmarshal error when request is larger than the buffer limit of bufio.Scanner #38736

Closed
@yiquanzhou


Component(s)

receiver/awsfirehose

What happened?

Description

We are seeing the error "Error scanning for newline-delimited JSON", with root cause "bufio.Scanner: token too long", in the awsfirehosereceiver.

The unmarshaler uses

scanner := bufio.NewScanner(r)

to read and deserialize the JSON payload of the Firehose stream HTTP request. The scanner's internal buffer has a maximum size of 64KB (bufio.MaxScanTokenSize).

However, Firehose buffers incoming data (CloudWatch logs & metrics) before sending it. The default buffer size is 5MB, and it is configurable up to 64MB.


Each log event can be up to 256KB (see the CloudWatch Logs documentation).

So we can hit the scanner's 64KB limit quickly, even with the default Firehose buffer size of 5MB.
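
A minimal standalone sketch (not the receiver's code) showing the underlying bufio.Scanner behavior when a single newline-delimited record exceeds the default limit:

package main

import (
	"bufio"
	"errors"
	"fmt"
	"strings"
)

func main() {
	// One record longer than bufio.MaxScanTokenSize (64KB), with no newline inside it.
	line := strings.Repeat("a", bufio.MaxScanTokenSize+1) + "\n"

	scanner := bufio.NewScanner(strings.NewReader(line))
	for scanner.Scan() {
		// Never reached: the single token exceeds the scanner's buffer limit.
	}
	if errors.Is(scanner.Err(), bufio.ErrTooLong) {
		fmt.Println("scan failed:", scanner.Err()) // "bufio.Scanner: token too long"
	}
}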

Steps to Reproduce

This can be reproduced with a unit test that creates a message large enough to hit the 64KB limit:

func TestUnmarshalLargePayload(t *testing.T) {
	unmarshaler := NewUnmarshaler(zap.NewNop(), component.NewDefaultBuildInfo())

	// Build a CloudWatch Logs payload whose single JSON line reaches the
	// scanner's 64KB default limit (bufio.MaxScanTokenSize).
	var largePayload bytes.Buffer
	largePayload.WriteString(`{"messageType":"DATA_MESSAGE","owner":"123","logGroup":"test","logStream":"test","logEvents":[`)
	largePayload.WriteString(`{"timestamp":1742239784,"message":"`)
	for largePayload.Len() < 64*1024-4 {
		largePayload.WriteString("a")
	}
	largePayload.WriteString(`"}]}`)

	// Firehose delivers gzip-compressed records; gzipData is a test helper.
	compressedRecord, err := gzipData(largePayload.Bytes())
	require.NoError(t, err)

	// Currently this fails: the scanner reports "token too long", no records
	// are decoded, and the unmarshaler returns errInvalidRecords.
	_, err = unmarshaler.UnmarshalLogs(compressedRecord)
	require.Error(t, err)
	require.Equal(t, errInvalidRecords, err)
}

Expected Result

The awsfirehose receiver should be able to handle large incoming requests without errors.

How to fix?

Option 1: raise the buffer limit of bufio.Scanner

We can call the Scanner.Buffer method (https://pkg.go.dev/bufio#Scanner.Buffer) to give the scanner a buffer with a larger maximum size.

scanner := bufio.NewScanner(r)
buf := make([]byte, 0, initialBufferCapacity)
scanner.Buffer(buf, 64*1024*1024) // set the buffer limit to 64MB

We would need to determine an initial buffer capacity that ideally works for most requests, but this really depends on the data that is streamed by Firehose, so we might need to make it configurable. A rough sketch of how this could look is shown below.
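
The following sketch assumes the standard bufio and io imports; the constant and function names (initialBufferCapacity, maxRecordSize, scanRecords) are hypothetical, not existing code in the receiver:

// Sketch only: names are illustrative.
const (
	initialBufferCapacity = 1 << 20  // 1MB starting capacity; the scanner grows it on demand
	maxRecordSize         = 64 << 20 // 64MB cap, matching Firehose's maximum buffer size
)

// scanRecords reads newline-delimited JSON records from r with a raised token limit.
func scanRecords(r io.Reader, handle func([]byte) error) error {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, initialBufferCapacity), maxRecordSize)
	for scanner.Scan() {
		if err := handle(scanner.Bytes()); err != nil {
			return err
		}
	}
	// Returns bufio.ErrTooLong only if a single record exceeds maxRecordSize.
	return scanner.Err()
}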

Option 2: use bufio.Reader

bufio.Reader seems to provide better error handling and can read tokens of arbitrary length: https://chriswilcox.dev/blog/2024/04/09/Scan-vs-Read-in-bufio.html
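
For illustration, a sketch of the same read loop using bufio.Reader (readRecords is a hypothetical name; the standard bufio, bytes, and io imports are assumed):

// Sketch only: readRecords is not existing code in the receiver.
func readRecords(r io.Reader, handle func([]byte) error) error {
	reader := bufio.NewReader(r)
	for {
		// ReadBytes grows its result as needed, so a record larger than the
		// reader's internal buffer is returned whole instead of failing.
		record, err := reader.ReadBytes('\n')
		if len(record) > 0 {
			if handleErr := handle(bytes.TrimSuffix(record, []byte("\n"))); handleErr != nil {
				return handleErr
			}
		}
		if err == io.EOF {
			return nil // the last record may legitimately lack a trailing newline
		}
		if err != nil {
			return err
		}
	}
}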

Collector version

v0.121.0

Environment information

Environment

OS: macOS 15.3.1

OpenTelemetry Collector configuration

Log output

Additional context

No response
