Add support for stream decoding #38780

Open
@axw

Description

Component(s)

extension/encoding

Is your feature request related to a problem? Please describe.

The encoding extension currently provides interfaces for extensions that embed the pdata unmarshaler interfaces. For logs, the interface looks as follows (equivalent interfaces exist for traces, metrics, and profiles):

// Unmarshaler unmarshalls bytes into pdata.Logs.
type Unmarshaler interface {
	// UnmarshalLogs the given bytes into pdata.Logs.
	// If the error is not nil, the returned pdata.Logs cannot be used.
	UnmarshalLogs(buf []byte) (Logs, error)
}

This interface presents two challenges:

  • We cannot stream into the unmarshaler.
  • We cannot stream the results of the unmarshaling.

Together, this means we must hold both the full encoded input AND the full set of unmarshaled logs in memory before we can begin processing any data.

Describe the solution you'd like

  • I would like to be able to pass an io.Reader to the unmarshaler, to avoid reading the full input into memory.
  • I would like an optional interface for unmarshalers to incrementally decode logs.

Concretely, I'm thinking that we should introduce interfaces like the following, similar to the encoding/json.Decoder API:

// StreamingLogsUnmarshalerExtension is an extension that decodes logs from a stream.
type StreamingLogsUnmarshalerExtension interface {
	extension.Extension
	NewLogsDecoder(io.Reader) (LogsDecoder, error)
}

type LogsDecoder interface {
	// DecodeLogs decodes logs and appends them to the given plog.Logs.
	//
	// DecodeLogs returns io.EOF if there are no more logs to be decoded.
	DecodeLogs(to plog.Logs) error
}

We could also provide types to adapt the Unmarshaler interfaces to Decoder interfaces, which would look something like:

type LogsDecoderAdapter struct {
	plog.Unmarshaler

	mu sync.Mutex
	r io.Reader
}

func (a *LogsDecoderAdapter) DecodeLogs(to plog.Logs) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.r == nil {
		return io.EOF
	}
	data, err := io.ReadAll(a.r)
	if err != nil {
		return err
	}
	a.r = nil
	logs, err := a.UnmarshalLogs(data)
	if err != nil {
		return err
	}
	// Move the unmarshaled resource logs into 'to'.
	logs.ResourceLogs().MoveAndAppendTo(to.ResourceLogs())
	return nil
}

Describe alternatives you've considered

Initially I thought of using an iter.Seq coroutine approach, but that would make it awkward to deal with errors mid-stream.

Additional context

No response
