Skip to content

OTLP metrics sender interface #5691

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import java.util.Map;

// intentionally not public while we incubate this concept
// if we want to use this in other registries, it should move to micrometer-core and become public API
interface MetricsSender {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just to demonstrate what this might look like if we try to expand its scope outside of the OTLP registry.


/**
* Send encoded metrics data from a {@link io.micrometer.core.instrument.MeterRegistry
* MeterRegistry}.
* @param metricsData encoded batch of metrics
* @param headers metadata to send as headers with the metrics data
*/
void send(byte[] metricsData, Map<String, String> headers);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think encoding the data should be the responsibility of the sender, e.g.: users should be able to provide a sender that uses json and another one that use protobuf. But we can discuss it and change it in the next milestone if this is valid.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Encoding and sending are separate concerns. The existing HttpSender does not handle encoding, and encoding is specific to the backend/format, whereas a sender is re-usable across registries as is. I had a version of changes at one point that introduced encoding generically in PushMeterRegistry, but it felt like too much abstraction and too wide scope for the purpose of achieving this in the OTLP registry. We could add it just for the OTLP registry, but I was aiming to make the least amount of necessary change to achieve the actual use cases we have feedback requesting, which I didn't see any asking for e.g. JSON support. Keeping things as implementation details makes it easier for us to support it as well.


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.ipc.http.HttpSender;

import java.util.Map;

/**
* An implementation of {@link OtlpMetricsSender} that uses an {@link HttpSender}.
*
* @since 1.15.0
*/
public class OtlpHttpMetricsSender implements OtlpMetricsSender {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(OtlpHttpMetricsSender.class);

// VisibleForTesting
final HttpSender httpSender;

private final OtlpConfig config;

private final String userAgentHeader;

public OtlpHttpMetricsSender(HttpSender httpSender, OtlpConfig config) {
this.httpSender = httpSender;
this.config = config;
this.userAgentHeader = getUserAgentHeader();
}

@Override
public void send(byte[] metricsData, Map<String, String> headers) {
HttpSender.Request.Builder httpRequest = this.httpSender.post(config.url())
.withHeader("User-Agent", userAgentHeader)
.withContent("application/x-protobuf", metricsData);
headers.forEach(httpRequest::withHeader);
try {
HttpSender.Response response = httpRequest.send();
if (!response.isSuccessful()) {
logger.warn(
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
getConfigurationContext(), response.code(), response.body());
}
}
catch (Throwable e) {
logger.warn("Failed to publish metrics (context: {}) ", getConfigurationContext(), e);
}
}

private String getUserAgentHeader() {
String userAgent = "Micrometer-OTLP-Exporter-Java";
String version = getClass().getPackage().getImplementationVersion();
if (version != null) {
userAgent += "/" + version;
}
return userAgent;
}

/**
* Get the configuration context.
* @return A message containing enough information for the log reader to figure out
* what configuration details may have contributed to the failure.
*/
private String getConfigurationContext() {
// While other values may contribute to failures, these two are most common
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import io.micrometer.common.lang.Nullable;
import io.micrometer.common.util.internal.logging.InternalLogger;
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.distribution.Histogram;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.internal.DefaultGauge;
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
Expand All @@ -36,14 +34,14 @@
import io.micrometer.core.instrument.util.MeterPartition;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.ipc.http.HttpSender;
import io.micrometer.core.ipc.http.HttpUrlConnectionSender;
import io.micrometer.registry.otlp.internal.CumulativeBase2ExponentialHistogram;
import io.micrometer.registry.otlp.internal.DeltaBase2ExponentialHistogram;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.*;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
import io.opentelemetry.proto.resource.v1.Resource;

import java.time.Duration;
Expand All @@ -56,8 +54,7 @@
import java.util.function.ToLongFunction;

/**
* Publishes meters in OTLP (OpenTelemetry Protocol) format. HTTP with Protobuf encoding
* is the only option currently supported.
* Publishes meters in OTLP (OpenTelemetry Protocol) format.
*
* @author Tommy Ludwig
* @author Lenin Jaganathan
Expand All @@ -83,16 +80,14 @@ public class OtlpMeterRegistry extends PushMeterRegistry {

private final OtlpConfig config;

private final HttpSender httpSender;
private final OtlpMetricsSender metricsSender;

private final Resource resource;

private final AggregationTemporality aggregationTemporality;

private final TimeUnit baseTimeUnit;

private final String userAgentHeader;

// Time when the last scheduled rollOver has started. Applicable only for delta
// flavour.
private volatile long lastMeterRolloverStartTime = -1;
Expand All @@ -109,31 +104,38 @@ public OtlpMeterRegistry(OtlpConfig config, Clock clock) {
}

/**
* Create an {@code OtlpMeterRegistry} instance.
* Create an {@code OtlpMeterRegistry} instance with an HTTP metrics sender.
* @param config config
* @param clock clock
* @param threadFactory thread factory
* @since 1.14.0
*/
public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory) {
this(config, clock, threadFactory, new HttpUrlConnectionSender());
this(config, clock, threadFactory, new OtlpHttpMetricsSender(new HttpUrlConnectionSender(), config));
}

// VisibleForTesting
// not public until we decide what we want to expose in public API
// HttpSender may not be a good idea if we will support a non-HTTP transport
OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory, HttpSender httpSender) {
private OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory,
OtlpMetricsSender metricsSender) {
super(config, clock);
this.config = config;
this.baseTimeUnit = config.baseTimeUnit();
this.httpSender = httpSender;
this.metricsSender = metricsSender;
this.resource = Resource.newBuilder().addAllAttributes(getResourceAttributes()).build();
this.aggregationTemporality = config.aggregationTemporality();
this.userAgentHeader = getUserAgentHeader();
config().namingConvention(NamingConvention.dot);
start(threadFactory);
}

/**
* Construct an {@link OtlpMeterRegistry} using the Builder pattern.
* @param config config for the registry; see {@link OtlpConfig#DEFAULT}
* @return builder
* @since 1.15.0
*/
public static Builder builder(OtlpConfig config) {
return new Builder(config);
}

@Override
public void start(ThreadFactory threadFactory) {
super.start(threadFactory);
Expand Down Expand Up @@ -178,34 +180,15 @@ protected void publish() {
.build())
.build())
.build();
HttpSender.Request.Builder httpRequest = this.httpSender.post(this.config.url())
.withHeader("User-Agent", this.userAgentHeader)
.withContent("application/x-protobuf", request.toByteArray());
this.config.headers().forEach(httpRequest::withHeader);
HttpSender.Response response = httpRequest.send();
if (!response.isSuccessful()) {
logger.warn(
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
getConfigurationContext(), response.code(), response.body());
}

metricsSender.send(request.toByteArray(), this.config.headers());
}
catch (Throwable e) {
logger.warn(String.format("Failed to publish metrics to OTLP receiver (context: %s)",
getConfigurationContext()), e);
logger.warn("Failed to publish metrics to OTLP receiver", e);
}
}
}

/**
* Get the configuration context.
* @return A message containing enough information for the log reader to figure out
* what configuration details may have contributed to the failure.
*/
private String getConfigurationContext() {
// While other values may contribute to failures, these two are most common
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
}

@Override
protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> valueFunction) {
return new DefaultGauge<>(id, obj, valueFunction);
Expand Down Expand Up @@ -492,13 +475,48 @@ static double[] getSloWithPositiveInf(DistributionStatisticConfig distributionSt
return sloWithPositiveInf;
}

private String getUserAgentHeader() {
String userAgent = "Micrometer-OTLP-Exporter-Java";
String version = getClass().getPackage().getImplementationVersion();
if (version != null) {
userAgent += "/" + version;
public static class Builder {

private final OtlpConfig otlpConfig;

private Clock clock = Clock.SYSTEM;

private ThreadFactory threadFactory = DEFAULT_THREAD_FACTORY;

private OtlpMetricsSender metricsSender;

private Builder(OtlpConfig otlpConfig) {
this.otlpConfig = otlpConfig;
this.metricsSender = new OtlpHttpMetricsSender(new HttpUrlConnectionSender(), otlpConfig);
}

/** Override the default clock. */
public Builder clock(Clock clock) {
this.clock = clock;
return this;
}
return userAgent;

/** Override the default {@link ThreadFactory}. */
public Builder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}

/**
* Provide your own custom metrics sender. This can be used to send OTLP metrics
* from OtlpMeterRegistry using different transports or clients than the default
* (HTTP using the HttpUrlConnectionSender). Encoding is in OTLP protobuf format.
* @see OtlpHttpMetricsSender
*/
public Builder metricsSender(OtlpMetricsSender metricsSender) {
this.metricsSender = metricsSender;
return this;
}

public OtlpMeterRegistry build() {
return new OtlpMeterRegistry(otlpConfig, clock, threadFactory, metricsSender);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.registry.otlp;

import java.util.Map;

/**
* This is responsible for sending OTLP format metrics to a compatible location. Specific
* implementations can use different transports or clients for sending the metrics.
*
* @since 1.15.0
*/
public interface OtlpMetricsSender extends MetricsSender {

/**
* Send a batch of OTLP Protobuf format metrics to a receiver.
* @param metricsData OTLP protobuf encoded batch of metrics
* @param headers metadata to send as headers with the metrics data
*/
@Override
void send(byte[] metricsData, Map<String, String> headers);

}
Loading