Skip to content

[pkg/ottl] support merging multiple maps via merge_maps() function #37343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/ottl-merge-maps.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: pkg/ottl

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Support merging multiple maps via merge_maps() function"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32954]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
74 changes: 73 additions & 1 deletion pkg/ottl/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
tCtx.GetLogRecord().Attributes().Remove("map_slice")
},
},
{
Expand All @@ -82,6 +83,11 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().PutInt("things.0.value", 2)
tCtx.GetLogRecord().Attributes().PutStr("things.1.name", "bar")
tCtx.GetLogRecord().Attributes().PutInt("things.1.value", 5)

tCtx.GetLogRecord().Attributes().Remove("map_slice")
tCtx.GetLogRecord().Attributes().PutStr("map_slice.0.foo1", "bar1")
tCtx.GetLogRecord().Attributes().PutStr("map_slice.0.foo2", "bar2")
tCtx.GetLogRecord().Attributes().PutStr("map_slice.1.total.string", "barbar1")
},
},
{
Expand All @@ -106,6 +112,10 @@ func Test_e2e_editors(t *testing.T) {
m.PutInt("test.things.0.value", 2)
m.PutStr("test.things.1.name", "bar")
m.PutInt("test.things.1.value", 5)

m.PutStr("test.map_slice.0.foo1", "bar1")
m.PutStr("test.map_slice.0.foo2", "bar2")
m.PutStr("test.map_slice.1.total.string", "barbar1")
m.CopyTo(tCtx.GetLogRecord().Attributes())
},
},
Expand Down Expand Up @@ -134,6 +144,10 @@ func Test_e2e_editors(t *testing.T) {
m.PutStr("test.things.1.name", "bar")
m.PutInt("test.things.1.value", 5)

m.PutStr("test.map_slice.0.foo1", "bar1")
m.PutStr("test.map_slice.0.foo2", "bar2")
m.PutStr("test.map_slice.1.total.string", "barbar1")

m.CopyTo(tCtx.GetLogRecord().Attributes())
},
},
Expand Down Expand Up @@ -165,6 +179,14 @@ func Test_e2e_editors(t *testing.T) {

m3 := m.PutEmptyMap("foo.nested")
m3.PutStr("test", "pass")

m4 := m.PutEmptyMap("map_slice.0")
m4.PutStr("foo1", "bar1")
m4.PutStr("foo2", "bar2")

m5 := m.PutEmptyMap("map_slice.1")
m5.PutStr("total.string", "barbar1")

m.CopyTo(tCtx.GetLogRecord().Attributes())
},
},
Expand All @@ -178,6 +200,7 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
tCtx.GetLogRecord().Attributes().Remove("map_slice")
},
},
{
Expand All @@ -195,6 +218,7 @@ func Test_e2e_editors(t *testing.T) {
tCtx.GetLogRecord().Attributes().Remove("things")
tCtx.GetLogRecord().Attributes().Remove("conflict.conflict1")
tCtx.GetLogRecord().Attributes().Remove("conflict")
tCtx.GetLogRecord().Attributes().Remove("map_slice")
},
},
{
Expand All @@ -215,7 +239,7 @@ func Test_e2e_editors(t *testing.T) {
},
},
{
statement: `merge_maps(attributes, attributes["foo"], "upsert")`,
statement: `merge_maps(attributes, attributes["foo"])`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("bar", "pass")
tCtx.GetLogRecord().Attributes().PutStr("flags", "pass")
Expand All @@ -235,6 +259,48 @@ func Test_e2e_editors(t *testing.T) {
l.AppendEmpty().SetStr("test")
},
},
{
statement: `merge_maps(attributes, attributes["foo"], "insert", attributes["map_slice"])`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("bar", "pass")
tCtx.GetLogRecord().Attributes().PutStr("foo1", "bar1")
tCtx.GetLogRecord().Attributes().PutStr("foo2", "bar2")
s := tCtx.GetLogRecord().Attributes().PutEmptySlice("slice")
v := s.AppendEmpty()
v.SetStr("val")
m2 := tCtx.GetLogRecord().Attributes().PutEmptyMap("nested")
m2.PutStr("test", "pass")
},
},
{
statement: `merge_maps(attributes, attributes["foo"], "update", attributes["map_slice"])`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("flags", "pass")
tCtx.GetLogRecord().Attributes().PutStr("total.string", "barbar1")
},
},
{
statement: `merge_maps(attributes, attributes["foo"], "upsert", attributes["map_slice"])`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("bar", "pass")
tCtx.GetLogRecord().Attributes().PutStr("foo1", "bar1")
tCtx.GetLogRecord().Attributes().PutStr("foo2", "bar2")
tCtx.GetLogRecord().Attributes().PutStr("flags", "pass")
tCtx.GetLogRecord().Attributes().PutStr("total.string", "barbar1")
s := tCtx.GetLogRecord().Attributes().PutEmptySlice("slice")
v := s.AppendEmpty()
v.SetStr("val")
m2 := tCtx.GetLogRecord().Attributes().PutEmptyMap("nested")
m2.PutStr("test", "pass")
},
},
{
statement: `merge_maps(attributes, strategy="insert", sources=attributes["map_slice"])`,
want: func(tCtx ottllog.TransformContext) {
tCtx.GetLogRecord().Attributes().PutStr("foo1", "bar1")
tCtx.GetLogRecord().Attributes().PutStr("foo2", "bar2")
},
},
{
statement: `replace_all_matches(attributes, "*/*", "test")`,
want: func(tCtx ottllog.TransformContext) {
Expand Down Expand Up @@ -1735,6 +1801,12 @@ func constructLogTransformContextEditors() ottllog.TransformContext {
mm1.PutStr("conflict2", "pass")
mmm := logRecord.Attributes().PutEmptyMap("conflict.conflict1")
mmm.PutStr("conflict2", "nopass")
slice := logRecord.Attributes().PutEmptySlice("map_slice")
ms := slice.AppendEmpty().SetEmptyMap()
ms.PutStr("foo1", "bar1")
ms.PutStr("foo2", "bar2")
ms2 := slice.AppendEmpty().SetEmptyMap()
ms2.PutStr("total.string", "barbar1")
m := logRecord.Attributes().PutEmptyMap("foo")
m.PutStr("bar", "pass")
m.PutStr("flags", "pass")
Expand Down
61 changes: 61 additions & 0 deletions pkg/ottl/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,67 @@ func (g StandardPMapGetter[K]) Get(ctx context.Context, tCtx K) (pcommon.Map, er
}
}

// PMapSliceGetter is a Getter that must return a []pcommon.Map.
type PMapSliceGetter[K any] interface {
// Get retrieves a []pcommon.Map value.
Get(ctx context.Context, tCtx K) ([]pcommon.Map, error)
}

// StandardPMapSliceGetter is a basic implementation of PMapSliceGetter
type StandardPMapSliceGetter[K any] struct {
Getter func(ctx context.Context, tCtx K) (any, error)
}

// Get retrieves a []pcommon.Map value.
// If the value is not a []pcommon.Map a new TypeError is returned.
// If there is an error getting the value it will be returned.
func (g StandardPMapSliceGetter[K]) Get(ctx context.Context, tCtx K) ([]pcommon.Map, error) {
val, err := g.Getter(ctx, tCtx)
if err != nil {
return []pcommon.Map{}, fmt.Errorf("error getting value in %T: %w", g, err)
}
if val == nil {
return []pcommon.Map{}, TypeError("expected []pcommon.Map but got nil")
}
switch v := val.(type) {
case []pcommon.Map:
return v, nil
case []map[string]any:
result := []pcommon.Map{}
for _, mm := range v {
m := pcommon.NewMap()
err = m.FromRaw(mm)
if err != nil {
return []pcommon.Map{}, err
}
result = append(result, m)
}
return result, nil
case pcommon.Slice:
result := []pcommon.Map{}
for _, mm := range v.AsRaw() {
mmm, ok := mm.(pcommon.Map)
if ok {
result = append(result, mmm)
} else {
mmm, ok := mm.(map[string]any)
if !ok {
return []pcommon.Map{}, TypeError(fmt.Sprintf("expected pcommon.Map/map[string]any but got %T", v))
}
m := pcommon.NewMap()
err = m.FromRaw(mmm)
if err != nil {
return []pcommon.Map{}, err
}
result = append(result, m)
}
}
return result, nil
default:
return []pcommon.Map{}, TypeError(fmt.Sprintf("expected []pcommon.Map but got %T", val))
}
}

// StringLikeGetter is a Getter that returns a string by converting the underlying value to a string if necessary.
type StringLikeGetter[K any] interface {
// Get retrieves a string value.
Expand Down
89 changes: 89 additions & 0 deletions pkg/ottl/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,95 @@ func Test_StandardPMapGetter_WrappedError(t *testing.T) {
assert.False(t, ok)
}

func Test_StandardPMapSliceGetter(t *testing.T) {
var data []map[string]any
data = append(data, map[string]any{})
tests := []struct {
name string
getter StandardPMapSliceGetter[any]
want any
valid bool
expectedErrorMsg string
}{
{
name: "[]pcommon.map type",
getter: StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return []pcommon.Map{pcommon.NewMap()}, nil
},
},
want: []pcommon.Map{pcommon.NewMap()},
valid: true,
},
{
name: "[]map[string]any type",
getter: StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return data, nil
},
},
want: []pcommon.Map{pcommon.NewMap()},
valid: true,
},
{
name: "pcommon.Slice type",
getter: StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return pcommon.NewSlice(), nil
},
},
want: []pcommon.Map{},
valid: true,
},
{
name: "Incorrect type",
getter: StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return true, nil
},
},
valid: false,
expectedErrorMsg: "expected []pcommon.Map but got bool",
},
{
name: "nil",
getter: StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return nil, nil
},
},
valid: false,
expectedErrorMsg: "expected []pcommon.Map but got nil",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
val, err := tt.getter.Get(context.Background(), nil)
if tt.valid {
assert.NoError(t, err)
assert.Equal(t, tt.want, val)
} else {
assert.IsType(t, TypeError(""), err)
assert.EqualError(t, err, tt.expectedErrorMsg)
}
})
}
}

//nolint:errorlint
func Test_StandardPMapSliceGetter_WrappedError(t *testing.T) {
getter := StandardPMapSliceGetter[any]{
Getter: func(_ context.Context, _ any) (any, error) {
return nil, TypeError("")
},
}
_, err := getter.Get(context.Background(), nil)
assert.Error(t, err)
_, ok := err.(TypeError)
assert.False(t, ok)
}

func Test_StandardDurationGetter(t *testing.T) {
oneHourOneMinuteOneSecond, err := time.ParseDuration("1h1m1s")
require.NoError(t, err)
Expand Down
12 changes: 12 additions & 0 deletions pkg/ottl/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,12 @@ func (p *Parser[K]) buildSliceArg(argVal value, argType reflect.Type) (any, erro
return nil, err
}
return arg, nil
case strings.HasPrefix(name, "PMapSliceGetter"):
arg, err := buildSlice[PMapSliceGetter[K]](argVal, argType, p.buildArg, name)
if err != nil {
return nil, err
}
return arg, nil
case strings.HasPrefix(name, "StringGetter"):
arg, err := buildSlice[StringGetter[K]](argVal, argType, p.buildArg, name)
if err != nil {
Expand Down Expand Up @@ -610,6 +616,12 @@ func (p *Parser[K]) buildArg(argVal value, argType reflect.Type) (any, error) {
return nil, err
}
return StandardPMapGetter[K]{Getter: arg.Get}, nil
case strings.HasPrefix(name, "PMapSliceGetter"):
arg, err := p.newGetter(argVal)
if err != nil {
return nil, err
}
return StandardPMapSliceGetter[K]{Getter: arg.Get}, nil
case strings.HasPrefix(name, "DurationGetter"):
arg, err := p.newGetter(argVal)
if err != nil {
Expand Down
Loading