Description
Component(s)
extension/encoding
Is your feature request related to a problem? Please describe.
The encoding extension currently provides interfaces for extensions that embed the pdata unmarshaler interfaces, which look like the following for logs (the same sort of interface exists for traces, metrics, and profiles):
```go
// Unmarshaler unmarshalls bytes into pdata.Logs.
type Unmarshaler interface {
	// UnmarshalLogs the given bytes into pdata.Logs.
	// If the error is not nil, the returned pdata.Logs cannot be used.
	UnmarshalLogs(buf []byte) (Logs, error)
}
```
This interface presents two challenges:
- We cannot stream input into the unmarshaler: the entire encoded payload must be passed as a single `[]byte`.
- We cannot stream the results of the unmarshaling: every decoded log is returned at once in a single `Logs` value.
Together, this means we must hold both the full encoded input AND the full set of unmarshaled logs in memory before we can process any data.
Describe the solution you'd like
- I would like to be able to pass in an `io.Reader` so that the unmarshaler can avoid reading the full input into memory.
- I would like an optional interface for unmarshalers to incrementally decode logs.
Concretely, I'm thinking that we should introduce interfaces like the following, similar to the `encoding/json.Decoder` API:
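```go
package encoding

import (
	"io"

	"go.opentelemetry.io/collector/extension"
	"go.opentelemetry.io/collector/pdata/plog"
)

// StreamingLogsUnmarshalerExtension is an extension that decodes logs from a stream.
type StreamingLogsUnmarshalerExtension interface {
	extension.Extension
	// NewLogsDecoder returns a LogsDecoder that reads encoded logs from r.
	NewLogsDecoder(r io.Reader) (LogsDecoder, error)
}

// LogsDecoder incrementally decodes logs from an underlying stream.
type LogsDecoder interface {
	// DecodeLogs decodes logs and appends them to the given plog.Logs.
	//
	// DecodeLogs returns io.EOF if there are no more logs to be decoded.
	DecodeLogs(to plog.Logs) error
}
```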
We could also provide types to adapt the Unmarshaler interfaces to Decoder interfaces, which would look something like:
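```go
package encoding

import (
	"io"
	"sync"

	"go.opentelemetry.io/collector/pdata/plog"
)

// LogsDecoderAdapter adapts a plog.Unmarshaler to the LogsDecoder interface.
// It reads the entire input on the first call, so it offers no memory savings.
type LogsDecoderAdapter struct {
	plog.Unmarshaler
	mu sync.Mutex
	r  io.Reader
}

// DecodeLogs reads and unmarshals the whole input on the first call and
// returns io.EOF on every subsequent call.
func (a *LogsDecoderAdapter) DecodeLogs(to plog.Logs) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	if a.r == nil {
		return io.EOF
	}
	r := a.r
	a.r = nil // consume the reader at most once, even if reading fails
	data, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	logs, err := a.UnmarshalLogs(data)
	if err != nil {
		return err
	}
	// Move the decoded resource logs into 'to' without copying them.
	logs.ResourceLogs().MoveAndAppendTo(to.ResourceLogs())
	return nil
}
```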
Describe alternatives you've considered
Initially I thought of using an `iter.Seq` coroutine approach, but that would make it awkward to deal with errors mid-stream.