Skip to content

Commit 8225fbf

Browse files
committed
Add context propagation support
In the current implementation, weaver doesn't allow the user to propagate context information. We recommend users to define a struct that encapsulates the metadata information and add it as an argument to the method. However, more and more users are asking for an option to propagate metadata information using the context. This request comes especially from users that are using gRPC to communicate between their services, and gRPC provides a way to propagate metadata information using the context. This PR enables the users to propagate metadata information as a map[string]string. ```main.go // To attach metadata with key "foo" and value "bar" to the context, you can do: ctx := context.Background() ctx = weavermetadata.NewContext(ctx, map[string]string{"foo": "bar"}) // To read the metadata value associated with a key "foo" in the context, you can do: meta, found := weavermetadata.FromContext(ctx) if found { value := meta["foo"] } ``` [1] https://pkg.go.dev/google.golang.org/grpc/metadata
1 parent a89085a commit 8225fbf

File tree

9 files changed

+568
-36
lines changed

9 files changed

+568
-36
lines changed

godeps.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ github.com/ServiceWeaver/weaver/internal/net/call
335335
github.com/ServiceWeaver/weaver/runtime/codegen
336336
github.com/ServiceWeaver/weaver/runtime/logging
337337
github.com/ServiceWeaver/weaver/runtime/retry
338+
github.com/ServiceWeaver/weaver/weavermetadata
338339
go.opentelemetry.io/otel/codes
339340
go.opentelemetry.io/otel/trace
340341
io
@@ -983,6 +984,9 @@ github.com/ServiceWeaver/weaver/sim/internal/bank
983984
go.opentelemetry.io/otel/codes
984985
go.opentelemetry.io/otel/trace
985986
reflect
987+
github.com/ServiceWeaver/weaver/weavermetadata
988+
context
989+
strings
986990
github.com/ServiceWeaver/weaver/weavertest
987991
context
988992
errors
@@ -1064,8 +1068,10 @@ github.com/ServiceWeaver/weaver/weavertest/internal/simple
10641068
fmt
10651069
github.com/ServiceWeaver/weaver
10661070
github.com/ServiceWeaver/weaver/runtime/codegen
1071+
github.com/ServiceWeaver/weaver/weavermetadata
10671072
go.opentelemetry.io/otel/codes
10681073
go.opentelemetry.io/otel/trace
1074+
golang.org/x/exp/maps
10691075
net/http
10701076
os
10711077
reflect

internal/net/call/call.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,14 +80,13 @@ import (
8080

8181
"github.com/ServiceWeaver/weaver/runtime/logging"
8282
"github.com/ServiceWeaver/weaver/runtime/retry"
83+
"github.com/ServiceWeaver/weaver/weavermetadata"
8384
"go.opentelemetry.io/otel/codes"
8485
"go.opentelemetry.io/otel/trace"
8586
)
8687

87-
const (
88-
// Size of the header included in each message.
89-
msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context
90-
)
88+
// Size of the header included in each message.
89+
const msgHeaderSize = 16 + 8 + traceHeaderLen + metadataHeaderLen
9190

9291
// Connection allows a client to send RPCs.
9392
type Connection interface {
@@ -405,6 +404,18 @@ func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg
405404
// Send trace information in the header.
406405
writeTraceContext(ctx, hdr[24:])
407406

407+
// Send context metadata in the header.
408+
var meta []byte
409+
m, found := weavermetadata.FromContext(ctx)
410+
if found {
411+
meta = writeContextMetadata(m)
412+
}
413+
binary.LittleEndian.PutUint64(hdr[49:], uint64(len(meta)))
414+
hdrSlice := hdr[:]
415+
if len(meta) > 0 {
416+
hdrSlice = append(hdrSlice, meta...)
417+
}
418+
408419
rpc := &call{}
409420
rpc.doneSignal = make(chan struct{})
410421

@@ -413,7 +424,7 @@ func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg
413424
if err != nil {
414425
return nil, err
415426
}
416-
if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdr[:], arg, rc.opts.WriteFlattenLimit); err != nil {
427+
if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdrSlice, arg, rc.opts.WriteFlattenLimit); err != nil {
417428
conn.shutdown("client send request", err)
418429
conn.endCall(rpc)
419430
return nil, fmt.Errorf("%w: %s", CommunicationError, err)
@@ -984,8 +995,15 @@ func (c *serverConnection) runHandler(hmap *HandlerMap, id uint64, msg []byte) {
984995
}
985996
}()
986997

998+
// Extract metadata context information if any.
999+
metaLen := binary.LittleEndian.Uint64(msg[49:])
1000+
if metaLen > 0 {
1001+
meta := readContextMetadata(msg[msgHeaderSize : msgHeaderSize+metaLen])
1002+
ctx = weavermetadata.NewContext(ctx, meta)
1003+
}
1004+
9871005
// Call the handler passing it the payload.
988-
payload := msg[msgHeaderSize:]
1006+
payload := msg[msgHeaderSize+metaLen:]
9891007
var err error
9901008
var result []byte
9911009
fn, ok := hmap.handlers[hkey]

internal/net/call/metadata.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package call
16+
17+
import (
18+
"strings"
19+
20+
"github.com/ServiceWeaver/weaver/runtime/codegen"
21+
)
22+
23+
const metadataHeaderLen = 8 // metadata length
24+
25+
// writeContextMetadata serializes the context metadata (if any).
26+
func writeContextMetadata(meta map[string]string) []byte {
27+
enc := codegen.NewEncoder()
28+
enc.Len(len(meta))
29+
for k, v := range meta {
30+
enc.String(strings.ToLower(k))
31+
enc.String(v)
32+
}
33+
return enc.Data()
34+
}
35+
36+
// readContextMetadata returns the context metadata (if any).
37+
func readContextMetadata(meta []byte) map[string]string {
38+
dec := codegen.NewDecoder(meta)
39+
n := dec.Len()
40+
res := make(map[string]string, n)
41+
var k, v string
42+
for i := 0; i < n; i++ {
43+
k = strings.ToLower(dec.String())
44+
v = dec.String()
45+
res[k] = v
46+
}
47+
return res
48+
}

internal/net/call/trace.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
"go.opentelemetry.io/otel/trace"
2121
)
2222

23-
const traceHeaderLen = 25
23+
const traceHeaderLen = 25 // handler_key + deadline + trace_context
2424

2525
// writeTraceContext serializes the trace context (if any) contained in ctx
2626
// into b.

weavermetadata/metadata.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package weavermetadata define the structure of the metadata supported by weaver.
16+
//
17+
// Note that we allow metadata information to propagate as a map, where the keys
18+
// and the values are strings. It the user wants to propagate values that are
19+
// more complicated types, it's their responsibility to encode/decode these types
20+
// to/from string values.
21+
//
22+
// Note that all keys are automatically lowercase. This ensures the user avoids
23+
// mistakes when creating and retrieving values for the same key. E.g., the user
24+
// can create a map with the key "Foo" but try to retrieve the value for the
25+
// key "foo".
26+
//
27+
// How to:
28+
//
29+
// To attach metadata with key "foo" and value "bar" to the context, you can do:
30+
//
31+
// ctx := context.Background()
32+
// ctx = weavermetadata.NewContext(ctx, map[string]string{"foo": "bar"})
33+
//
34+
// To read the metadata value associated with a key "foo" in the context, you can do:
35+
//
36+
// meta, found := weavermetadata.FromContext(ctx)
37+
// if found {
38+
// value := meta["foo"]
39+
// }
40+
package weavermetadata
41+
42+
import (
43+
"context"
44+
"strings"
45+
)
46+
47+
// metaKey is an unexported type for the key that stores the metadata.
48+
type metaKey struct{}
49+
50+
// NewContext returns a new context that carries metadata meta.
51+
func NewContext(ctx context.Context, meta map[string]string) context.Context {
52+
return context.WithValue(ctx, metaKey{}, meta)
53+
}
54+
55+
// FromContext returns the metadata value stored in ctx, if any.
56+
func FromContext(ctx context.Context) (map[string]string, bool) {
57+
meta, ok := ctx.Value(metaKey{}).(map[string]string)
58+
if !ok {
59+
return nil, false
60+
}
61+
out := make(map[string]string, len(meta))
62+
for k, v := range meta {
63+
key := strings.ToLower(k)
64+
out[key] = v
65+
}
66+
return out, true
67+
}

weavermetadata/metadata_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package weavermetadata
16+
17+
import (
18+
"context"
19+
"reflect"
20+
"testing"
21+
)
22+
23+
func TestContextMetadata(t *testing.T) {
24+
type testCase struct {
25+
name string
26+
meta map[string]string
27+
isMetaExpected bool
28+
want map[string]string
29+
}
30+
for _, test := range []testCase{
31+
{
32+
name: "no metadata",
33+
},
34+
{
35+
name: "with empty metadata",
36+
meta: map[string]string{},
37+
isMetaExpected: false,
38+
},
39+
{
40+
name: "with valid metadata",
41+
meta: map[string]string{
42+
"foo": "bar",
43+
"baz": "waldo",
44+
},
45+
isMetaExpected: true,
46+
want: map[string]string{
47+
"foo": "bar",
48+
"baz": "waldo",
49+
},
50+
},
51+
{
52+
name: "with valid metadata and uppercase keys",
53+
meta: map[string]string{
54+
"Foo": "bar",
55+
"Baz": "waldo",
56+
},
57+
isMetaExpected: true,
58+
want: map[string]string{
59+
"foo": "bar",
60+
"baz": "waldo",
61+
},
62+
},
63+
{
64+
name: "with valid metadata and uppercase values",
65+
meta: map[string]string{
66+
"Foo": "Bar",
67+
"Baz": "Waldo",
68+
},
69+
isMetaExpected: true,
70+
want: map[string]string{
71+
"foo": "Bar",
72+
"baz": "Waldo",
73+
},
74+
},
75+
} {
76+
t.Run(test.name, func(t *testing.T) {
77+
ctx := context.Background()
78+
if len(test.meta) > 0 {
79+
ctx = NewContext(ctx, test.meta)
80+
}
81+
82+
got, found := FromContext(ctx)
83+
if !reflect.DeepEqual(found, test.isMetaExpected) {
84+
t.Errorf("ExtractMetadata: expecting %v, got %v", test.isMetaExpected, found)
85+
}
86+
if !found {
87+
return
88+
}
89+
if !reflect.DeepEqual(test.want, got) {
90+
t.Errorf("ExtractMetadata: expecting %v, got %v", test.want, got)
91+
}
92+
})
93+
}
94+
}

weavertest/internal/simple/simple.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ import (
2828
"sync"
2929

3030
"github.com/ServiceWeaver/weaver"
31+
"github.com/ServiceWeaver/weaver/weavermetadata"
32+
"golang.org/x/exp/maps"
3133
)
3234

3335
//go:generate ../../../cmd/weaver/weaver generate
@@ -50,6 +52,8 @@ type Destination interface {
5052
Record(_ context.Context, file, msg string) error
5153
GetAll(_ context.Context, file string) ([]string, error)
5254
RoutedRecord(_ context.Context, file, msg string) error
55+
UpdateMetadata(_ context.Context) error
56+
GetMetadata(_ context.Context) (map[string]string, error)
5357
}
5458

5559
var (
@@ -67,7 +71,8 @@ func (r destRouter) RoutedRecord(_ context.Context, file, msg string) string {
6771
type destination struct {
6872
weaver.Implements[Destination]
6973
weaver.WithRouter[destRouter]
70-
mu sync.Mutex
74+
mu sync.Mutex
75+
metadata map[string]string
7176
}
7277

7378
var pid = os.Getpid()
@@ -77,7 +82,7 @@ func (d *destination) Init(ctx context.Context) error {
7782
return nil
7883
}
7984

80-
func (d *destination) Getpid(_ context.Context) (int, error) {
85+
func (d *destination) Getpid(context.Context) (int, error) {
8186
return pid, nil
8287
}
8388

@@ -113,6 +118,22 @@ func (d *destination) GetAll(_ context.Context, file string) ([]string, error) {
113118
return strings.Split(str, "\n"), nil
114119
}
115120

121+
func (d *destination) UpdateMetadata(ctx context.Context) error {
122+
d.mu.Lock()
123+
defer d.mu.Unlock()
124+
meta, found := weavermetadata.FromContext(ctx)
125+
if found {
126+
d.metadata = maps.Clone(meta)
127+
}
128+
return nil
129+
}
130+
131+
func (d *destination) GetMetadata(_ context.Context) (map[string]string, error) {
132+
d.mu.Lock()
133+
defer d.mu.Unlock()
134+
return d.metadata, nil
135+
}
136+
116137
// Server is a component used to test Service Weaver listener handling.
117138
// An HTTP server is started when this component is initialized.
118139
// simple_test.go checks the functionality of the HTTP server by fetching

0 commit comments

Comments
 (0)