
Add support for stream decoding #38780


Open
axw opened this issue Mar 19, 2025 · 3 comments
Labels: enhancement, extension/encoding

Comments

@axw
Contributor

axw commented Mar 19, 2025

Component(s)

extension/encoding

Is your feature request related to a problem? Please describe.

The encoding extension currently provides interfaces for extensions that embed the pdata unmarshaler interfaces. For logs, the embedded interface looks as follows (the same sort of interface exists for traces, metrics, and profiles):

```go
package plog

// Unmarshaler unmarshals bytes into pdata.Logs.
type Unmarshaler interface {
	// UnmarshalLogs unmarshals the given bytes into pdata.Logs.
	// If the error is not nil, the returned pdata.Logs cannot be used.
	UnmarshalLogs(buf []byte) (Logs, error)
}
```

This interface presents two challenges:

  • We cannot stream into the unmarshaler.
  • We cannot stream the results of the unmarshaling.

Together, this means we must hold both the full encoded input AND the full set of unmarshaled logs in memory before we can process any data.

Describe the solution you'd like

  • I would like to be able to pass in an io.Reader so that the unmarshaler can avoid reading the full input into memory.
  • I would like an optional interface for unmarshalers to incrementally decode logs.

Concretely, I'm thinking that we should introduce interfaces like the following, similar to the encoding/json.Decoder API:

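```go
package encoding

import (
	"io"

	"go.opentelemetry.io/collector/extension"
	"go.opentelemetry.io/collector/pdata/plog"
)

// StreamingLogsUnmarshalerExtension is an extension that decodes logs from a stream.
type StreamingLogsUnmarshalerExtension interface {
	extension.Extension
	// NewLogsDecoder returns a LogsDecoder that reads encoded logs from r.
	NewLogsDecoder(r io.Reader) (LogsDecoder, error)
}

// LogsDecoder incrementally decodes logs from an underlying stream.
type LogsDecoder interface {
	// DecodeLogs decodes logs and appends them to the given plog.Logs.
	//
	// DecodeLogs returns io.EOF if there are no more logs to be decoded.
	DecodeLogs(to plog.Logs) error
}
```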

We could also provide types to adapt the Unmarshaler interfaces to Decoder interfaces, which would look something like:

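```go
package encoding

import (
	"io"
	"sync"

	"go.opentelemetry.io/collector/pdata/plog"
)

// LogsDecoderAdapter adapts a plog.Unmarshaler to the LogsDecoder interface.
// The first call reads and unmarshals the entire input; subsequent calls
// return io.EOF.
type LogsDecoderAdapter struct {
	plog.Unmarshaler

	mu sync.Mutex
	r  io.Reader
}

func (a *LogsDecoderAdapter) DecodeLogs(to plog.Logs) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.r == nil {
		return io.EOF
	}
	data, err := io.ReadAll(a.r)
	if err != nil {
		return err
	}
	a.r = nil
	logs, err := a.UnmarshalLogs(data)
	if err != nil {
		return err
	}
	// Move the decoded resource logs into the caller's plog.Logs.
	logs.ResourceLogs().MoveAndAppendTo(to.ResourceLogs())
	return nil
}
```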

Describe alternatives you've considered

Initially I thought of using an iter.Seq coroutine approach, but that would make it awkward to deal with errors mid-stream.

Additional context

No response

axw added the enhancement and needs triage labels Mar 19, 2025
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

Contributor

Pinging code owners for extension/encoding
extension/encoding/avrologencodingextension
extension/encoding/awscloudwatchmetricstreamsencodingextension
extension/encoding/googlecloudlogentryencodingextension
extension/encoding/jaegerencodingextension
extension/encoding/jsonlogencodingextension
extension/encoding/otlpencodingextension
extension/encoding/skywalkingencodingextension
extension/encoding/textencodingextension
extension/encoding/zipkinencodingextension: @atoulme @dao-jun @dmitryax @MovieStoreGuy @VihasMakwana. See Adding Labels via Comments if you do not have permissions to add labels yourself. For example, comment '/label priority:p2 -needs-triaged' to set the priority and remove the needs-triaged label.

atoulme pushed a commit that referenced this issue Mar 20, 2025
#### Description
The current `awk` expression will return all matches, so if a component
label is a substring of another component label, the resulting label
will be all matching labels appended to each other.

Example: #38780
The label wasn't originally added because the [found label was too long to add](https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/13938538170/job/39010977164).
However, code owners were properly pinged. When I added the proper label, the [incorrect label name was detected](#38780 (comment)).

When searching the `.github/component_labels.txt` file, the label we
want to add is the first match that we find. This is the **minimally
matching label**, since the file is in alphabetical order. There may be
a better `awk` way to implement this logic, but `head -n 1` returns the
first matching line, if any are found in the given text.

#### Link to tracking issue
Related
#38622

#### Testing
**Before:**
```
crobert$ ~/dev/contrib/first $ COMPONENT="receiver/hostmetrics"
crobert$ ~/dev/contrib/first $ awk -v path="${COMPONENT}" 'index($1, path) > 0 || index($2, path) > 0 {print $1}' .github/component_labels.txt
receiver/hostmetricsreceiver
receiver/hostmetricsreceiver/internal/scraper/cpuscraper
receiver/hostmetricsreceiver/internal/scraper/diskscraper
receiver/hostmetricsreceiver/internal/scraper/filesystemscraper
receiver/hostmetricsreceiver/internal/scraper/loadscraper
receiver/hostmetricsreceiver/internal/scraper/memoryscraper
receiver/hostmetricsreceiver/internal/scraper/networkscraper
receiver/hostmetricsreceiver/internal/scraper/pagingscraper
receiver/hostmetricsreceiver/internal/scraper/processesscraper
receiver/hostmetricsreceiver/internal/scraper/processscraper
receiver/hostmetricsreceiver/internal/scraper/systemscraper
crobert$ ~/dev/contrib/first $ COMPONENT="extension/encoding"
crobert$ ~/dev/contrib/first $ awk -v path="${COMPONENT}" 'index($1, path) > 0 || index($2, path) > 0 {print $1}' .github/component_labels.txt
extension/encoding
extension/encoding/avrologencodingextension
extension/encoding/awscloudwatchmetricstreamsencodingextension
extension/encoding/awslogsencodingextension
extension/encoding/googlecloudlogentryencodingextension
extension/encoding/jaegerencodingextension
extension/encoding/jsonlogencodingextension
extension/encoding/otlpencodingextension
extension/encoding/skywalkingencodingextension
extension/encoding/textencodingextension
extension/encoding/zipkinencodingextension
```
**After:**
```
crobert$ ~/dev/contrib/first $ COMPONENT="receiver/hostmetrics"
crobert$ ~/dev/contrib/first $ awk -v path="${COMPONENT}" 'index($1, path) > 0 || index($2, path) > 0 {print $1}' .github/component_labels.txt | head -n 1
receiver/hostmetricsreceiver
crobert$ ~/dev/contrib/first $ COMPONENT="extension/encoding"
crobert$ ~/dev/contrib/first $ awk -v path="${COMPONENT}" 'index($1, path) > 0 || index($2, path) > 0 {print $1}' .github/component_labels.txt | head -n 1
extension/encoding
```
MovieStoreGuy removed the needs triage label Apr 7, 2025
@MovieStoreGuy
Contributor

I am in favor of the first approach, since it can hide a lot of the boring details. I worry that, in practice, the adapter might actually cause a lot of contention if the mutex is kept there.

Fiery-Fenix pushed a commit to Fiery-Fenix/opentelemetry-collector-contrib that referenced this issue Apr 24, 2025
…metry#38844)
