Skip to content

Commit 0a0abfa

Browse files
authored
stats/opentelemetry: Add CSM Plugin Option (#7205)
1 parent 2f52f9e commit 0a0abfa

File tree

9 files changed

+1005
-28
lines changed

9 files changed

+1005
-28
lines changed

.github/workflows/testing.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,15 @@ jobs:
105105
cd "${GITHUB_WORKSPACE}"
106106
for MOD_FILE in $(find . -name 'go.mod' | grep -Ev '^\./go\.mod'); do
107107
pushd "$(dirname ${MOD_FILE})"
108-
go test ${{ matrix.testflags }} -cpu 1,4 -timeout 2m ./...
108+
# Skip OpenTelemetry module if 3 releases ago, Go OpenTelemetry only supports
109+
# previous two releases.
110+
if [[ "${{ matrix.goversion }}" == "1.20" && "${PWD}" =~ /stats/opentelemetry$ ]]; then
111+
echo "Skipping ${MOD_FILE}"
112+
else
113+
go test ${{ matrix.testflags }} -cpu 1,4 -timeout 2m ./...
114+
fi
109115
popd
116+
110117
done
111118
112119
# Non-core gRPC tests (examples, interop, etc)
Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
*
3+
* Copyright 2024 gRPC authors.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
// Package csm contains utilities for Google Cloud Service Mesh observability.
20+
package csm
21+
22+
import (
23+
"context"
24+
"encoding/base64"
25+
"net/url"
26+
"os"
27+
"strings"
28+
29+
"google.golang.org/grpc/grpclog"
30+
"google.golang.org/grpc/internal/xds/bootstrap"
31+
"google.golang.org/grpc/metadata"
32+
"google.golang.org/grpc/stats/opentelemetry/internal"
33+
34+
"google.golang.org/protobuf/proto"
35+
"google.golang.org/protobuf/types/known/structpb"
36+
37+
"go.opentelemetry.io/contrib/detectors/gcp"
38+
"go.opentelemetry.io/otel/attribute"
39+
"go.opentelemetry.io/otel/sdk/resource"
40+
)
41+
42+
var logger = grpclog.Component("csm-observability-plugin")
43+
44+
// pluginOption emits CSM Labels from the environment and metadata exchange
45+
// for csm channels and all servers.
46+
//
47+
// Do not use this directly; use newPluginOption instead.
48+
type pluginOption struct {
49+
// localLabels are the labels that identify the local environment a binary
50+
// is run in, and will be emitted from the CSM Plugin Option.
51+
localLabels map[string]string
52+
// metadataExchangeLabelsEncoded are the metadata exchange labels to be sent
53+
// as the value of metadata key "x-envoy-peer-metadata" in proto wire format
54+
// and base 64 encoded. This gets sent out from all the servers running in
55+
// this process and for csm channels.
56+
metadataExchangeLabelsEncoded string
57+
}
58+
59+
// newPluginOption returns a new pluginOption with local labels and metadata
60+
// exchange labels derived from the environment.
61+
func newPluginOption(ctx context.Context) internal.PluginOption {
62+
localLabels, metadataExchangeLabelsEncoded := constructMetadataFromEnv(ctx)
63+
64+
return &pluginOption{
65+
localLabels: localLabels,
66+
metadataExchangeLabelsEncoded: metadataExchangeLabelsEncoded,
67+
}
68+
}
69+
70+
// NewLabelsMD returns a metadata.MD with the CSM labels as an encoded protobuf
71+
// Struct as the value of "x-envoy-peer-metadata".
72+
func (cpo *pluginOption) GetMetadata() metadata.MD {
73+
return metadata.New(map[string]string{
74+
metadataExchangeKey: cpo.metadataExchangeLabelsEncoded,
75+
})
76+
}
77+
78+
// GetLabels gets the CSM peer labels from the metadata provided. It returns
79+
// "unknown" for labels not found. Labels returned depend on the remote type.
80+
// Additionally, local labels determined at initialization time are appended to
81+
// labels returned, in addition to the optionalLabels provided.
82+
func (cpo *pluginOption) GetLabels(md metadata.MD) map[string]string {
83+
labels := map[string]string{ // Remote labels if type is unknown (i.e. unset or error processing x-envoy-peer-metadata)
84+
"csm.remote_workload_type": "unknown",
85+
"csm.remote_workload_canonical_service": "unknown",
86+
}
87+
// Append the local labels.
88+
for k, v := range cpo.localLabels {
89+
labels[k] = v
90+
}
91+
92+
val := md.Get("x-envoy-peer-metadata")
93+
// This can't happen if corresponding csm client because of proto wire
94+
// format encoding, but since it is arbitrary off the wire be safe.
95+
if len(val) != 1 {
96+
logger.Warningf("length of md values of \"x-envoy-peer-metadata\" is not 1, is %v", len(val))
97+
return labels
98+
}
99+
100+
protoWireFormat, err := base64.RawStdEncoding.DecodeString(val[0])
101+
if err != nil {
102+
logger.Warningf("error base 64 decoding value of \"x-envoy-peer-metadata\": %v", err)
103+
return labels
104+
}
105+
106+
spb := &structpb.Struct{}
107+
if err := proto.Unmarshal(protoWireFormat, spb); err != nil {
108+
logger.Warningf("error unmarshalling value of \"x-envoy-peer-metadata\" into proto: %v", err)
109+
return labels
110+
}
111+
112+
fields := spb.GetFields()
113+
114+
labels["csm.remote_workload_type"] = getFromMetadata("type", fields)
115+
// The value of “csm.remote_workload_canonical_service” comes from
116+
// MetadataExchange with the key “canonical_service”. (Note that this should
117+
// be read even if the remote type is unknown.)
118+
labels["csm.remote_workload_canonical_service"] = getFromMetadata("canonical_service", fields)
119+
120+
// Unset/unknown types, and types that aren't GKE or GCP return early with
121+
// just local labels, remote_workload_type and
122+
// remote_workload_canonical_service labels.
123+
workloadType := labels["csm.remote_workload_type"]
124+
if workloadType != "gcp_kubernetes_engine" && workloadType != "gcp_compute_engine" {
125+
return labels
126+
}
127+
// GKE and GCE labels.
128+
labels["csm.remote_workload_project_id"] = getFromMetadata("project_id", fields)
129+
labels["csm.remote_workload_location"] = getFromMetadata("location", fields)
130+
labels["csm.remote_workload_name"] = getFromMetadata("workload_name", fields)
131+
if workloadType == "gcp_compute_engine" {
132+
return labels
133+
}
134+
135+
// GKE only labels.
136+
labels["csm.remote_workload_cluster_name"] = getFromMetadata("cluster_name", fields)
137+
labels["csm.remote_workload_namespace_name"] = getFromMetadata("namespace_name", fields)
138+
return labels
139+
}
140+
141+
// getFromMetadata gets the value for the metadata key from the protobuf
142+
// metadata. Returns "unknown" if the metadata is not found in the protobuf
143+
// metadata, or if the value is not a string value. Returns the string value
144+
// otherwise.
145+
func getFromMetadata(metadataKey string, metadata map[string]*structpb.Value) string {
146+
if metadata != nil {
147+
if metadataVal, ok := metadata[metadataKey]; ok {
148+
if _, ok := metadataVal.GetKind().(*structpb.Value_StringValue); ok {
149+
return metadataVal.GetStringValue()
150+
}
151+
}
152+
}
153+
return "unknown"
154+
}
155+
156+
// getFromResource gets the value for the resource key from the attribute set.
157+
// Returns "unknown" if the resourceKey is not found in the attribute set or is
158+
// not a string value, the string value otherwise.
159+
func getFromResource(resourceKey attribute.Key, set *attribute.Set) string {
160+
if set != nil {
161+
if resourceVal, ok := set.Value(resourceKey); ok && resourceVal.Type() == attribute.STRING {
162+
return resourceVal.AsString()
163+
}
164+
}
165+
return "unknown"
166+
}
167+
168+
// getEnv returns "unknown" if environment variable is unset, the environment
169+
// variable otherwise.
170+
func getEnv(name string) string {
171+
if val, ok := os.LookupEnv(name); ok {
172+
return val
173+
}
174+
return "unknown"
175+
}
176+
177+
var (
178+
// This function will be overridden in unit tests.
179+
getAttrSetFromResourceDetector = func(ctx context.Context) *attribute.Set {
180+
r, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()))
181+
if err != nil {
182+
logger.Errorf("error reading OpenTelemetry resource: %v", err)
183+
return nil
184+
}
185+
return r.Set()
186+
}
187+
)
188+
189+
// constructMetadataFromEnv creates local labels and labels to send to the peer
190+
// using metadata exchange based off resource detection and environment
191+
// variables.
192+
//
193+
// Returns local labels, and base 64 encoded protobuf.Struct containing metadata
194+
// exchange labels.
195+
func constructMetadataFromEnv(ctx context.Context) (map[string]string, string) {
196+
set := getAttrSetFromResourceDetector(ctx)
197+
198+
labels := make(map[string]string)
199+
labels["type"] = getFromResource("cloud.platform", set)
200+
labels["canonical_service"] = getEnv("CSM_CANONICAL_SERVICE_NAME")
201+
202+
// If type is not GCE or GKE only metadata exchange labels are "type" and
203+
// "canonical_service".
204+
cloudPlatformVal := labels["type"]
205+
if cloudPlatformVal != "gcp_kubernetes_engine" && cloudPlatformVal != "gcp_compute_engine" {
206+
return initializeLocalAndMetadataLabels(labels)
207+
}
208+
209+
// GCE and GKE labels:
210+
labels["workload_name"] = getEnv("CSM_WORKLOAD_NAME")
211+
212+
locationVal := "unknown"
213+
if resourceVal, ok := set.Value("cloud.availability_zone"); ok && resourceVal.Type() == attribute.STRING {
214+
locationVal = resourceVal.AsString()
215+
} else if resourceVal, ok = set.Value("cloud.region"); ok && resourceVal.Type() == attribute.STRING {
216+
locationVal = resourceVal.AsString()
217+
}
218+
labels["location"] = locationVal
219+
220+
labels["project_id"] = getFromResource("cloud.account.id", set)
221+
if cloudPlatformVal == "gcp_compute_engine" {
222+
return initializeLocalAndMetadataLabels(labels)
223+
}
224+
225+
// GKE specific labels:
226+
labels["namespace_name"] = getFromResource("k8s.namespace.name", set)
227+
labels["cluster_name"] = getFromResource("k8s.cluster.name", set)
228+
return initializeLocalAndMetadataLabels(labels)
229+
}
230+
231+
// parseMeshIDString parses the mesh id from the node id according to the format
232+
// "projects/[GCP Project number]/networks/mesh:[Mesh ID]/nodes/[UUID]". Returns
233+
// "unknown" if there is a syntax error in the node ID.
234+
func parseMeshIDFromNodeID(nodeID string) string {
235+
meshSplit := strings.Split(nodeID, "/")
236+
if len(meshSplit) != 6 {
237+
return "unknown"
238+
}
239+
if meshSplit[0] != "projects" || meshSplit[2] != "networks" || meshSplit[4] != "nodes" {
240+
return "unknown"
241+
}
242+
meshID, ok := strings.CutPrefix(meshSplit[3], "mesh:")
243+
if !ok { // errors become "unknown"
244+
return "unknown"
245+
}
246+
return meshID
247+
}
248+
249+
// initializeLocalAndMetadataLabels csm local labels for a CSM Plugin Option to
250+
// record. It also builds out a base 64 encoded protobuf.Struct containing the
251+
// metadata exchange labels to be sent as part of metadata exchange from a CSM
252+
// Plugin Option.
253+
func initializeLocalAndMetadataLabels(labels map[string]string) (map[string]string, string) {
254+
// The value of “csm.workload_canonical_service” comes from
255+
// “CSM_CANONICAL_SERVICE_NAME” env var, “unknown” if unset.
256+
val := labels["canonical_service"]
257+
localLabels := make(map[string]string)
258+
localLabels["csm.workload_canonical_service"] = val
259+
// Get the CSM Mesh ID from the bootstrap file.
260+
nodeID := getNodeID()
261+
localLabels["csm.mesh_id"] = parseMeshIDFromNodeID(nodeID)
262+
263+
// Metadata exchange labels - can go ahead and encode into proto, and then
264+
// base64.
265+
pbLabels := &structpb.Struct{
266+
Fields: map[string]*structpb.Value{},
267+
}
268+
for k, v := range labels {
269+
pbLabels.Fields[k] = structpb.NewStringValue(v)
270+
}
271+
protoWireFormat, err := proto.Marshal(pbLabels)
272+
metadataExchangeLabelsEncoded := ""
273+
if err == nil {
274+
metadataExchangeLabelsEncoded = base64.RawStdEncoding.EncodeToString(protoWireFormat)
275+
}
276+
// else - This behavior triggers server side to reply (if sent from a gRPC
277+
// Client within this binary) with the metadata exchange labels. Even if
278+
// client side has a problem marshaling proto into wire format, it can
279+
// still use server labels so send an empty string as the value of
280+
// x-envoy-peer-metadata. The presence of this metadata exchange header
281+
// will cause server side to respond with metadata exchange labels.
282+
283+
return localLabels, metadataExchangeLabelsEncoded
284+
}
285+
286+
// getNodeID gets the Node ID from the bootstrap data.
287+
func getNodeID() string {
288+
cfg, err := bootstrap.NewConfig()
289+
if err != nil {
290+
return "" // will become "unknown"
291+
}
292+
if cfg.NodeProto == nil {
293+
return ""
294+
}
295+
return cfg.NodeProto.GetId()
296+
}
297+
298+
// metadataExchangeKey is the key for HTTP metadata exchange.
299+
const metadataExchangeKey = "x-envoy-peer-metadata"
300+
301+
func determineTargetCSM(parsedTarget *url.URL) bool {
302+
// On the client-side, the channel target is used to determine if a channel is a
303+
// CSM channel or not. CSM channels need to have an “xds” scheme and a
304+
// "traffic-director-global.xds.googleapis.com" authority. In the cases where no
305+
// authority is mentioned, the authority is assumed to be CSM. MetadataExchange
306+
// is performed only for CSM channels. Non-metadata exchange labels are detected
307+
// as described below.
308+
return parsedTarget.Scheme == "xds" && (parsedTarget.Host == "" || parsedTarget.Host == "traffic-director-global.xds.googleapis.com")
309+
}

0 commit comments

Comments
 (0)