Skip to content

stats/opentelemetry: Add CSM Plugin Option #7205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ jobs:

- type: tests
goversion: '1.20'
skipotel: true
steps:
# Setup the environment.
- name: Setup GOARCH
Expand Down Expand Up @@ -103,10 +104,18 @@ jobs:
go version
go test ${{ matrix.testflags }} -cpu 1,4 -timeout 7m google.golang.org/grpc/...
cd "${GITHUB_WORKSPACE}"
set -x
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just keep this in to enable debugging? I'll delete though.

for MOD_FILE in $(find . -name 'go.mod' | grep -Ev '^\./go\.mod'); do
pushd "$(dirname ${MOD_FILE})"
go test ${{ matrix.testflags }} -cpu 1,4 -timeout 2m ./...
# Skip the OpenTelemetry module when testing with a Go version three releases
# back; Go OpenTelemetry only supports the two most recent Go releases.
if [[ "${{ matrix.skipotel }}" == "true" && "${PWD}" =~ /stats/opentelemetry$ ]]; then
echo "Skipping ${MOD_FILE}"
else
go test ${{ matrix.testflags }} -cpu 1,4 -timeout 2m ./...
fi
popd

done

# Non-core gRPC tests (examples, interop, etc)
Expand Down
52 changes: 14 additions & 38 deletions stats/opentelemetry/csm/pluginoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

// Package csm contains the implementation of the CSM Plugin Option.
// Package csm contains utilities for Google Cloud Service Mesh observability.
package csm

import (
Expand Down Expand Up @@ -79,7 +79,7 @@ func (cpo *pluginOption) GetMetadata() metadata.MD {
// "unknown" for labels not found. Labels returned depend on the remote type.
// Additionally, local labels determined at initialization time are appended to
// the labels returned.
func (cpo *pluginOption) GetLabels(md metadata.MD, optionalLabels map[string]string) map[string]string {
func (cpo *pluginOption) GetLabels(md metadata.MD) map[string]string {
labels := map[string]string{ // Remote labels if type is unknown (i.e. unset or error processing x-envoy-peer-metadata)
"csm.remote_workload_type": "unknown",
"csm.remote_workload_canonical_service": "unknown",
Expand All @@ -89,12 +89,6 @@ func (cpo *pluginOption) GetLabels(md metadata.MD, optionalLabels map[string]str
labels[k] = v
}

// Append the optional labels. To avoid string comparisons, assume the
// caller only passes in two potential xDS Optional Labels: service_name and
// service_namespace.
for k, v := range optionalLabels {
labels[k] = v
}
val := md.Get("x-envoy-peer-metadata")
// This can't happen with a corresponding CSM client because of proto wire
// format encoding, but since the value is arbitrary off the wire, be safe.
Expand Down Expand Up @@ -149,53 +143,46 @@ func (cpo *pluginOption) GetLabels(md metadata.MD, optionalLabels map[string]str
// metadata, or if the value is not a string value. Returns the string value
// otherwise.
func getFromMetadata(metadataKey string, metadata map[string]*structpb.Value) string {
ret := "unknown"
if metadata != nil {
if metadataVal, ok := metadata[metadataKey]; ok {
if _, ok := metadataVal.GetKind().(*structpb.Value_StringValue); ok {
ret = metadataVal.GetStringValue()
return metadataVal.GetStringValue()
}
}
}
return ret
return "unknown"
}

// getFromResource gets the value for the resource key from the attribute set.
// Returns "unknown" if the resourceKey is not found in the attribute set or is
// not a string value, the string value otherwise.
func getFromResource(resourceKey attribute.Key, set *attribute.Set) string {
ret := "unknown"
if set != nil {
if resourceVal, ok := set.Value(resourceKey); ok && resourceVal.Type() == attribute.STRING {
ret = resourceVal.AsString()
return resourceVal.AsString()
}
}
return ret
return "unknown"
}

// getEnv returns "unknown" if the environment variable is unset, and the
// value of the environment variable otherwise.
func getEnv(name string) string {
ret := "unknown"
if val, ok := os.LookupEnv(name); ok {
ret = val
return val
}
return ret
return "unknown"
}

var (
// This function will be overridden in unit tests.
getAttrSetFromResourceDetector = func(ctx context.Context) *attribute.Set {
r, err := resource.New(ctx, resource.WithDetectors(gcp.NewDetector()))

if err != nil {
logger.Errorf("error reading OpenTelemetry resource: %v", err)
return nil
}
var set *attribute.Set
if r != nil {
set = r.Set()
}
return set
return r.Set()
}
)

Expand Down Expand Up @@ -311,26 +298,15 @@ func getNodeID() string {
// metadataExchangeKey is the key for HTTP metadata exchange.
const metadataExchangeKey = "x-envoy-peer-metadata"

func determineTargetCSM(target string) bool {
func determineTargetCSM(parsedTarget *url.URL) bool {
// On the client-side, the channel target is used to determine if a channel is a
// CSM channel or not. CSM channels need to have an “xds” scheme and a
// "traffic-director-global.xds.googleapis.com" authority. In the cases where no
// authority is mentioned, the authority is assumed to be CSM. MetadataExchange
// is performed only for CSM channels. Non-metadata exchange labels are detected
// as described below.
parsedTarget, err := url.Parse(target)
if err != nil {
// Shouldn't happen as Dial would fail if target couldn't be parsed, but
// log just in case to inform user.
logger.Errorf("passed in target %v failed to parse: %v", parsedTarget, err)
return false
}

if parsedTarget.Scheme == "xds" {
if parsedTarget.Host == "" {
return true // "In the cases where no authority is mentioned, the authority is assumed to be csm"
}
return parsedTarget.Host == "traffic-director-global.xds.googleapis.com"
}
return false
// "In the cases where no authority is mentioned, the authority is assumed
// to be csm".
return parsedTarget.Scheme == "xds" && (parsedTarget.Host == "" || parsedTarget.Host == "traffic-director-global.xds.googleapis.com")
}
9 changes: 7 additions & 2 deletions stats/opentelemetry/csm/pluginoption_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/base64"
"fmt"
"net/url"
"os"
"testing"
"time"
Expand Down Expand Up @@ -252,7 +253,7 @@ func (s) TestGetLabels(t *testing.T) {
md.Append(metadataExchangeKey, "extra-val")
}

labelsGot := cpo.GetLabels(md, nil)
labelsGot := cpo.GetLabels(md)
if diff := cmp.Diff(labelsGot, test.labelsWant); diff != "" {
t.Fatalf("cpo.GetLabels returned unexpected value (-got, +want): %v", diff)
}
Expand Down Expand Up @@ -291,7 +292,11 @@ func (s) TestDetermineTargetCSM(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if got := determineTargetCSM(test.target); got != test.targetCSM {
parsedTarget, err := url.Parse(test.target)
if err != nil {
t.Fatalf("test target %v failed to parse: %v", test.target, err)
}
if got := determineTargetCSM(parsedTarget); got != test.targetCSM {
t.Fatalf("cpo.determineTargetCSM(%v): got %v, want %v", test.target, got, test.targetCSM)
}
})
Expand Down
11 changes: 5 additions & 6 deletions stats/opentelemetry/internal/pluginoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (
// intended to be added to applicable OpenTelemetry metrics recorded in the
// OpenTelemetry instrumentation component.
//
// This API is experimental. In the future, we hope to stabilize and expose this
// API to allow plugins to inject labels of their choosing into metrics
// recorded.
// In the future, we hope to stabilize and expose this API to allow plugins to
// inject labels of their choosing into metrics recorded.
type PluginOption interface {
// GetMetadata creates a MD with metadata exchange labels.
GetMetadata() metadata.MD
// GetLabels emits relevant labels from the metadata provided and the
// optional labels provided, alongside any relevant labels.
GetLabels(metadata.MD, map[string]string) map[string]string
// GetLabels emits relevant labels from the incoming RPC metadata, alongside
// any other relevant labels.
GetLabels(metadata.MD) map[string]string
}
10 changes: 6 additions & 4 deletions test/xds/xds_telemetry_labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (
)

const serviceNameKey = "service_name"
const serviceNameKeyCSM = "csm.service_name"
const serviceNamespaceKey = "service_namespace"
const serviceNamespaceKeyCSM = "csm.service_namespace"
const serviceNameValue = "grpc-service"
const serviceNamespaceValue = "grpc-service-namespace"

Expand Down Expand Up @@ -125,11 +127,11 @@ func (fsh *fakeStatsHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
// aren't started. All of these should have access to the desired telemetry
// labels.
case *stats.OutPayload, *stats.InPayload, *stats.End:
if label, ok := fsh.labels.TelemetryLabels[serviceNameKey]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKey, serviceNameValue, label)
if label, ok := fsh.labels.TelemetryLabels[serviceNameKeyCSM]; !ok || label != serviceNameValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNameKeyCSM, serviceNameValue, label)
}
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKey]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKey, serviceNamespaceValue, label)
if label, ok := fsh.labels.TelemetryLabels[serviceNamespaceKeyCSM]; !ok || label != serviceNamespaceValue {
fsh.t.Fatalf("for telemetry label %v, want: %v, got: %v", serviceNamespaceKeyCSM, serviceNamespaceValue, label)
}

default:
Expand Down
4 changes: 2 additions & 2 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ func validateClusterAndConstructClusterUpdate(cluster *v3clusterpb.Cluster) (Clu
if fields := val.GetFields(); fields != nil {
if val, ok := fields["service_name"]; ok {
if _, ok := val.GetKind().(*structpb.Value_StringValue); ok {
telemetryLabels["service_name"] = val.GetStringValue()
telemetryLabels["csm.service_name"] = val.GetStringValue()
}
}
if val, ok := fields["service_namespace"]; ok {
if _, ok := val.GetKind().(*structpb.Value_StringValue); ok {
telemetryLabels["service_namespace"] = val.GetStringValue()
telemetryLabels["csm.service_namespace"] = val.GetStringValue()
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/xdsclient/xdsresource/unmarshal_cds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1376,8 +1376,8 @@ func (s) TestUnmarshalCluster(t *testing.T) {
LRSServerConfig: ClusterLRSServerSelf,
Raw: v3ClusterAnyWithTelemetryLabels,
TelemetryLabels: map[string]string{
"service_name": "grpc-service",
"service_namespace": "grpc-service-namespace",
"csm.service_name": "grpc-service",
"csm.service_namespace": "grpc-service-namespace",
},
},
},
Expand All @@ -1391,7 +1391,7 @@ func (s) TestUnmarshalCluster(t *testing.T) {
LRSServerConfig: ClusterLRSServerSelf,
Raw: v3ClusterAnyWithTelemetryLabelsIgnoreSome,
TelemetryLabels: map[string]string{
"service_name": "grpc-service",
"csm.service_name": "grpc-service",
},
},
},
Expand Down
Loading