Skip to content

Commit 327e017

Browse files
Add capability for anyone to define their implementation of OTLP metrics sending
1 parent fe5df35 commit 327e017

File tree

4 files changed

+150
-73
lines changed

4 files changed

+150
-73
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.registry.otlp;
17+
18+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
19+
20+
public interface OltpMetricsSender {
21+
22+
void send(ExportMetricsServiceRequest request);
23+
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.registry.otlp;
17+
18+
import io.micrometer.common.util.internal.logging.InternalLogger;
19+
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
20+
import io.micrometer.core.ipc.http.HttpSender;
21+
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
22+
23+
public class OtlpHttpMetricsSender implements OltpMetricsSender {
24+
25+
private static final InternalLogger logger = InternalLoggerFactory.getInstance(OtlpHttpMetricsSender.class);
26+
27+
// VisibleForTesting
28+
final HttpSender httpSender;
29+
30+
private final OtlpConfig config;
31+
32+
private final String userAgentHeader;
33+
34+
public OtlpHttpMetricsSender(HttpSender httpSender, OtlpConfig config) {
35+
this.httpSender = httpSender;
36+
this.config = config;
37+
this.userAgentHeader = getUserAgentHeader();
38+
}
39+
40+
@Override
41+
public void send(ExportMetricsServiceRequest request) {
42+
HttpSender.Request.Builder httpRequest = this.httpSender.post(config.url())
43+
.withHeader("User-Agent", userAgentHeader)
44+
.withContent("application/x-protobuf", request.toByteArray());
45+
config.headers().forEach(httpRequest::withHeader);
46+
try {
47+
HttpSender.Response response = httpRequest.send();
48+
if (!response.isSuccessful()) {
49+
logger.warn(
50+
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
51+
getConfigurationContext(), response.code(), response.body());
52+
}
53+
}
54+
catch (Throwable e) {
55+
logger.warn("Failed to publish metrics (context: {}) ", getConfigurationContext(), e);
56+
}
57+
}
58+
59+
private String getUserAgentHeader() {
60+
String plainExporter = "Micrometer-OTLP-Exporter-Java";
61+
if (OtlpMeterRegistry.class.getPackage().getImplementationVersion() == null) {
62+
return plainExporter;
63+
}
64+
return plainExporter + "/" + OtlpMeterRegistry.class.getPackage().getImplementationVersion();
65+
}
66+
67+
/**
68+
* Get the configuration context.
69+
* @return A message containing enough information for the log reader to figure out
70+
* what configuration details may have contributed to the failure.
71+
*/
72+
private String getConfigurationContext() {
73+
// While other values may contribute to failures, these two are most common
74+
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
75+
}
76+
77+
}

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java

Lines changed: 38 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
import io.micrometer.common.lang.Nullable;
1919
import io.micrometer.common.util.internal.logging.InternalLogger;
2020
import io.micrometer.common.util.internal.logging.InternalLoggerFactory;
21-
import io.micrometer.core.instrument.Gauge;
22-
import io.micrometer.core.instrument.*;
2321
import io.micrometer.core.instrument.Timer;
22+
import io.micrometer.core.instrument.*;
2423
import io.micrometer.core.instrument.config.NamingConvention;
2524
import io.micrometer.core.instrument.distribution.*;
26-
import io.micrometer.core.instrument.distribution.Histogram;
2725
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
2826
import io.micrometer.core.instrument.internal.DefaultGauge;
2927
import io.micrometer.core.instrument.internal.DefaultLongTaskTimer;
@@ -36,14 +34,14 @@
3634
import io.micrometer.core.instrument.util.MeterPartition;
3735
import io.micrometer.core.instrument.util.NamedThreadFactory;
3836
import io.micrometer.core.instrument.util.TimeUtils;
39-
import io.micrometer.core.ipc.http.HttpSender;
4037
import io.micrometer.core.ipc.http.HttpUrlConnectionSender;
4138
import io.micrometer.registry.otlp.internal.CumulativeBase2ExponentialHistogram;
4239
import io.micrometer.registry.otlp.internal.DeltaBase2ExponentialHistogram;
4340
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
4441
import io.opentelemetry.proto.common.v1.AnyValue;
4542
import io.opentelemetry.proto.common.v1.KeyValue;
46-
import io.opentelemetry.proto.metrics.v1.*;
43+
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
44+
import io.opentelemetry.proto.metrics.v1.ScopeMetrics;
4745
import io.opentelemetry.proto.resource.v1.Resource;
4846

4947
import java.time.Duration;
@@ -56,8 +54,7 @@
5654
import java.util.function.ToLongFunction;
5755

5856
/**
59-
* Publishes meters in OTLP (OpenTelemetry Protocol) format. HTTP with Protobuf encoding
60-
* is the only option currently supported.
57+
* Publishes meters in OTLP (OpenTelemetry Protocol) format.
6158
*
6259
* @author Tommy Ludwig
6360
* @author Lenin Jaganathan
@@ -83,16 +80,14 @@ public class OtlpMeterRegistry extends PushMeterRegistry {
8380

8481
private final OtlpConfig config;
8582

86-
private final HttpSender httpSender;
83+
private final OltpMetricsSender metricsSender;
8784

8885
private final Resource resource;
8986

9087
private final AggregationTemporality aggregationTemporality;
9188

9289
private final TimeUnit baseTimeUnit;
9390

94-
private final String userAgentHeader;
95-
9691
// Time when the last scheduled rollOver has started. Applicable only for delta
9792
// flavour.
9893
private volatile long lastMeterRolloverStartTime = -1;
@@ -109,27 +104,32 @@ public OtlpMeterRegistry(OtlpConfig config, Clock clock) {
109104
}
110105

111106
/**
112-
* Create an {@code OtlpMeterRegistry} instance.
107+
* Create an {@code OtlpMeterRegistry} instance with an HTTP metrics sender.
113108
* @param config config
114109
* @param clock clock
115110
* @param threadFactory thread factory
116111
* @since 1.14.0
117112
*/
118113
public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory) {
119-
this(config, clock, threadFactory, new HttpUrlConnectionSender());
114+
this(config, clock, threadFactory, new OtlpHttpMetricsSender(new HttpUrlConnectionSender(), config));
120115
}
121116

122-
// VisibleForTesting
123-
// not public until we decide what we want to expose in public API
124-
// HttpSender may not be a good idea if we will support a non-HTTP transport
125-
OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory, HttpSender httpSender) {
117+
/**
118+
* Create an {@code OtlpMeterRegistry} instance.
119+
* @param config config
120+
* @param clock clock
121+
* @param threadFactory thread factory
122+
* @param metricsSender metrics sender
123+
* @since 1.14.0
124+
*/
125+
public OtlpMeterRegistry(OtlpConfig config, Clock clock, ThreadFactory threadFactory,
126+
OltpMetricsSender metricsSender) {
126127
super(config, clock);
127128
this.config = config;
128129
this.baseTimeUnit = config.baseTimeUnit();
129-
this.httpSender = httpSender;
130+
this.metricsSender = metricsSender;
130131
this.resource = Resource.newBuilder().addAllAttributes(getResourceAttributes()).build();
131132
this.aggregationTemporality = config.aggregationTemporality();
132-
this.userAgentHeader = getUserAgentHeader();
133133
config().namingConvention(NamingConvention.dot);
134134
start(threadFactory);
135135
}
@@ -178,34 +178,15 @@ protected void publish() {
178178
.build())
179179
.build())
180180
.build();
181-
HttpSender.Request.Builder httpRequest = this.httpSender.post(this.config.url())
182-
.withHeader("User-Agent", this.userAgentHeader)
183-
.withContent("application/x-protobuf", request.toByteArray());
184-
this.config.headers().forEach(httpRequest::withHeader);
185-
HttpSender.Response response = httpRequest.send();
186-
if (!response.isSuccessful()) {
187-
logger.warn(
188-
"Failed to publish metrics (context: {}). Server responded with HTTP status code {} and body {}",
189-
getConfigurationContext(), response.code(), response.body());
190-
}
181+
182+
metricsSender.send(request);
191183
}
192184
catch (Throwable e) {
193-
logger.warn(String.format("Failed to publish metrics to OTLP receiver (context: %s)",
194-
getConfigurationContext()), e);
185+
logger.warn("Failed to publish metrics to OTLP receiver", e);
195186
}
196187
}
197188
}
198189

199-
/**
200-
* Get the configuration context.
201-
* @return A message containing enough information for the log reader to figure out
202-
* what configuration details may have contributed to the failure.
203-
*/
204-
private String getConfigurationContext() {
205-
// While other values may contribute to failures, these two are most common
206-
return "url=" + config.url() + ", resource-attributes=" + config.resourceAttributes();
207-
}
208-
209190
@Override
210191
protected <T> Gauge newGauge(Meter.Id id, @Nullable T obj, ToDoubleFunction<T> valueFunction) {
211192
return new DefaultGauge<>(id, obj, valueFunction);
@@ -390,12 +371,12 @@ Iterable<KeyValue> getResourceAttributes() {
390371
}
391372

392373
static Histogram getHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
393-
OtlpConfig otlpConfig) {
374+
OtlpConfig otlpConfig) {
394375
return getHistogram(clock, distributionStatisticConfig, otlpConfig, null);
395376
}
396377

397378
static Histogram getHistogram(final Clock clock, final DistributionStatisticConfig distributionStatisticConfig,
398-
final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) {
379+
final OtlpConfig otlpConfig, @Nullable final TimeUnit baseTimeUnit) {
399380
// While publishing to OTLP, we export either Histogram datapoint (Explicit
400381
// ExponentialBuckets
401382
// or Exponential) / Summary
@@ -411,14 +392,14 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf
411392
}
412393

413394
return otlpConfig.aggregationTemporality() == AggregationTemporality.DELTA
414-
? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
415-
minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis())
416-
: new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
417-
minimumExpectedValue, baseTimeUnit);
395+
? new DeltaBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
396+
minimumExpectedValue, baseTimeUnit, clock, otlpConfig.step().toMillis())
397+
: new CumulativeBase2ExponentialHistogram(otlpConfig.maxScale(), otlpConfig.maxBucketCount(),
398+
minimumExpectedValue, baseTimeUnit);
418399
}
419400

420401
Histogram explicitBucketHistogram = getExplicitBucketHistogram(clock, distributionStatisticConfig,
421-
otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis());
402+
otlpConfig.aggregationTemporality(), otlpConfig.step().toMillis());
422403
if (explicitBucketHistogram != null) {
423404
return explicitBucketHistogram;
424405
}
@@ -431,22 +412,22 @@ static Histogram getHistogram(final Clock clock, final DistributionStatisticConf
431412
}
432413

433414
static HistogramFlavor histogramFlavor(HistogramFlavor preferredHistogramFlavor,
434-
DistributionStatisticConfig distributionStatisticConfig) {
415+
DistributionStatisticConfig distributionStatisticConfig) {
435416

436417
final double[] serviceLevelObjectiveBoundaries = distributionStatisticConfig
437418
.getServiceLevelObjectiveBoundaries();
438419
if (distributionStatisticConfig.isPublishingHistogram()
439-
&& preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
440-
&& (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) {
420+
&& preferredHistogramFlavor == HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM
421+
&& (serviceLevelObjectiveBoundaries == null || serviceLevelObjectiveBoundaries.length == 0)) {
441422
return HistogramFlavor.BASE2_EXPONENTIAL_BUCKET_HISTOGRAM;
442423
}
443424
return HistogramFlavor.EXPLICIT_BUCKET_HISTOGRAM;
444425
}
445426

446427
@Nullable
447428
private static Histogram getExplicitBucketHistogram(final Clock clock,
448-
final DistributionStatisticConfig distributionStatisticConfig,
449-
final AggregationTemporality aggregationTemporality, final long stepMillis) {
429+
final DistributionStatisticConfig distributionStatisticConfig,
430+
final AggregationTemporality aggregationTemporality, final long stepMillis) {
450431

451432
double[] sloWithPositiveInf = getSloWithPositiveInf(distributionStatisticConfig);
452433
if (AggregationTemporality.isCumulative(aggregationTemporality)) {
@@ -461,11 +442,11 @@ private static Histogram getExplicitBucketHistogram(final Clock clock,
461442
}
462443
if (AggregationTemporality.isDelta(aggregationTemporality) && stepMillis > 0) {
463444
return new OtlpStepBucketHistogram(clock, stepMillis,
464-
DistributionStatisticConfig.builder()
465-
.serviceLevelObjectives(sloWithPositiveInf)
466-
.build()
467-
.merge(distributionStatisticConfig),
468-
true, false);
445+
DistributionStatisticConfig.builder()
446+
.serviceLevelObjectives(sloWithPositiveInf)
447+
.build()
448+
.merge(distributionStatisticConfig),
449+
true, false);
469450
}
470451

471452
return null;
@@ -492,11 +473,4 @@ static double[] getSloWithPositiveInf(DistributionStatisticConfig distributionSt
492473
return sloWithPositiveInf;
493474
}
494475

495-
private String getUserAgentHeader() {
496-
if (this.getClass().getPackage().getImplementationVersion() == null) {
497-
return "Micrometer-OTLP-Exporter-Java";
498-
}
499-
return "Micrometer-OTLP-Exporter-Java/" + this.getClass().getPackage().getImplementationVersion();
500-
}
501-
502476
}

implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpMeterRegistryTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,22 +54,23 @@ abstract class OtlpMeterRegistryTest {
5454

5555
protected MockClock clock;
5656

57-
private HttpSender mockHttpSender;
58-
5957
OtlpMeterRegistry registry;
6058

6159
OtlpMeterRegistry registryWithExponentialHistogram;
6260

61+
private OtlpHttpMetricsSender metricsSender;
62+
6363
abstract OtlpConfig otlpConfig();
6464

6565
abstract OtlpConfig exponentialHistogramOtlpConfig();
6666

6767
@BeforeEach
6868
void setUp() {
6969
this.clock = new MockClock();
70-
this.mockHttpSender = mock(HttpSender.class);
71-
this.registry = new OtlpMeterRegistry(otlpConfig(), this.clock,
72-
new NamedThreadFactory("otlp-metrics-publisher"), this.mockHttpSender);
70+
OtlpConfig config = otlpConfig();
71+
this.metricsSender = new OtlpHttpMetricsSender(mock(HttpSender.class), config);
72+
this.registry = new OtlpMeterRegistry(config, this.clock, new NamedThreadFactory("otlp-metrics-publisher"),
73+
metricsSender);
7374
this.registryWithExponentialHistogram = new OtlpMeterRegistry(exponentialHistogramOtlpConfig(), clock);
7475
}
7576

@@ -148,15 +149,16 @@ void timeGauge() {
148149
@Issue("#5577")
149150
@Test
150151
void httpHeaders() throws Throwable {
151-
HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(), this.mockHttpSender);
152-
when(mockHttpSender.post(otlpConfig().url())).thenReturn(builder);
152+
HttpSender.Request.Builder builder = HttpSender.Request.build(otlpConfig().url(),
153+
this.metricsSender.httpSender);
154+
when(metricsSender.httpSender.post(otlpConfig().url())).thenReturn(builder);
153155

154-
when(mockHttpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, ""));
156+
when(metricsSender.httpSender.send(isA(HttpSender.Request.class))).thenReturn(new HttpSender.Response(200, ""));
155157

156158
writeToMetric(TimeGauge.builder("gauge.time", this, TimeUnit.MICROSECONDS, o -> 24).register(registry));
157159
registry.publish();
158160

159-
verify(this.mockHttpSender).send(assertArg(request -> {
161+
verify(this.metricsSender.httpSender).send(assertArg(request -> {
160162
assertThat(request.getRequestHeaders().get("User-Agent")).startsWith("Micrometer-OTLP-Exporter-Java");
161163
assertThat(request.getRequestHeaders()).containsEntry("Content-Type", "application/x-protobuf");
162164
}));

0 commit comments

Comments
 (0)