Commit e151986

Add Pulsar collector (#3788)
Resolves #2297

1. Add Pulsar collector
2. Add tests for all changes
3. Add zipkin-pulsar Docker-related files
4. Add corresponding usage documents
1 parent 4254a48 commit e151986

24 files changed

+1266
-2
lines changed

.dockerignore

Lines changed: 1 addition & 0 deletions
@@ -55,6 +55,7 @@
 !zipkin-collector/kafka/src/main/**
 !zipkin-collector/rabbitmq/src/main/**
 !zipkin-collector/scribe/src/main/**
+!zipkin-collector/pulsar/src/main/**
 !zipkin-junit5/src/main/**
 !zipkin-storage/src/main/**
 !zipkin-storage/cassandra/src/main/**

.github/workflows/test.yml

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ jobs:
 - name: zipkin-collector-activemq
 - name: zipkin-collector-kafka
 - name: zipkin-collector-rabbitmq
+- name: zipkin-collector-pulsar
 - name: zipkin-storage-cassandra
 - name: zipkin-storage-elasticsearch
 - name: zipkin-storage-mysql-v1

README.md

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ aggregate behavior including error paths or calls to deprecated services.
 Application’s need to be “instrumented” to report trace data to Zipkin. This
 usually means configuration of a [tracer or instrumentation library](https://zipkin.io/pages/tracers_instrumentation.html). The most
 popular ways to report data to Zipkin are via http or Kafka, though many other
-options exist, such as Apache ActiveMQ, gRPC and RabbitMQ. The data served to
+options exist, such as Apache ActiveMQ, gRPC, RabbitMQ and Apache Pulsar. The data served to
 the UI is stored in-memory, or persistently with a supported backend such as
 Apache Cassandra or Elasticsearch.

docker/examples/README.md

Lines changed: 12 additions & 0 deletions
@@ -118,6 +118,18 @@ $ docker compose -f docker-compose-rabbitmq.yml up
 Then configure the [RabbitMQ sender](https://github.com/openzipkin/zipkin-reporter-java/blob/master/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java)
 using a `host` value of `localhost` or a non-local hostname if in docker.
 
+
+## Pulsar
+
+You can collect traces from [Pulsar](../test-images/zipkin-pulsar/README.md) in addition to HTTP, using the
+`docker-compose-pulsar.yml` file. This configuration starts `zipkin` and `zipkin-pulsar` in their
+own containers.
+
+To add Pulsar configuration, run:
+```bash
+$ docker compose -f docker-compose-pulsar.yml up
+```
+
 ## Eureka
 
 You can register Zipkin for service discovery in [Eureka](../test-images/zipkin-eureka/README.md)
Lines changed: 32 additions & 0 deletions
@@ -0,0 +1,32 @@
+#
+# Copyright The OpenZipkin Authors
+# SPDX-License-Identifier: Apache-2.0
+#
+
+# This file uses the version 2 docker compose file format, described here:
+# https://docs.docker.com/compose/compose-file/#version-2
+#
+# It extends the default configuration from docker-compose.yml to add a test
+# pulsar server, which is used as a span transport.
+
+version: '2.4'
+
+services:
+  pulsar:
+    image: ghcr.io/openzipkin/zipkin-pulsar:${TAG:-latest}
+    container_name: pulsar
+    ports: # expose the pulsar port so apps can publish spans.
+      - "6650:6650"
+    # - "8080:8080" # uncomment to expose the pulsar http port.
+
+  zipkin:
+    extends:
+      file: docker-compose.yml
+      service: zipkin
+    # slim doesn't include Pulsar support, so switch to the larger image
+    image: ghcr.io/openzipkin/zipkin:${TAG:-latest}
+    environment:
+      - PULSAR_SERVICE_URL=pulsar://pulsar:6650
+    depends_on:
+      pulsar:
+        condition: service_healthy

zipkin-collector/pom.xml

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>scribe</module>
+    <module>pulsar</module>
   </modules>
 
   <dependencies>

zipkin-collector/pulsar/README.md

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+# collector-pulsar
+
+## PulsarCollector
+
+This collector is implemented as a Pulsar consumer and supports Pulsar brokers running
+version 4.x or later. The default `subscriptionType` is `Shared`, which lets multiple
+consumers attach to the same subscription, with messages delivered in a round-robin
+distribution across those consumers.
+
+The default `subscriptionInitialPosition` is `Earliest`, so a new subscription starts
+from the earliest available message.
+You can modify the consumer settings as needed
+through the `consumerProps` parameter, and the client settings through the `clientProps`
+parameter.
+
+For information about running this collector as a module in Zipkin server, see
+the [Zipkin Server README](../../zipkin-server/README.md#pulsar-collector).
+
+When using this collector as a library outside of Zipkin server,
+[zipkin2.collector.pulsar.PulsarCollector.Builder](src/main/java/zipkin2/collector/pulsar/PulsarCollector.java)
+includes defaults that will operate against a Pulsar topic named `zipkin`.
+
+## Encoding spans into Pulsar messages
+
+The message's binary data includes a list of spans. Supported encodings
+are the same as the HTTP [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body.
+
+### Json
+
+The message's binary data is a list of spans in JSON. The first character must be '[' (decimal 91).
+
+`Codec.JSON.writeSpans(spans)` performs the correct JSON encoding.
+
+### Thrift
+
+The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol.
+
+`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:
+
+```
+write_byte(12) // type of the list elements: 12 == struct
+write_i32(count) // count of spans that will follow
+for (int i = 0; i < count; i++) {
+  writeTBinaryProtocol(spans(i))
+}
+```
+
+### Legacy encoding
+
+Older versions of zipkin accepted a single span per message, as opposed
+to a list per message. This practice is deprecated, but still supported.
+
+## Logging
+
+By default, Zipkin suppresses all logging output from Pulsar client operations because it can be
+quite verbose. To override this, for example during troubleshooting, start Zipkin with
+`--logging.level.org.apache.pulsar=INFO` or similar.
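
Review note: the framing rules in the README above (a JSON list starts with `[`, a Thrift list starts with the struct type byte `12` followed by a 32-bit count) can be illustrated with a small standalone sketch. The class and method names below are hypothetical and are not part of this commit or of Zipkin's codec; the sketch only uses the JDK, and it leaves anything that is neither form (which may include the legacy single-span encoding) unclassified.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the message framing described in the README.
public class SpanMessageFraming {

  /** Guesses the list encoding from the first byte of a message body. */
  public static String detectEncoding(byte[] message) {
    if (message.length == 0) return "EMPTY";
    if (message[0] == '[') return "JSON_LIST";   // '[' is decimal 91
    if (message[0] == 12) return "THRIFT_LIST";  // list element type 12 == struct
    return "UNKNOWN_OR_LEGACY";                  // not classified by this sketch
  }

  /** Writes the 5-byte Thrift list header: element type, then big-endian count. */
  public static byte[] thriftListHeader(int count) {
    // ByteBuffer is big-endian by default, matching TBinaryProtocol's i32.
    return ByteBuffer.allocate(5).put((byte) 12).putInt(count).array();
  }

  public static void main(String[] args) {
    byte[] json = "[]".getBytes(StandardCharsets.UTF_8);
    System.out.println(detectEncoding(json));
    System.out.println(detectEncoding(thriftListHeader(2)));
  }
}
```

In a real sender, the span bodies would follow the header; `Codec.THRIFT.writeSpans(spans)` and `Codec.JSON.writeSpans(spans)` named in the README are the source's way to produce complete messages.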

zipkin-collector/pulsar/pom.xml

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright The OpenZipkin Authors
+    SPDX-License-Identifier: Apache-2.0
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>io.zipkin.zipkin2</groupId>
+    <artifactId>zipkin-collector-parent</artifactId>
+    <version>3.4.5-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>zipkin-collector-pulsar</artifactId>
+  <name>Collector: Pulsar</name>
+
+  <properties>
+    <main.basedir>${project.basedir}/../..</main.basedir>
+    <pulsar-client.version>4.0.2</pulsar-client.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zipkin-collector</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${pulsar-client.version}</version>
+    </dependency>
+  </dependencies>
+</project>
Lines changed: 80 additions & 0 deletions
@@ -0,0 +1,80 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.collector.pulsar;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import zipkin2.CheckResult;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorMetrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+class LazyPulsarInit {
+
+  private final Collector collector;
+  private final CollectorMetrics metrics;
+  private final String topic;
+  private final int concurrency;
+  private final Map<String, Object> clientProps, consumerProps;
+  public volatile PulsarClient result;
+  final AtomicReference<CheckResult> failure = new AtomicReference<>();
+
+  LazyPulsarInit(PulsarCollector.Builder builder) {
+    this.collector = builder.delegate.build();
+    this.metrics = builder.metrics;
+    this.topic = builder.topic;
+    this.concurrency = builder.concurrency;
+    this.clientProps = builder.clientProps;
+    this.consumerProps = builder.consumerProps;
+  }
+
+  void init() {
+    if (result == null) {
+      synchronized (this) {
+        if (result == null) {
+          result = subscribe();
+        }
+      }
+    }
+  }
+
+  private PulsarClient subscribe() {
+    PulsarClient client;
+    try {
+      client = PulsarClient.builder()
+          .loadConf(clientProps)
+          .build();
+    } catch (Exception e) {
+      failure.set(CheckResult.failed(e));
+      throw new RuntimeException("Pulsar client creation failed. " + e.getMessage(), e);
+    }
+
+    try {
+      for (int i = 0; i < concurrency; i++) {
+        PulsarSpanConsumer consumer = new PulsarSpanConsumer(topic, consumerProps, client, collector, metrics);
+        consumer.startConsumer();
+      }
+      return client;
+    } catch (Exception e) {
+      try {
+        client.close();
+      } catch (PulsarClientException ex) {
+        // Ignored: the subscribe failure reported below is the error worth surfacing.
+      }
+      failure.set(CheckResult.failed(e));
+      throw new RuntimeException("Pulsar Client is unable to subscribe to the topic(" + topic + "), please check the service.", e);
+    }
+  }
+
+  void close() throws PulsarClientException {
+    PulsarClient maybe = result;
+    if (maybe != null) {
+      maybe.close();
+      result = null;
+    }
+  }
+}
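
Review note: `init()` above uses double-checked locking on a `volatile` field so that `subscribe()` runs at most once even when several threads call it. A minimal JDK-only sketch of that pattern follows; `LazyInitSketch` and its `Supplier`-based factory are hypothetical names for illustration and do not appear in this commit.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, generic version of the lazy-init pattern used by LazyPulsarInit.
public class LazyInitSketch<T> {
  private final Supplier<T> factory;
  // volatile so that a fully constructed value is visible to every thread
  private volatile T result;

  public LazyInitSketch(Supplier<T> factory) {
    this.factory = factory;
  }

  public T get() {
    T local = result;               // first check, without locking
    if (local == null) {
      synchronized (this) {
        local = result;             // second check, under the lock
        if (local == null) {
          local = factory.get();    // runs at most once if it succeeds
          result = local;
        }
      }
    }
    return local;
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger calls = new AtomicInteger();
    LazyInitSketch<String> lazy =
        new LazyInitSketch<>(() -> "client-" + calls.incrementAndGet());

    // Several threads race to initialize; only one should invoke the factory.
    Thread[] threads = new Thread[4];
    for (int i = 0; i < threads.length; i++) {
      threads[i] = new Thread(lazy::get);
      threads[i].start();
    }
    for (Thread t : threads) t.join();

    System.out.println(lazy.get() + " after " + calls.get() + " factory call(s)");
  }
}
```

Reading the field into a local before use, as `close()` does with `maybe`, avoids dereferencing a field that another thread may have reset to `null` in the meantime.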
