Skip to content

Commit 1058473

Browse files
committed
[pkg/ottl] support merging multiple maps via merge_maps() function
Signed-off-by: odubajDT <[email protected]>
1 parent d09a0cc commit 1058473

File tree

4 files changed

+99
-40
lines changed

4 files changed

+99
-40
lines changed

pkg/ottl/expression.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,50 @@ func (g StandardPMapGetter[K]) Get(ctx context.Context, tCtx K) (pcommon.Map, er
432432
}
433433
}
434434

435+
// PMapGetter is a Getter that must return a []pcommon.Map.
436+
type PMapSliceLikeGetter[K any] interface {
437+
// Get retrieves a []pcommon.Map value.
438+
Get(ctx context.Context, tCtx K) ([]pcommon.Map, error)
439+
}
440+
441+
// StandardPMapGetter is a basic implementation of PMapGetter
442+
type StandardPMapSliceLikeGetter[K any] struct {
443+
Getter func(ctx context.Context, tCtx K) (any, error)
444+
}
445+
446+
// Get retrieves a pcommon.Map value.
447+
// If the value is not a pcommon.Map a new TypeError is returned.
448+
// If there is an error getting the value it will be returned.
449+
func (g StandardPMapSliceLikeGetter[K]) Get(ctx context.Context, tCtx K) ([]pcommon.Map, error) {
450+
val, err := g.Getter(ctx, tCtx)
451+
if err != nil {
452+
return []pcommon.Map{}, fmt.Errorf("error getting value in %T: %w", g, err)
453+
}
454+
if val == nil {
455+
return []pcommon.Map{}, TypeError("expected []pcommon.Map but got nil")
456+
}
457+
switch v := val.(type) {
458+
case pcommon.Map:
459+
return []pcommon.Map{v}, nil
460+
case []pcommon.Map:
461+
return v, nil
462+
case pcommon.Value:
463+
if v.Type() == pcommon.ValueTypeMap {
464+
return []pcommon.Map{v.Map()}, nil
465+
}
466+
return []pcommon.Map{}, TypeError(fmt.Sprintf("expected []pcommon.Map but got %v", v.Type()))
467+
case map[string]any:
468+
m := pcommon.NewMap()
469+
err = m.FromRaw(v)
470+
if err != nil {
471+
return []pcommon.Map{}, err
472+
}
473+
return []pcommon.Map{m}, nil
474+
default:
475+
return []pcommon.Map{}, TypeError(fmt.Sprintf("expected []pcommon.Map but got %T", val))
476+
}
477+
}
478+
435479
// StringLikeGetter is a Getter that returns a string by converting the underlying value to a string if necessary.
436480
type StringLikeGetter[K any] interface {
437481
// Get retrieves a string value.

pkg/ottl/functions.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,12 @@ func (p *Parser[K]) buildSliceArg(argVal value, argType reflect.Type) (any, erro
465465
return nil, err
466466
}
467467
return arg, nil
468+
case strings.HasPrefix(name, "PMapSliceLikeGetter"):
469+
arg, err := buildSlice[PMapSliceLikeGetter[K]](argVal, argType, p.buildArg, name)
470+
if err != nil {
471+
return nil, err
472+
}
473+
return arg, nil
468474
case strings.HasPrefix(name, "StringGetter"):
469475
arg, err := buildSlice[StringGetter[K]](argVal, argType, p.buildArg, name)
470476
if err != nil {
@@ -589,6 +595,12 @@ func (p *Parser[K]) buildArg(argVal value, argType reflect.Type) (any, error) {
589595
return nil, err
590596
}
591597
return StandardPMapGetter[K]{Getter: arg.Get}, nil
598+
case strings.HasPrefix(name, "PMapSliceLikeGetter"):
599+
arg, err := p.newGetter(argVal)
600+
if err != nil {
601+
return nil, err
602+
}
603+
return StandardPMapSliceLikeGetter[K]{Getter: arg.Get}, nil
592604
case strings.HasPrefix(name, "DurationGetter"):
593605
arg, err := p.newGetter(argVal)
594606
if err != nil {

pkg/ottl/ottlfuncs/func_merge_maps.go

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ const (
2020

2121
type MergeMapsArguments[K any] struct {
2222
Target ottl.PMapGetter[K]
23-
Source ottl.PMapGetter[K]
23+
Source ottl.PMapSliceLikeGetter[K]
2424
Strategy string
2525
}
2626

@@ -44,7 +44,7 @@ func createMergeMapsFunction[K any](_ ottl.FunctionContext, oArgs ottl.Arguments
4444
// insert: Insert the value from `source` into `target` where the key does not already exist.
4545
// update: Update the entry in `target` with the value from `source` where the key does exist
4646
// upsert: Performs insert or update. Insert the value from `source` into `target` where the key does not already exist and update the entry in `target` with the value from `source` where the key does exist.
47-
func mergeMaps[K any](target ottl.PMapGetter[K], source ottl.PMapGetter[K], strategy string) (ottl.ExprFunc[K], error) {
47+
func mergeMaps[K any](target ottl.PMapGetter[K], source ottl.PMapSliceLikeGetter[K], strategy string) (ottl.ExprFunc[K], error) {
4848
if strategy != INSERT && strategy != UPDATE && strategy != UPSERT {
4949
return nil, fmt.Errorf("invalid value for strategy, %v, must be 'insert', 'update' or 'upsert'", strategy)
5050
}
@@ -54,35 +54,38 @@ func mergeMaps[K any](target ottl.PMapGetter[K], source ottl.PMapGetter[K], stra
5454
if err != nil {
5555
return nil, err
5656
}
57-
valueMap, err := source.Get(ctx, tCtx)
57+
valueMapSlice, err := source.Get(ctx, tCtx)
5858
if err != nil {
5959
return nil, err
6060
}
61-
switch strategy {
62-
case INSERT:
63-
valueMap.Range(func(k string, v pcommon.Value) bool {
64-
if _, ok := targetMap.Get(k); !ok {
61+
for _, valueMap := range valueMapSlice {
62+
switch strategy {
63+
case INSERT:
64+
valueMap.Range(func(k string, v pcommon.Value) bool {
65+
if _, ok := targetMap.Get(k); !ok {
66+
tv := targetMap.PutEmpty(k)
67+
v.CopyTo(tv)
68+
}
69+
return true
70+
})
71+
case UPDATE:
72+
valueMap.Range(func(k string, v pcommon.Value) bool {
73+
if tv, ok := targetMap.Get(k); ok {
74+
v.CopyTo(tv)
75+
}
76+
return true
77+
})
78+
case UPSERT:
79+
valueMap.Range(func(k string, v pcommon.Value) bool {
6580
tv := targetMap.PutEmpty(k)
6681
v.CopyTo(tv)
67-
}
68-
return true
69-
})
70-
case UPDATE:
71-
valueMap.Range(func(k string, v pcommon.Value) bool {
72-
if tv, ok := targetMap.Get(k); ok {
73-
v.CopyTo(tv)
74-
}
75-
return true
76-
})
77-
case UPSERT:
78-
valueMap.Range(func(k string, v pcommon.Value) bool {
79-
tv := targetMap.PutEmpty(k)
80-
v.CopyTo(tv)
81-
return true
82-
})
83-
default:
84-
return nil, fmt.Errorf("unknown strategy, %v", strategy)
82+
return true
83+
})
84+
default:
85+
return nil, fmt.Errorf("unknown strategy, %v", strategy)
86+
}
8587
}
88+
8689
return nil, nil
8790
}, nil
8891
}

pkg/ottl/ottlfuncs/func_merge_maps_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,17 @@ func Test_MergeMaps(t *testing.T) {
2525

2626
tests := []struct {
2727
name string
28-
source ottl.PMapGetter[pcommon.Map]
28+
source ottl.PMapSliceLikeGetter[pcommon.Map]
2929
strategy string
3030
want func(pcommon.Map)
3131
}{
3232
{
3333
name: "Upsert no conflicting keys",
34-
source: ottl.StandardPMapGetter[pcommon.Map]{
34+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
3535
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
3636
m := pcommon.NewMap()
3737
m.PutStr("attr2", "value2")
38-
return m, nil
38+
return []pcommon.Map{m}, nil
3939
},
4040
},
4141
strategy: UPSERT,
@@ -46,12 +46,12 @@ func Test_MergeMaps(t *testing.T) {
4646
},
4747
{
4848
name: "Upsert conflicting key",
49-
source: ottl.StandardPMapGetter[pcommon.Map]{
49+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
5050
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
5151
m := pcommon.NewMap()
5252
m.PutStr("attr1", "value3")
5353
m.PutStr("attr2", "value2")
54-
return m, nil
54+
return []pcommon.Map{m}, nil
5555
},
5656
},
5757
strategy: UPSERT,
@@ -62,11 +62,11 @@ func Test_MergeMaps(t *testing.T) {
6262
},
6363
{
6464
name: "Insert no conflicting keys",
65-
source: ottl.StandardPMapGetter[pcommon.Map]{
65+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
6666
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
6767
m := pcommon.NewMap()
6868
m.PutStr("attr2", "value2")
69-
return m, nil
69+
return []pcommon.Map{m}, nil
7070
},
7171
},
7272
strategy: INSERT,
@@ -77,12 +77,12 @@ func Test_MergeMaps(t *testing.T) {
7777
},
7878
{
7979
name: "Insert conflicting key",
80-
source: ottl.StandardPMapGetter[pcommon.Map]{
80+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
8181
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
8282
m := pcommon.NewMap()
8383
m.PutStr("attr1", "value3")
8484
m.PutStr("attr2", "value2")
85-
return m, nil
85+
return []pcommon.Map{m}, nil
8686
},
8787
},
8888
strategy: INSERT,
@@ -93,11 +93,11 @@ func Test_MergeMaps(t *testing.T) {
9393
},
9494
{
9595
name: "Update no conflicting keys",
96-
source: ottl.StandardPMapGetter[pcommon.Map]{
96+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
9797
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
9898
m := pcommon.NewMap()
9999
m.PutStr("attr2", "value2")
100-
return m, nil
100+
return []pcommon.Map{m}, nil
101101
},
102102
},
103103
strategy: UPDATE,
@@ -107,11 +107,11 @@ func Test_MergeMaps(t *testing.T) {
107107
},
108108
{
109109
name: "Update conflicting key",
110-
source: ottl.StandardPMapGetter[pcommon.Map]{
110+
source: ottl.StandardPMapSliceLikeGetter[pcommon.Map]{
111111
Getter: func(_ context.Context, _ pcommon.Map) (any, error) {
112112
m := pcommon.NewMap()
113113
m.PutStr("attr1", "value3")
114-
return m, nil
114+
return []pcommon.Map{m}, nil
115115
},
116116
},
117117
strategy: UPDATE,
@@ -141,7 +141,7 @@ func Test_MergeMaps(t *testing.T) {
141141
}
142142

143143
func Test_MergeMaps_bad_target(t *testing.T) {
144-
input := &ottl.StandardPMapGetter[any]{
144+
input := &ottl.StandardPMapSliceLikeGetter[any]{
145145
Getter: func(_ context.Context, tCtx any) (any, error) {
146146
return tCtx, nil
147147
},
@@ -159,7 +159,7 @@ func Test_MergeMaps_bad_target(t *testing.T) {
159159
}
160160

161161
func Test_MergeMaps_bad_input(t *testing.T) {
162-
input := &ottl.StandardPMapGetter[any]{
162+
input := &ottl.StandardPMapSliceLikeGetter[any]{
163163
Getter: func(_ context.Context, _ any) (any, error) {
164164
return 1, nil
165165
},

0 commit comments

Comments
 (0)