Skip to content

Commit 06f4f52

Browse files
crossoverJielauritsteverao
authored
Add Pulsar Consumer metrics (#11891)
Co-authored-by: Lauri Tulmin <[email protected]> Co-authored-by: Steve Rao <[email protected]>
1 parent 23a110e commit 06f4f52

File tree

11 files changed

+525
-19
lines changed

11 files changed

+525
-19
lines changed

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesExtractor.java

+4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
3636
AttributeKey.booleanKey("messaging.destination.anonymous");
3737
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
3838
AttributeKey.stringKey("messaging.destination.name");
39+
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
40+
AttributeKey.stringKey("messaging.destination.partition.id");
3941
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
4042
AttributeKey.stringKey("messaging.destination.template");
4143
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_TEMPORARY =
@@ -98,6 +100,8 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
98100
internalSet(
99101
attributes, MESSAGING_DESTINATION_TEMPLATE, getter.getDestinationTemplate(request));
100102
}
103+
internalSet(
104+
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));
101105
boolean isAnonymousDestination = getter.isAnonymousDestination(request);
102106
if (isAnonymousDestination) {
103107
internalSet(attributes, MESSAGING_DESTINATION_ANONYMOUS, true);

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingAttributesGetter.java

+5
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ default Long getMessagePayloadCompressedSize(REQUEST request) {
6262
@Nullable
6363
Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response);
6464

65+
@Nullable
66+
default String getDestinationPartitionId(REQUEST request) {
67+
return null;
68+
}
69+
6570
/**
6671
* Extracts all values of header named {@code name} from the request, or an empty list if there
6772
* were none.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;
7+
8+
import static java.util.logging.Level.FINE;
9+
10+
import com.google.auto.value.AutoValue;
11+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
12+
import io.opentelemetry.api.common.AttributeKey;
13+
import io.opentelemetry.api.common.Attributes;
14+
import io.opentelemetry.api.metrics.DoubleHistogram;
15+
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
16+
import io.opentelemetry.api.metrics.LongCounter;
17+
import io.opentelemetry.api.metrics.LongCounterBuilder;
18+
import io.opentelemetry.api.metrics.Meter;
19+
import io.opentelemetry.context.Context;
20+
import io.opentelemetry.context.ContextKey;
21+
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
22+
import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics;
23+
import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.logging.Logger;
26+
27+
/**
28+
* {@link OperationListener} which keeps track of <a
29+
* href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#consumer-metrics">Consumer
30+
* metrics</a>.
31+
*/
32+
public final class MessagingConsumerMetrics implements OperationListener {
33+
private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1);
34+
35+
// copied from MessagingIncubatingAttributes
36+
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
37+
AttributeKey.longKey("messaging.batch.message_count");
38+
private static final ContextKey<MessagingConsumerMetrics.State> MESSAGING_CONSUMER_METRICS_STATE =
39+
ContextKey.named("messaging-consumer-metrics-state");
40+
private static final Logger logger = Logger.getLogger(MessagingConsumerMetrics.class.getName());
41+
42+
private final DoubleHistogram receiveDurationHistogram;
43+
private final LongCounter receiveMessageCount;
44+
45+
private MessagingConsumerMetrics(Meter meter) {
46+
DoubleHistogramBuilder durationBuilder =
47+
meter
48+
.histogramBuilder("messaging.receive.duration")
49+
.setDescription("Measures the duration of receive operation.")
50+
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
51+
.setUnit("s");
52+
MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder);
53+
receiveDurationHistogram = durationBuilder.build();
54+
55+
LongCounterBuilder longCounterBuilder =
56+
meter
57+
.counterBuilder("messaging.receive.messages")
58+
.setDescription("Measures the number of received messages.")
59+
.setUnit("{message}");
60+
MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder);
61+
receiveMessageCount = longCounterBuilder.build();
62+
}
63+
64+
public static OperationMetrics get() {
65+
return OperationMetricsUtil.create("messaging consumer", MessagingConsumerMetrics::new);
66+
}
67+
68+
@Override
69+
@CanIgnoreReturnValue
70+
public Context onStart(Context context, Attributes startAttributes, long startNanos) {
71+
return context.with(
72+
MESSAGING_CONSUMER_METRICS_STATE,
73+
new AutoValue_MessagingConsumerMetrics_State(startAttributes, startNanos));
74+
}
75+
76+
@Override
77+
public void onEnd(Context context, Attributes endAttributes, long endNanos) {
78+
MessagingConsumerMetrics.State state = context.get(MESSAGING_CONSUMER_METRICS_STATE);
79+
if (state == null) {
80+
logger.log(
81+
FINE,
82+
"No state present when ending context {0}. Cannot record consumer receive metrics.",
83+
context);
84+
return;
85+
}
86+
87+
Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();
88+
receiveDurationHistogram.record(
89+
(endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context);
90+
91+
long receiveMessagesCount = getReceiveMessagesCount(state.startAttributes(), endAttributes);
92+
receiveMessageCount.add(receiveMessagesCount, attributes, context);
93+
}
94+
95+
private static long getReceiveMessagesCount(Attributes... attributesList) {
96+
for (Attributes attributes : attributesList) {
97+
Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT);
98+
if (value != null) {
99+
return value;
100+
}
101+
}
102+
return 1;
103+
}
104+
105+
@AutoValue
106+
abstract static class State {
107+
108+
abstract Attributes startAttributes();
109+
110+
abstract long startTimeNanos();
111+
}
112+
}

instrumentation-api-incubator/src/main/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingMetricsAdvice.java

+32-12
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@
1010

1111
import io.opentelemetry.api.common.AttributeKey;
1212
import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder;
13+
import io.opentelemetry.api.incubator.metrics.ExtendedLongCounterBuilder;
1314
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
15+
import io.opentelemetry.api.metrics.LongCounterBuilder;
1416
import io.opentelemetry.semconv.ErrorAttributes;
1517
import io.opentelemetry.semconv.ServerAttributes;
1618
import java.util.List;
@@ -27,23 +29,41 @@ final class MessagingMetricsAdvice {
2729
AttributeKey.stringKey("messaging.destination.name");
2830
private static final AttributeKey<String> MESSAGING_OPERATION =
2931
AttributeKey.stringKey("messaging.operation");
30-
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
31-
AttributeKey.longKey("messaging.batch.message_count");
32+
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
33+
AttributeKey.stringKey("messaging.destination.partition.id");
34+
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
35+
AttributeKey.stringKey("messaging.destination.template");
36+
37+
private static final List<AttributeKey<?>> MESSAGING_ATTRIBUTES =
38+
asList(
39+
MESSAGING_SYSTEM,
40+
MESSAGING_DESTINATION_NAME,
41+
MESSAGING_OPERATION,
42+
MESSAGING_DESTINATION_PARTITION_ID,
43+
MESSAGING_DESTINATION_TEMPLATE,
44+
ErrorAttributes.ERROR_TYPE,
45+
ServerAttributes.SERVER_PORT,
46+
ServerAttributes.SERVER_ADDRESS);
3247

3348
static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) {
3449
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
3550
return;
3651
}
37-
((ExtendedDoubleHistogramBuilder) builder)
38-
.setAttributesAdvice(
39-
asList(
40-
MESSAGING_SYSTEM,
41-
MESSAGING_DESTINATION_NAME,
42-
MESSAGING_OPERATION,
43-
MESSAGING_BATCH_MESSAGE_COUNT,
44-
ErrorAttributes.ERROR_TYPE,
45-
ServerAttributes.SERVER_PORT,
46-
ServerAttributes.SERVER_ADDRESS));
52+
((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
53+
}
54+
55+
static void applyReceiveDurationAdvice(DoubleHistogramBuilder builder) {
56+
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
57+
return;
58+
}
59+
((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
60+
}
61+
62+
static void applyReceiveMessagesAdvice(LongCounterBuilder builder) {
63+
if (!(builder instanceof ExtendedLongCounterBuilder)) {
64+
return;
65+
}
66+
((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
4767
}
4868

4969
private MessagingMetricsAdvice() {}

instrumentation-api-incubator/src/test/java/io/opentelemetry/instrumentation/api/incubator/semconv/messaging/MessagingProducerMetricsTest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ void collectsMetrics() {
5050
Attributes responseAttributes =
5151
Attributes.builder()
5252
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0")
53-
.put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2)
53+
.put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, "1")
5454
.build();
5555

5656
Context parent =
@@ -90,6 +90,10 @@ void collectsMetrics() {
9090
equalTo(
9191
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
9292
"pulsar"),
93+
equalTo(
94+
MessagingIncubatingAttributes
95+
.MESSAGING_DESTINATION_PARTITION_ID,
96+
"1"),
9397
equalTo(
9498
MessagingIncubatingAttributes
9599
.MESSAGING_DESTINATION_NAME,

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchMessagingAttributesGetter.java

+11
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import java.util.stream.Collectors;
1212
import java.util.stream.StreamSupport;
1313
import javax.annotation.Nullable;
14+
import org.apache.pulsar.common.naming.TopicName;
1415

1516
enum PulsarBatchMessagingAttributesGetter
1617
implements MessagingAttributesGetter<PulsarBatchRequest, Void> {
@@ -81,6 +82,16 @@ public Long getBatchMessageCount(PulsarBatchRequest request, @Nullable Void unus
8182
return (long) request.getMessages().size();
8283
}
8384

85+
@Nullable
86+
@Override
87+
public String getDestinationPartitionId(PulsarBatchRequest request) {
88+
int partitionIndex = TopicName.getPartitionIndex(request.getDestination());
89+
if (partitionIndex == -1) {
90+
return null;
91+
}
92+
return String.valueOf(partitionIndex);
93+
}
94+
8495
@Override
8596
public List<String> getMessageHeader(PulsarBatchRequest request, String name) {
8697
return StreamSupport.stream(request.getMessages().spliterator(), false)

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarBatchRequest.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
1111
import org.apache.pulsar.client.api.Message;
1212
import org.apache.pulsar.client.api.Messages;
13+
import org.apache.pulsar.common.naming.TopicName;
1314

1415
public final class PulsarBatchRequest extends BasePulsarRequest {
1516
private final Messages<?> messages;
@@ -30,7 +31,10 @@ private static String getTopicName(Messages<?> messages) {
3031
if (topicName == null) {
3132
topicName = name;
3233
} else if (!topicName.equals(name)) {
33-
return null;
34+
// this is a partitioned topic
35+
// persistent://public/default/test-partition-0 persistent://public/default/test-partition-1
36+
// return persistent://public/default/test
37+
return TopicName.get(topicName).getPartitionedTopicName();
3438
}
3539
}
3640
return topicName;

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarMessagingAttributesGetter.java

+11
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.List;
1313
import javax.annotation.Nullable;
1414
import org.apache.pulsar.client.api.Message;
15+
import org.apache.pulsar.common.naming.TopicName;
1516

1617
enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter<PulsarRequest, Void> {
1718
INSTANCE;
@@ -83,6 +84,16 @@ public Long getBatchMessageCount(PulsarRequest request, @Nullable Void unused) {
8384
return null;
8485
}
8586

87+
@Nullable
88+
@Override
89+
public String getDestinationPartitionId(PulsarRequest request) {
90+
int partitionIndex = TopicName.getPartitionIndex(request.getDestination());
91+
if (partitionIndex == -1) {
92+
return null;
93+
}
94+
return String.valueOf(partitionIndex);
95+
}
96+
8697
@Override
8798
public List<String> getMessageHeader(PulsarRequest request, String name) {
8899
String value = request.getMessage().getProperty(name);

instrumentation/pulsar/pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
1414
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
1515
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
16+
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingConsumerMetrics;
1617
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics;
1718
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
1819
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
@@ -76,6 +77,7 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
7677
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
7778
.addAttributesExtractor(
7879
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
80+
.addOperationMetrics(MessagingConsumerMetrics.get())
7981
.addAttributesExtractor(
8082
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
8183

@@ -101,6 +103,7 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
101103
.addAttributesExtractor(
102104
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
103105
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
106+
.addOperationMetrics(MessagingConsumerMetrics.get())
104107
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
105108
}
106109

@@ -189,7 +192,7 @@ private static Context startAndEndConsumerReceive(
189192
Timer timer,
190193
Consumer<?> consumer,
191194
Throwable throwable) {
192-
if (messages == null) {
195+
if (messages == null || messages.size() == 0) {
193196
return null;
194197
}
195198
String brokerUrl = VirtualFieldStore.extract(consumer);

0 commit comments

Comments
 (0)