Skip to content

add client headers provider func #6362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions exporters/otlp/otlplog/otlploghttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,22 @@ func newHTTPClient(cfg config) (*client, error) {
req.Header.Set("Content-Type", "application/x-protobuf")

c := &httpClient{
compression: cfg.compression.Value,
req: req,
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
client: hc,
compression: cfg.compression.Value,
req: req,
requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate),
client: hc,
headersProvider: cfg.headersProvider.Value,
}
return &client{uploadLogs: c.uploadLogs}, nil
}

type httpClient struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
client *http.Client
req *http.Request
compression Compression
requestFunc retry.RequestFunc
client *http.Client
headersProvider HeadersProviderFunc
}

// Keep it in sync with golang's DefaultTransport from net/http! We
Expand Down Expand Up @@ -227,6 +229,14 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro
r := c.req.Clone(ctx)
req := request{Request: r}

headers, err := c.headersProvider()
if err != nil {
return req, fmt.Errorf("failed to execute headers provider: %w", err)
}
for k, v := range headers {
r.Header.Set(k, v)
}

switch c.compression {
case NoCompression:
r.ContentLength = (int64)(len(body))
Expand Down
22 changes: 22 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,28 @@ func TestConfig(t *testing.T) {
assert.Equal(t, []string{headers[key]}, got[key])
})

t.Run("WithHeadersProvider", func(t *testing.T) {
key := http.CanonicalHeaderKey("my-custom-header")
key2 := http.CanonicalHeaderKey("my-provided-custom-header")
headers := map[string]string{key: "custom-value"}
providedHeaders := map[string]string{key: "custom-value-override", key2: "provided-custom-value"}
headersProvider := func() (map[string]string, error) { return providedHeaders, nil }
exp, coll := factoryFunc("", nil, WithHeaders(headers), WithHeadersProvider(headersProvider))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
require.NoError(t, exp.Export(ctx, make([]log.Record, 1)))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

got := coll.Headers()
require.Regexp(t, "OTel Go OTLP over HTTP/protobuf logs exporter/[01]\\..*", got)
require.Contains(t, got, key)
// HeadersProviderFunc overrides Headers
assert.Equal(t, []string{providedHeaders[key]}, got[key])
// HeaderProviderFunc values merged with Headers
assert.Equal(t, []string{providedHeaders[key2]}, got[key2])
})

t.Run("WithTimeout", func(t *testing.T) {
// Do not send on rCh so the Collector never responds to the client.
rCh := make(chan exportResult)
Expand Down
43 changes: 29 additions & 14 deletions exporters/otlp/otlplog/otlploghttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ import (

// Default values.
var (
defaultEndpoint = "localhost:4318"
defaultPath = "/v1/logs"
defaultTimeout = 10 * time.Second
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
defaultRetryCfg = retry.DefaultConfig
defaultEndpoint = "localhost:4318"
defaultPath = "/v1/logs"
defaultTimeout = 10 * time.Second
defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment
defaultRetryCfg = retry.DefaultConfig
defaultHeadersProvider HeadersProviderFunc = func() (map[string]string, error) { return map[string]string{}, nil }
)

// Environment variable keys.
Expand Down Expand Up @@ -85,15 +86,16 @@ type fnOpt func(config) config
func (f fnOpt) applyHTTPOption(c config) config { return f(c) }

type config struct {
endpoint setting[string]
path setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
compression setting[Compression]
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[retry.Config]
endpoint setting[string]
path setting[string]
insecure setting[bool]
tlsCfg setting[*tls.Config]
headers setting[map[string]string]
headersProvider setting[HeadersProviderFunc]
compression setting[Compression]
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[retry.Config]
}

func newConfig(options []Option) config {
Expand All @@ -117,6 +119,9 @@ func newConfig(options []Option) config {
c.tlsCfg = c.tlsCfg.Resolve(
loadEnvTLS[*tls.Config](),
)
c.headersProvider = c.headersProvider.Resolve(
fallback[HeadersProviderFunc](defaultHeadersProvider),
)
c.headers = c.headers.Resolve(
getenv[map[string]string](envHeaders, convHeaders),
)
Expand Down Expand Up @@ -286,6 +291,16 @@ func WithHeaders(headers map[string]string) Option {
})
}

type HeadersProviderFunc func() (map[string]string, error)

// WithHeadersProvider will be called to set the provided headers with each HTTP requests.
func WithHeadersProvider(providerFunc HeadersProviderFunc) Option {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not allow to e.g. to refresh token when getting a 401 error.

Copy link
Author

@kevingentile kevingentile Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HeadersProvider is executed with every call to newRequest. This should enable implementers to configure a HeadersProviderFunc that can hook into required token lifecycle components for each request. See oauth2/clientcredentials.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the access token is revoked? Do not we want to give the m the possibility to get a new one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it’s important to point out a constraint in the scope of this effort. These changes are motivated by the need to provide dynamically generated headers with requests, namely an Authorization header whose value may change over time as access tokens are renewed. The implementer is responsible for any scaffolding required to produce the desired headers (or error).
Here is a reference on the client credentials flow that you or others may find useful.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it’s important to point out a constraint in the scope of this effort.

What do you mean? Not gracefully supporting one of the possible oauth scenarios/flows?

I am not sure if it would not be more flexible and easier if we simply add an WithHTTPClient or WithHTTPTransport option (similarly to https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc#WithGRPCConn). Then you would be able to use https://pkg.go.dev/golang.org/x/oauth2/clientcredentials#Config.Client. WDYT? I think there was some prior art for it... Probably worth digging into some older issues and PRs

Copy link
Member

@pellared pellared Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

@kevingentile kevingentile Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal here is to provide a way to set dynamic headers, not implement graceful OAuth2 support. Implementers can bring their own. For example:

package main

import (
	"context"
	"fmt"
	"net/url"
	"os"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
	"go.opentelemetry.io/otel/sdk/metric"
	"golang.org/x/oauth2/clientcredentials"
)

// NOTE: replace in go.mod:
// replace go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp => github.com/kevingentile/opentelemetry-go/exporters/otlp/otlpmetric/otlpmetrichttp v0.0.0-20250302224340-c40e5a3c2e22


func main() {
	ctx := context.Background()

	conf := clientcredentials.Config{
		ClientID:       os.Getenv("CLIENT_ID"),
		ClientSecret:   os.Getenv("CLIENT_SECRET"),
		TokenURL:       os.Getenv("TOKEN_URL"),
		EndpointParams: url.Values{"audience": {os.Getenv("AUDIENCE")}},
	}

	exp, err := otlpmetrichttp.New(ctx, otlpmetrichttp.WithHeadersProvider(func() (map[string]string, error) {
		token, err := conf.Token(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to get token: %w", err)
		}

		return map[string]string{
			"Authorization":       "Bearer " + token.AccessToken,
			"Some-Dynamic-Header": fmt.Sprint(os.Getpid()),
		}, nil
	}))
	if err != nil {
		panic(err)
	}

	meterProvider := metric.NewMeterProvider(metric.WithReader(metric.NewPeriodicReader(exp)))
	defer func() {
		if err := meterProvider.Shutdown(ctx); err != nil {
			panic(err)
		}
	}()
	otel.SetMeterProvider(meterProvider)
}

return fnOpt(func(c config) config {
c.headersProvider = newSetting(providerFunc)
return c
})
}

// WithTimeout sets the max amount of time an Exporter will attempt an export.
//
// This takes precedence over any retry settings defined by WithRetry. Once
Expand Down
23 changes: 22 additions & 1 deletion exporters/otlp/otlplog/otlploghttp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestNewConfig(t *testing.T) {
WithHeaders(headers),
WithTimeout(time.Second),
WithRetry(RetryConfig(rc)),
// Do not test WithProxy. Requires func comparison.
// Do not test WithProxy or WithHeaderProvider. Requires func comparison.
},
want: config{
endpoint: newSetting("test"),
Expand Down Expand Up @@ -395,6 +395,9 @@ func TestNewConfig(t *testing.T) {
// Cannot compare funcs, see TestWithProxy.
c.proxy = setting[HTTPTransportProxyFunc]{}

// Cannot compare funcs, see TestWithHeaderProvider
c.headersProvider = setting[HeadersProviderFunc]{}

assert.Equal(t, tc.want, c)

for _, errMsg := range tc.errs {
Expand Down Expand Up @@ -436,3 +439,21 @@ func TestWithProxy(t *testing.T) {
assert.True(t, c.proxy.Set)
assert.NotNil(t, c.proxy.Value)
}

func TestWithHeadersProvider(t *testing.T) {
key := "a"
expectedHeaders := map[string]string{key: "A"}
provider := func() (map[string]string, error) {
return expectedHeaders, nil
}
opts := []Option{WithHeadersProvider(provider)}
c := newConfig(opts)

assert.True(t, c.headersProvider.Set)
assert.NotNil(t, c.headersProvider.Value)

h, err := c.headersProvider.Value()

assert.Equal(t, h[key], expectedHeaders[key])
assert.Nil(t, err)
}
28 changes: 20 additions & 8 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (

type client struct {
// req is cloned for every upload the client makes.
req *http.Request
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
req *http.Request
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client
headersProvider HeadersProviderFunc
}

// Keep it in sync with golang's DefaultTransport from net/http! We
Expand Down Expand Up @@ -97,10 +98,11 @@ func newClient(cfg oconf.Config) (*client, error) {
req.Header.Set("Content-Type", "application/x-protobuf")

return &client{
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
headersProvider: HeadersProviderFunc(cfg.Metrics.HeadersProvider),
}, nil
}

Expand Down Expand Up @@ -230,6 +232,16 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) {
r := c.req.Clone(ctx)
req := request{Request: r}

if c.headersProvider != nil {
headers, err := c.headersProvider()
if err != nil {
return req, fmt.Errorf("failed to execute headers provider: %w", err)
}
for k, v := range headers {
r.Header.Set(k, v)
}
}

switch c.compression {
case NoCompression:
r.ContentLength = (int64)(len(body))
Expand Down
18 changes: 18 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ func TestConfig(t *testing.T) {
assert.Equal(t, []string{headers[key]}, got[key])
})

t.Run("WithHeadersProvider", func(t *testing.T) {
key := http.CanonicalHeaderKey("my-custom-header")
headers := map[string]string{key: "custom-value"}
exp, coll := factoryFunc("", nil, WithHeadersProvider(func() (map[string]string, error) {
return headers, nil
}))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

got := coll.Headers()
require.Regexp(t, "OTel Go OTLP over HTTP/protobuf metrics exporter/[01]\\..*", got)
require.Contains(t, got, key)
assert.Equal(t, []string{headers[key]}, got[key])
})

t.Run("WithTimeout", func(t *testing.T) {
// Do not send on rCh so the Collector never responds to the client.
rCh := make(chan otest.ExportResult)
Expand Down
8 changes: 8 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Compression oconf.Compression
// to the OTLP HTTP client.
type HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)

// HeadersProviderFunc is a function which resolves to the headers to use for a given request.
type HeadersProviderFunc func() (map[string]string, error)

const (
// NoCompression tells the driver to send payloads without
// compression.
Expand Down Expand Up @@ -167,6 +170,11 @@ func WithHeaders(headers map[string]string) Option {
return wrappedOption{oconf.WithHeaders(headers)}
}

// WithHeadersProvider will be called to set the provided headers with each HTTP requests.
func WithHeadersProvider(providerFunc HeadersProviderFunc) Option {
return wrappedOption{oconf.WithHeadersProvider(oconf.HeadersProviderFunc(providerFunc))}
}

// WithTimeout sets the max amount of time an Exporter will attempt an export.
//
// This takes precedence over any retry settings defined by WithRetry. Once
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,18 @@ type (
// This type is compatible with `http.Transport.Proxy` and can be used to set a custom proxy function to the OTLP HTTP client.
HTTPTransportProxyFunc func(*http.Request) (*url.URL, error)

// HeadersProviderFunc is a function which resolves to the headers to use for a given request.
HeadersProviderFunc func() (map[string]string, error)

SignalConfig struct {
Endpoint string
Insecure bool
TLSCfg *tls.Config
Headers map[string]string
Compression Compression
Timeout time.Duration
URLPath string
Endpoint string
Insecure bool
TLSCfg *tls.Config
Headers map[string]string
HeadersProvider HeadersProviderFunc
Compression Compression
Timeout time.Duration
URLPath string

// gRPC configurations
GRPCCredentials credentials.TransportCredentials
Expand Down Expand Up @@ -345,6 +349,13 @@ func WithHeaders(headers map[string]string) GenericOption {
})
}

func WithHeadersProvider(hpf HeadersProviderFunc) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.HeadersProvider = hpf
return cfg
})
}

func WithTimeout(duration time.Duration) GenericOption {
return newGenericOption(func(cfg Config) Config {
cfg.Metrics.Timeout = duration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,44 @@ func TestConfigs(t *testing.T) {
},
},

// Headers Provider tests
{
name: " Test with Headers Provider",
opts: []GenericOption{
WithHeadersProvider(func() (map[string]string, error) {
return map[string]string{"key": "value"}, nil
}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.NotNil(t, c.Metrics.HeadersProvider)
headers, err := c.Metrics.HeadersProvider()
assert.NoError(t, err)
assert.Equal(t, map[string]string{"key": "value"}, headers)
},
},
{
name: " Test with Headers and Headers Provider",
opts: []GenericOption{
WithHeaders(map[string]string{"key": "value"}),
WithHeadersProvider(func() (map[string]string, error) {
return map[string]string{"key": "value-override"}, nil
}),
},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.NotNil(t, c.Metrics.HeadersProvider)
headers, err := c.Metrics.HeadersProvider()
assert.NoError(t, err)
assert.Equal(t, map[string]string{"key": "value-override"}, headers)
},
},
{
name: " Test without Headers Provider",
opts: []GenericOption{},
asserts: func(t *testing.T, c *Config, grpcOption bool) {
assert.Nil(t, c.Metrics.HeadersProvider)
},
},

// Compression Tests
{
name: "Test With Compression",
Expand Down
10 changes: 10 additions & 0 deletions exporters/otlp/otlptrace/otlptracehttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ func (d *client) newRequest(body []byte) (request, error) {
for k, v := range d.cfg.Headers {
r.Header.Set(k, v)
}

if d.cfg.HeadersProvider != nil {
headers, err := d.cfg.HeadersProvider()
if err != nil {
return request{Request: r}, fmt.Errorf("failed to execute headers provider: %w", err)
}
for k, v := range headers {
r.Header.Set(k, v)
}
}
r.Header.Set("Content-Type", contentTypeProto)

req := request{Request: r}
Expand Down
Loading