Skip to content

Add Pulsar Consumer metrics #11891

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 35 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b954474
support pulsar messaging.publish.duration semantic
crossoverJie Jun 14, 2024
d855f97
code style
crossoverJie Jun 14, 2024
b5d512b
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
3de71a8
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
6bb2c90
Update instrumentation-api-incubator/src/test/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
6f68ebd
Update instrumentation-api-incubator/src/test/java/io/opentelemetry/i…
crossoverJie Jun 17, 2024
9f42598
fix with cr
crossoverJie Jun 17, 2024
8b5e585
Add pulsar metrics test
crossoverJie Jun 18, 2024
53b2dee
Update doc url
crossoverJie Jun 18, 2024
53b4471
pulsar-receive-duration
crossoverJie Jun 18, 2024
df187f4
use array
crossoverJie Jun 18, 2024
e2d5e09
use array
crossoverJie Jun 18, 2024
8651722
Merge branch 'main' into pulsar-receive-duration
crossoverJie Jun 19, 2024
52405fd
pulsar-receive-duration
crossoverJie Jun 23, 2024
8bfc6fd
support messaging.receive.messages
crossoverJie Jun 24, 2024
61d402d
Merge remote-tracking branch 'otel-origin/main' into pulsar-receive-d…
crossoverJie Jul 24, 2024
fde08c4
support messaging.receive.messages
crossoverJie Jul 24, 2024
7991bef
Add a test for Partition Consumer
crossoverJie Jul 25, 2024
3ca21a9
fix with cr
crossoverJie Aug 2, 2024
e889e16
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
2c7b92e
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
9710b0d
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
84c9fbe
Use RecordMetadata instead of void for kafka
crossoverJie Aug 2, 2024
a0d0cd8
check recordMetadata is null
crossoverJie Aug 2, 2024
6780c4d
revert about kafka
crossoverJie Aug 2, 2024
dfc799c
Merge remote-tracking branch 'origin/pulsar-receive-duration' into pu…
crossoverJie Aug 2, 2024
fe04048
revert about kafka
crossoverJie Aug 2, 2024
c849a3b
revert about kafka
crossoverJie Aug 2, 2024
8573d0e
revert about kafka
crossoverJie Aug 2, 2024
8bb50f5
revert about kafka
crossoverJie Aug 2, 2024
99c8256
revert about kafka
crossoverJie Aug 2, 2024
807d60b
fix with cr
crossoverJie Aug 7, 2024
06e8b8e
Update instrumentation-api-incubator/src/main/java/io/opentelemetry/i…
laurit Aug 9, 2024
76b103d
fix with cr
crossoverJie Aug 11, 2024
cd83eed
fix with cr
crossoverJie Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
AttributeKey.booleanKey("messaging.destination.anonymous");
private static final AttributeKey<String> MESSAGING_DESTINATION_NAME =
AttributeKey.stringKey("messaging.destination.name");
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
AttributeKey.stringKey("messaging.destination.partition.id");
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
AttributeKey.stringKey("messaging.destination.template");
private static final AttributeKey<Boolean> MESSAGING_DESTINATION_TEMPORARY =
Expand Down Expand Up @@ -98,6 +100,8 @@ public void onStart(AttributesBuilder attributes, Context parentContext, REQUEST
internalSet(
attributes, MESSAGING_DESTINATION_TEMPLATE, getter.getDestinationTemplate(request));
}
internalSet(
attributes, MESSAGING_DESTINATION_PARTITION_ID, getter.getDestinationPartitionId(request));
boolean isAnonymousDestination = getter.isAnonymousDestination(request);
if (isAnonymousDestination) {
internalSet(attributes, MESSAGING_DESTINATION_ANONYMOUS, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ default Long getMessagePayloadCompressedSize(REQUEST request) {
@Nullable
Long getBatchMessageCount(REQUEST request, @Nullable RESPONSE response);

@Nullable
default String getDestinationPartitionId(REQUEST request) {
return null;
}

/**
* Extracts all values of header named {@code name} from the request, or an empty list if there
* were none.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.instrumentation.api.incubator.semconv.messaging;

import static java.util.logging.Level.FINE;

import com.google.auto.value.AutoValue;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.instrumentation.api.instrumenter.OperationListener;
import io.opentelemetry.instrumentation.api.instrumenter.OperationMetrics;
import io.opentelemetry.instrumentation.api.internal.OperationMetricsUtil;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
* {@link OperationListener} which keeps track of <a
* href="https://github.com/open-telemetry/semantic-conventions/blob/v1.26.0/docs/messaging/messaging-metrics.md#consumer-metrics">Consumer
* metrics</a>.
*/
public final class MessagingConsumerMetrics implements OperationListener {
private static final double NANOS_PER_S = TimeUnit.SECONDS.toNanos(1);

// copied from MessagingIncubatingAttributes
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");
private static final ContextKey<MessagingConsumerMetrics.State> MESSAGING_CONSUMER_METRICS_STATE =
ContextKey.named("messaging-consumer-metrics-state");
private static final Logger logger = Logger.getLogger(MessagingConsumerMetrics.class.getName());

private final DoubleHistogram receiveDurationHistogram;
private final LongCounter receiveMessageCount;

private MessagingConsumerMetrics(Meter meter) {
DoubleHistogramBuilder durationBuilder =
meter
.histogramBuilder("messaging.receive.duration")
.setDescription("Measures the duration of receive operation.")
.setExplicitBucketBoundariesAdvice(MessagingMetricsAdvice.DURATION_SECONDS_BUCKETS)
.setUnit("s");
MessagingMetricsAdvice.applyReceiveDurationAdvice(durationBuilder);
receiveDurationHistogram = durationBuilder.build();

LongCounterBuilder longCounterBuilder =
meter
.counterBuilder("messaging.receive.messages")
.setDescription("Measures the number of received messages.")
.setUnit("{message}");
Comment on lines +46 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we go ahead and update to the latest spec to avoid additional churn? https://github.com/open-telemetry/semantic-conventions/blob/main/docs/messaging/messaging-metrics.md

cc @open-telemetry/semconv-messaging-approvers

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semConvVersion hasn't been upgraded to the latest version yet. Is it okay to only upgrade some semantics?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it's ok, if we don't have the new constants available yet, you can create static final constants in this class and reference those

MessagingMetricsAdvice.applyReceiveMessagesAdvice(longCounterBuilder);
receiveMessageCount = longCounterBuilder.build();
}

public static OperationMetrics get() {
return OperationMetricsUtil.create("messaging consumer", MessagingConsumerMetrics::new);
}

@Override
@CanIgnoreReturnValue
public Context onStart(Context context, Attributes startAttributes, long startNanos) {
return context.with(
MESSAGING_CONSUMER_METRICS_STATE,
new AutoValue_MessagingConsumerMetrics_State(startAttributes, startNanos));
}

@Override
public void onEnd(Context context, Attributes endAttributes, long endNanos) {
MessagingConsumerMetrics.State state = context.get(MESSAGING_CONSUMER_METRICS_STATE);
if (state == null) {
logger.log(
FINE,
"No state present when ending context {0}. Cannot record consumer receive metrics.",
context);
return;
}

Attributes attributes = state.startAttributes().toBuilder().putAll(endAttributes).build();
receiveDurationHistogram.record(
(endNanos - state.startTimeNanos()) / NANOS_PER_S, attributes, context);

long receiveMessagesCount = getReceiveMessagesCount(state.startAttributes(), endAttributes);
receiveMessageCount.add(receiveMessagesCount, attributes, context);
}

private static long getReceiveMessagesCount(Attributes... attributesList) {
for (Attributes attributes : attributesList) {
Long value = attributes.get(MESSAGING_BATCH_MESSAGE_COUNT);
if (value != null) {
return value;
}
}
return 1;
}

@AutoValue
abstract static class State {

abstract Attributes startAttributes();

abstract long startTimeNanos();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.incubator.metrics.ExtendedDoubleHistogramBuilder;
import io.opentelemetry.api.incubator.metrics.ExtendedLongCounterBuilder;
import io.opentelemetry.api.metrics.DoubleHistogramBuilder;
import io.opentelemetry.api.metrics.LongCounterBuilder;
import io.opentelemetry.semconv.ErrorAttributes;
import io.opentelemetry.semconv.ServerAttributes;
import java.util.List;
Expand All @@ -27,23 +29,41 @@ final class MessagingMetricsAdvice {
AttributeKey.stringKey("messaging.destination.name");
private static final AttributeKey<String> MESSAGING_OPERATION =
AttributeKey.stringKey("messaging.operation");
private static final AttributeKey<Long> MESSAGING_BATCH_MESSAGE_COUNT =
AttributeKey.longKey("messaging.batch.message_count");
private static final AttributeKey<String> MESSAGING_DESTINATION_PARTITION_ID =
AttributeKey.stringKey("messaging.destination.partition.id");
private static final AttributeKey<String> MESSAGING_DESTINATION_TEMPLATE =
AttributeKey.stringKey("messaging.destination.template");

private static final List<AttributeKey<?>> MESSAGING_ATTRIBUTES =
asList(
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
MESSAGING_DESTINATION_PARTITION_ID,
MESSAGING_DESTINATION_TEMPLATE,
ErrorAttributes.ERROR_TYPE,
ServerAttributes.SERVER_PORT,
ServerAttributes.SERVER_ADDRESS);

static void applyPublishDurationAdvice(DoubleHistogramBuilder builder) {
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
return;
}
((ExtendedDoubleHistogramBuilder) builder)
.setAttributesAdvice(
asList(
MESSAGING_SYSTEM,
MESSAGING_DESTINATION_NAME,
MESSAGING_OPERATION,
MESSAGING_BATCH_MESSAGE_COUNT,
ErrorAttributes.ERROR_TYPE,
ServerAttributes.SERVER_PORT,
ServerAttributes.SERVER_ADDRESS));
((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
}

static void applyReceiveDurationAdvice(DoubleHistogramBuilder builder) {
if (!(builder instanceof ExtendedDoubleHistogramBuilder)) {
return;
}
((ExtendedDoubleHistogramBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
}

static void applyReceiveMessagesAdvice(LongCounterBuilder builder) {
if (!(builder instanceof ExtendedLongCounterBuilder)) {
return;
}
((ExtendedLongCounterBuilder) builder).setAttributesAdvice(MESSAGING_ATTRIBUTES);
}

private MessagingMetricsAdvice() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void collectsMetrics() {
Attributes responseAttributes =
Attributes.builder()
.put(MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, "1:1:0:0")
.put(MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 2)
.put(MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, "1")
.build();

Context parent =
Expand Down Expand Up @@ -90,6 +90,10 @@ void collectsMetrics() {
equalTo(
MessagingIncubatingAttributes.MESSAGING_SYSTEM,
"pulsar"),
equalTo(
MessagingIncubatingAttributes
.MESSAGING_DESTINATION_PARTITION_ID,
"1"),
equalTo(
MessagingIncubatingAttributes
.MESSAGING_DESTINATION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.pulsar.common.naming.TopicName;

enum PulsarBatchMessagingAttributesGetter
implements MessagingAttributesGetter<PulsarBatchRequest, Void> {
Expand Down Expand Up @@ -81,6 +82,16 @@ public Long getBatchMessageCount(PulsarBatchRequest request, @Nullable Void unus
return (long) request.getMessages().size();
}

@Nullable
@Override
public String getDestinationPartitionId(PulsarBatchRequest request) {
int partitionIndex = TopicName.getPartitionIndex(request.getDestination());
if (partitionIndex == -1) {
return null;
}
return String.valueOf(partitionIndex);
}

@Override
public List<String> getMessageHeader(PulsarBatchRequest request, String name) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.common.naming.TopicName;

public final class PulsarBatchRequest extends BasePulsarRequest {
private final Messages<?> messages;
Expand All @@ -30,7 +31,10 @@ private static String getTopicName(Messages<?> messages) {
if (topicName == null) {
topicName = name;
} else if (!topicName.equals(name)) {
return null;
// this is a partitioned topic
// persistent://public/default/test-partition-0 persistent://public/default/test-partition-1
// return persistent://public/default/test
return TopicName.get(topicName).getPartitionedTopicName();
}
}
return topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.naming.TopicName;

enum PulsarMessagingAttributesGetter implements MessagingAttributesGetter<PulsarRequest, Void> {
INSTANCE;
Expand Down Expand Up @@ -83,6 +84,16 @@ public Long getBatchMessageCount(PulsarRequest request, @Nullable Void unused) {
return null;
}

@Nullable
@Override
public String getDestinationPartitionId(PulsarRequest request) {
int partitionIndex = TopicName.getPartitionIndex(request.getDestination());
if (partitionIndex == -1) {
return null;
}
return String.valueOf(partitionIndex);
}

@Override
public List<String> getMessageHeader(PulsarRequest request, String name) {
String value = request.getMessage().getProperty(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingConsumerMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingProducerMetrics;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
Expand Down Expand Up @@ -76,6 +77,7 @@ private static Instrumenter<PulsarRequest, Void> createConsumerReceiveInstrument
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addOperationMetrics(MessagingConsumerMetrics.get())
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()));

Expand All @@ -101,6 +103,7 @@ private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceive
.addAttributesExtractor(
ServerAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.addSpanLinksExtractor(new PulsarBatchRequestSpanLinksExtractor(PROPAGATOR))
.addOperationMetrics(MessagingConsumerMetrics.get())
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}

Expand Down Expand Up @@ -189,7 +192,7 @@ private static Context startAndEndConsumerReceive(
Timer timer,
Consumer<?> consumer,
Throwable throwable) {
if (messages == null) {
if (messages == null || messages.size() == 0) {
return null;
}
String brokerUrl = VirtualFieldStore.extract(consumer);
Expand Down
Loading
Loading