Skip to content

feat: api transport metrics #4774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions pkg/config/controllerconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package config

import (
"context"
"fmt"
"net/http"

metricstransport "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/metrics/transport"
"golang.org/x/oauth2"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

httptransport?

"google.golang.org/grpc"
)

Expand All @@ -45,6 +49,9 @@ type ControllerConfig struct {
// GCPTokenSource mints OAuth2 tokens to be passed with GCP API calls,
// allowing use of a non-default OAuth2 identity
GCPTokenSource oauth2.TokenSource

// EnableMetricsTransport enables automatic wrapping of HTTP clients with metrics transport
EnableMetricsTransport bool
}

func (c *ControllerConfig) RESTClientOptions() ([]option.ClientOption, error) {
Expand All @@ -57,25 +64,37 @@ func (c *ControllerConfig) RESTClientOptions() ([]option.ClientOption, error) {
if c.UserAgent != "" {
opts = append(opts, option.WithUserAgent(c.UserAgent))
}

if c.HTTPClient != nil {
httpClient := &http.Client{}
*httpClient = *c.HTTPClient

transport := c.HTTPClient.Transport
if c.EnableMetricsTransport {
transport = metricstransport.NewMetricsTransport(transport)
}

httpClient.Transport = &optionsRoundTripper{
config: *c,
quotaProject: quotaProject,
inner: c.HTTPClient.Transport,
inner: transport,
}
opts = append(opts, option.WithHTTPClient(httpClient))

// quotaProject is incompatible with http client
quotaProject = ""
} else {
// the default HTTP client is used and wired up with Google auth

// we cannot pass both a custom http client and a token source to the Google transport
if c.GCPTokenSource != nil {
opts = append(opts, option.WithTokenSource(c.GCPTokenSource))
}
}

if quotaProject != "" {
opts = append(opts, option.WithQuotaProject(quotaProject))
}
if c.GCPTokenSource != nil {
opts = append(opts, option.WithTokenSource(c.GCPTokenSource))
}

// TODO: support endpoints?
// if m.config.Endpoint != "" {
Expand Down Expand Up @@ -123,3 +142,31 @@ func (m *optionsRoundTripper) RoundTrip(req *http.Request) (*http.Response, erro
}
return m.inner.RoundTrip(req)
}

// NewAuthenticatedHTTPClient creates an HTTP client with proper authentication
// and optionally wraps it with metrics transport
func (c *ControllerConfig) NewAuthenticatedHTTPClient(ctx context.Context) (*http.Client, error) {
opts, err := c.RESTClientOptions()
if err != nil {
return nil, fmt.Errorf("error creating REST client options: %w", err)
}
if c.HTTPClient != nil {
c, _, err := htransport.NewClient(ctx, opts...)
return c, err
}

// Create an authenticated transport using htransport
Copy link
Collaborator

@cheftako cheftako Jul 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment does not seem to align with the code in the immediate block.

baseTransport := http.DefaultTransport
if c.EnableMetricsTransport {
// Wrap the base transport with metrics transport
baseTransport = metricstransport.NewMetricsTransport(baseTransport)
}

// Create an authenticated transport
authTransport, err := htransport.NewTransport(ctx, baseTransport, opts...)
if err != nil {
return nil, fmt.Errorf("error creating authenticated transport: %w", err)
}

return &http.Client{Transport: authTransport}, nil
}
3 changes: 3 additions & 0 deletions pkg/controller/direct/directbase/directbase_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/execution"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/k8s"
metricstransport "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/metrics/transport"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/util"

"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -213,6 +214,8 @@ func (r *DirectReconciler) mapSecretToResources(ctx context.Context, obj client.

// Reconcile checks k8s for the current state of the resource.
func (r *DirectReconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, err error) {
ctx = metricstransport.WithControllerName(ctx, r.controllerName)

logger := log.FromContext(ctx)

logger.Info("Running reconcile", "resource", request.NamespacedName)
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/direct/logging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config"
api "google.golang.org/api/logging/v2"
"google.golang.org/api/option"
)

type gcpClient struct {
Expand All @@ -34,12 +35,12 @@ func newGCPClient(ctx context.Context, config *config.ControllerConfig) (*gcpCli
}

func (m *gcpClient) newProjectMetricsService(ctx context.Context) (*api.ProjectsMetricsService, error) {
opts, err := m.config.RESTClientOptions()
httpClient, err := m.config.NewAuthenticatedHTTPClient(ctx)
if err != nil {
return nil, err
}

service, err := api.NewService(ctx, opts...)
service, err := api.NewService(ctx, option.WithHTTPClient(httpClient))
if err != nil {
return nil, fmt.Errorf("building service for logging: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/kccmanager/kccmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type Config struct {
// UseCache is true if we should use the informer cache
// Currently only used in preview
UseCache bool

// EnableMetricsTransport enables automatic wrapping of HTTP clients with metrics transport
EnableMetricsTransport bool
}

// Creates a new controller-runtime manager.Manager and starts all of the KCC controllers pointed at the
Expand Down Expand Up @@ -124,6 +127,7 @@ func New(ctx context.Context, restConfig *rest.Config, cfg Config) (manager.Mana
tfCfg.UserProjectOverride = cfg.UserProjectOverride
tfCfg.BillingProject = cfg.BillingProject
tfCfg.GCPAccessToken = cfg.GCPAccessToken
tfCfg.EnableMetricsTransport = cfg.EnableMetricsTransport

provider, err := tfprovider.New(ctx, tfCfg)
if err != nil {
Expand All @@ -146,6 +150,7 @@ func New(ctx context.Context, restConfig *rest.Config, cfg Config) (manager.Mana
dclOptions.BillingProject = cfg.BillingProject
dclOptions.HTTPClient = cfg.HTTPClient
dclOptions.UserAgent = gcp.KCCUserAgent()
dclOptions.EnableMetricsTransport = cfg.EnableMetricsTransport

dclConfig, err := clientconfig.New(ctx, dclOptions)
if err != nil {
Expand All @@ -160,6 +165,7 @@ func New(ctx context.Context, restConfig *rest.Config, cfg Config) (manager.Mana
HTTPClient: cfg.HTTPClient,
GRPCUnaryClientInterceptor: cfg.GRPCUnaryClientInterceptor,
UserAgent: gcp.KCCUserAgent(),
EnableMetricsTransport: cfg.EnableMetricsTransport,
}

if cfg.GCPAccessToken != "" {
Expand Down
5 changes: 5 additions & 0 deletions pkg/dcl/clientconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/config"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/dcl/logger"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/gcp"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/metrics/transport"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/test"

"github.com/GoogleCloudPlatform/declarative-resource-client-library/dcl"
Expand All @@ -51,6 +52,10 @@ func newConfigAndClient(ctx context.Context, opt Options) (*dcl.Config, *http.Cl
opt.HTTPClient = httpClient
}

if opt.EnableMetricsTransport {
opt.HTTPClient.Transport = transport.NewMetricsTransport(opt.HTTPClient.Transport)
}

configOptions := []dcl.ConfigOption{
dcl.WithHTTPClient(opt.HTTPClient),
dcl.WithUserAgent(opt.UserAgent),
Expand Down
4 changes: 3 additions & 1 deletion pkg/metrics/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opencensus.io/stats/view"

"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/logging"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func RegisterControllerOpenCensusViews() error {
Expand All @@ -45,7 +46,8 @@ func RegisterPrometheusExporter(addr string) error {
// Run the Prometheus exporter as a scrape endpoint.
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", pe)
mux.Handle("/metrics", pe) // OpenCensus
mux.Handle("/experimental-metrics", promhttp.Handler()) // Prometheus Go client
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I grant it's experimental right now, but I'm not sure that's the path we want to use. How about /prom-metrics or /prometheus-metrics?

if err := http.ListenAndServe(addr, mux); err != nil {
logging.Fatal(err, "failed to run Prometheus scrape endpoint")
}
Expand Down
163 changes: 163 additions & 0 deletions pkg/metrics/transport/transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package transport

import (
"context"
"net/http"
"strconv"
"strings"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"k8s.io/klog/v2"
)

var (
// APILabels is the ordered list of label names shared by every GCP API metric
// declared below. Values passed to WithLabelValues must be supplied in this
// same order and with this same count.
//
// Where each label value comes from in MetricsTransport.RoundTrip:
//   - method: req.Method of the outgoing request.
//   - service: the result of extractService, which returns one of a fixed
//     list of names or "unknown".
//   - status_code: the decimal response status code, or "0" when there is no
//     response.
//   - controller_name: the name stored in the request context by
//     WithControllerName, or "unknownControllerName" when absent.
//
// NOTE(review): method and status_code are passed through without
// normalisation, so their cardinality is only as bounded as the requests and
// responses seen at runtime — confirm this is acceptable.
APILabels = []string{"method", "service", "status_code", "controller_name"}

// gcpRequestsTotal counts GCP API requests, partitioned by APILabels.
gcpRequestsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "configconnector",
Name: "gcp_api_requests_total",
Help: "Total number of GCP API requests",
},
APILabels,
)

// gcpRequestDuration observes GCP API request durations in seconds,
// partitioned by APILabels, using the Prometheus default buckets.
gcpRequestDuration = promauto.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "configconnector",
Name: "gcp_api_request_duration_seconds",
Help: "Duration of GCP API requests in seconds",
Buckets: prometheus.DefBuckets,
},
APILabels,
)

// gcpErrorsTotal counts GCP API requests whose round trip returned an error,
// partitioned by APILabels.
gcpErrorsTotal = promauto.NewCounterVec(
prometheus.CounterOpts{
Namespace: "configconnector",
Name: "gcp_api_errors_total",
Help: "Total number of GCP API errors",
},
APILabels,
)
)

// contextKey is a package-private type for context keys, so that keys defined
// here cannot collide with plain-string keys set by other packages.
type contextKey string

// controllerNameContextKey is the context key under which the controller name
// used for metrics is stored.
const controllerNameContextKey contextKey = "controllerNameForMetrics"

// WithControllerName derives a child of ctx that carries name as the
// controller name. The parent context is left untouched.
func WithControllerName(ctx context.Context, name string) context.Context {
	named := context.WithValue(ctx, controllerNameContextKey, name)
	return named
}

// MetricsTransport is an http.RoundTripper decorator: it holds the wrapped
// transport that actually performs each request.
type MetricsTransport struct {
	inner http.RoundTripper
}

// NewMetricsTransport returns a MetricsTransport wrapping inner. A nil inner
// is replaced by http.DefaultTransport.
func NewMetricsTransport(inner http.RoundTripper) *MetricsTransport {
	mt := &MetricsTransport{inner: http.DefaultTransport}
	if inner != nil {
		mt.inner = inner
	}
	return mt
}

func (t *MetricsTransport) RoundTrip(req *http.Request) (*http.Response, error) {
service := extractService(req.URL.String())

start := time.Now()
resp, reqErr := t.inner.RoundTrip(req)
duration := time.Since(start).Seconds()

statusCode := "0" // good to have a default value here
if resp != nil {
statusCode = strconv.Itoa(resp.StatusCode)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not bounded.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be going through bounded types and then converting to string at the WithLabelValues call to ensure the labels are actually bounded.

}
controllerName := "unknownControllerName"
if v := req.Context().Value(controllerNameContextKey); v != nil {
if s, ok := v.(string); ok {
controllerName = s
}
}

// record api request metrics
gcpRequestsTotal.WithLabelValues(req.Method, service, statusCode, controllerName).Inc()
gcpRequestDuration.WithLabelValues(req.Method, service, statusCode, controllerName).Observe(duration)

// Record errors
if reqErr != nil {
gcpErrorsTotal.WithLabelValues(req.Method, service, statusCode).Inc()

// no need to log this error as callers will deal with it
}

return resp, reqErr
}

// knownGCPServices lists the service names extractService can return, in
// match order. Each entry is matched as "<name>.googleapis.com".
var knownGCPServices = []string{
	"compute",
	"storage",
	"bigquery",
	"datacatalog",
	"logging",
	"monitoring",
	"iam",
	"kms",
	"pubsub",
	"sqladmin",
	"container",
	"cloudresourcemanager",
	"dns",
	"dataproc",
}

// extractService maps a request URL onto a fixed set of GCP service names.
//
// Taking whatever precedes the first "." of the host would be simpler, but it
// would let arbitrary XYZ.googleapis.com hosts create new label values and
// blow up the cardinality of the Prometheus metrics. The allowed values are
// therefore hard-coded.
//
// The URL is checked for "<service>.googleapis.com" as a substring, and the
// first entry that matches wins. URLs that match nothing are logged as a
// warning and reported as "unknown".
func extractService(url string) string {
	for _, name := range knownGCPServices {
		if strings.Contains(url, name+".googleapis.com") {
			return name
		}
	}

	klog.Warningf("Unknown GCP service in URL: %s", url)
	return "unknown"
}
Loading
Loading