Skip to content

Commit 9843cee

Browse files
author
Adrian Cole
committed
Accepts Zipkin v2 Span format in all current transports
This accepts the json format from #1499 on current transports. It does so by generalizing format detection from the two Kafka libraries, and a new `SpanDecoder` interface. Types are still internal, but this allows us to proceed with other work in #1644, including implementing reporters in any language. Concretely, you can send a json list of span2 format as a Kafka or Http message. If using http, use the /api/v2/spans endpoint like so: ```bash $ curl -X POST -s localhost:9411/api/v2/spans -H'Content-Type: application/json' -d'[{ "timestamp_millis": 1502101460678, "traceId": "9032b04972e475c5", "id": "9032b04972e475c5", "kind": "SERVER", "name": "get", "timestamp": 1502101460678880, "duration": 612898, "localEndpoint": { "serviceName": "brave-webmvc-example", "ipv4": "192.168.1.113" }, "remoteEndpoint": { "serviceName": "", "ipv4": "127.0.0.1", "port": 60149 }, "tags": { "error": "500 Internal Server Error", "http.path": "/a" } }]' ```
1 parent e41f89f commit 9843cee

File tree

14 files changed

+392
-107
lines changed

14 files changed

+392
-107
lines changed

zipkin-collector/kafka/src/main/java/zipkin/collector/kafka/KafkaStreamProcessor.java

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2015-2016 The OpenZipkin Authors
2+
* Copyright 2015-2017 The OpenZipkin Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
55
* in compliance with the License. You may obtain a copy of the License at
@@ -13,10 +13,8 @@
1313
*/
1414
package zipkin.collector.kafka;
1515

16-
import java.util.Collections;
1716
import kafka.consumer.ConsumerIterator;
1817
import kafka.consumer.KafkaStream;
19-
import zipkin.Codec;
2018
import zipkin.collector.Collector;
2119
import zipkin.collector.CollectorMetrics;
2220

@@ -47,24 +45,7 @@ public void run() {
4745
continue;
4846
}
4947

50-
// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
51-
// .. If the first byte isn't in that range, it isn't a thrift.
52-
//
53-
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
54-
//
55-
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
56-
// .. When serializing a Span (Struct), the first byte will be the type of a field
57-
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
58-
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
59-
if (bytes[0] == '[') {
60-
collector.acceptSpans(bytes, Codec.JSON, NOOP);
61-
} else {
62-
if (bytes[0] == 12 /* TType.STRUCT */) {
63-
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
64-
} else {
65-
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
66-
}
67-
}
48+
collector.acceptSpans(bytes, NOOP);
6849
}
6950
}
7051
}

zipkin-collector/kafka/src/test/java/zipkin/collector/kafka/KafkaCollectorTest.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2015-2016 The OpenZipkin Authors
2+
* Copyright 2015-2017 The OpenZipkin Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
55
* in compliance with the License. You may obtain a copy of the License at
@@ -13,6 +13,8 @@
1313
*/
1414
package zipkin.collector.kafka;
1515

16+
import java.util.ArrayList;
17+
import java.util.Arrays;
1618
import java.util.List;
1719
import java.util.concurrent.LinkedBlockingQueue;
1820
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,12 +31,16 @@
2931
import zipkin.TestObjects;
3032
import zipkin.collector.InMemoryCollectorMetrics;
3133
import zipkin.collector.kafka.KafkaCollector.Builder;
34+
import zipkin.internal.Span2;
35+
import zipkin.internal.Span2Codec;
36+
import zipkin.internal.Span2Converter;
3237
import zipkin.storage.AsyncSpanConsumer;
3338
import zipkin.storage.AsyncSpanStore;
3439
import zipkin.storage.SpanStore;
3540
import zipkin.storage.StorageComponent;
3641

3742
import static org.assertj.core.api.Assertions.assertThat;
43+
import static zipkin.TestObjects.LOTS_OF_SPANS;
3844
import static zipkin.TestObjects.TRACE;
3945

4046
public class KafkaCollectorTest {
@@ -130,6 +136,32 @@ public void messageWithMultipleSpans_json() throws Exception {
130136
assertThat(kafkaMetrics.spans()).isEqualTo(TestObjects.TRACE.size());
131137
}
132138

139+
/** Ensures list encoding works: a version 2 json encoded list of spans */
140+
@Test
141+
public void messageWithMultipleSpans_json2() throws Exception {
142+
Builder builder = builder("multiple_spans_json2");
143+
144+
List<Span> spans = Arrays.asList(
145+
LOTS_OF_SPANS[0],
146+
LOTS_OF_SPANS[1]
147+
);
148+
149+
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
150+
Span2Converter.fromSpan(spans.get(0)).get(0),
151+
Span2Converter.fromSpan(spans.get(1)).get(0)
152+
));
153+
154+
producer.send(new KeyedMessage<>(builder.topic, bytes));
155+
156+
try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
157+
assertThat(recvdSpans.take()).containsExactlyElementsOf(spans);
158+
}
159+
160+
assertThat(kafkaMetrics.messages()).isEqualTo(1);
161+
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
162+
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
163+
}
164+
133165
/** Ensures malformed spans don't hang the collector */
134166
@Test
135167
public void skipsMalformedData() throws Exception {

zipkin-collector/kafka10/src/main/java/zipkin/collector/kafka10/KafkaCollectorWorker.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.kafka.common.TopicPartition;
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
30-
import zipkin.Codec;
3130
import zipkin.collector.Collector;
3231
import zipkin.collector.CollectorMetrics;
3332

@@ -74,24 +73,7 @@ public void run() {
7473
if (bytes.length == 0) {
7574
metrics.incrementMessagesDropped();
7675
} else {
77-
// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
78-
// .. If the first byte isn't in that range, it isn't a thrift.
79-
//
80-
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
81-
//
82-
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
83-
// .. When serializing a Span (Struct), the first byte will be the type of a field
84-
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
85-
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
86-
if (bytes[0] == '[') {
87-
collector.acceptSpans(bytes, Codec.JSON, NOOP);
88-
} else {
89-
if (bytes[0] == 12 /* TType.STRUCT */) {
90-
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
91-
} else {
92-
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
93-
}
94-
}
76+
collector.acceptSpans(bytes, NOOP);
9577
}
9678
}
9779
}

zipkin-collector/kafka10/src/test/java/zipkin/collector/kafka10/KafkaCollectorTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.github.charithe.kafka.EphemeralKafkaBroker;
1717
import com.github.charithe.kafka.KafkaJunitRule;
18+
import java.util.Arrays;
1819
import java.util.List;
1920
import java.util.Properties;
2021
import java.util.concurrent.CopyOnWriteArraySet;
@@ -36,12 +37,16 @@
3637
import zipkin.Span;
3738
import zipkin.collector.InMemoryCollectorMetrics;
3839
import zipkin.collector.kafka10.KafkaCollector.Builder;
40+
import zipkin.internal.ApplyTimestampAndDuration;
41+
import zipkin.internal.Span2Codec;
42+
import zipkin.internal.Span2Converter;
3943
import zipkin.storage.AsyncSpanConsumer;
4044
import zipkin.storage.AsyncSpanStore;
4145
import zipkin.storage.SpanStore;
4246
import zipkin.storage.StorageComponent;
4347

4448
import static org.assertj.core.api.Assertions.assertThat;
49+
import static zipkin.TestObjects.LOTS_OF_SPANS;
4550
import static zipkin.TestObjects.TRACE;
4651

4752
public class KafkaCollectorTest {
@@ -184,6 +189,33 @@ public void messageWithMultipleSpans_json() throws Exception {
184189
assertThat(kafkaMetrics.spans()).isEqualTo(TRACE.size());
185190
}
186191

192+
/** Ensures list encoding works: a version 2 json encoded list of spans */
193+
@Test
194+
public void messageWithMultipleSpans_json2() throws Exception {
195+
Builder builder = builder("multiple_spans_json2");
196+
197+
List<Span> spans = Arrays.asList(
198+
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
199+
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
200+
);
201+
202+
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
203+
Span2Converter.fromSpan(spans.get(0)).get(0),
204+
Span2Converter.fromSpan(spans.get(1)).get(0)
205+
));
206+
207+
produceSpans(bytes, builder.topic);
208+
209+
try (KafkaCollector collector = builder.build()) {
210+
collector.start();
211+
assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
212+
}
213+
214+
assertThat(kafkaMetrics.messages()).isEqualTo(1);
215+
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
216+
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
217+
}
218+
187219
/** Ensures malformed spans don't hang the collector */
188220
@Test
189221
public void skipsMalformedData() throws Exception {

zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java

Lines changed: 41 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright 2015-2016 The OpenZipkin Authors
2+
* Copyright 2015-2017 The OpenZipkin Authors
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
55
* in compliance with the License. You may obtain a copy of the License at
@@ -25,8 +25,10 @@
2525
import zipkin.Codec;
2626
import zipkin.DependencyLink;
2727
import zipkin.Span;
28+
import zipkin.SpanDecoder;
2829
import zipkin.collector.Collector;
2930
import zipkin.collector.CollectorMetrics;
31+
import zipkin.internal.Span2JsonDecoder;
3032
import zipkin.storage.Callback;
3133
import zipkin.storage.QueryRequest;
3234
import zipkin.storage.SpanStore;
@@ -35,6 +37,8 @@
3537
import static zipkin.internal.Util.lowerHexToUnsignedLong;
3638

3739
final class ZipkinDispatcher extends Dispatcher {
40+
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();
41+
3842
private final SpanStore store;
3943
private final Collector consumer;
4044
private final CollectorMetrics metrics;
@@ -77,42 +81,50 @@ public MockResponse dispatch(RecordedRequest request) {
7781
}
7882
} else if (request.getMethod().equals("POST")) {
7983
if (url.encodedPath().equals("/api/v1/spans")) {
80-
metrics.incrementMessages();
81-
byte[] body = request.getBody().readByteArray();
82-
String encoding = request.getHeader("Content-Encoding");
83-
if (encoding != null && encoding.contains("gzip")) {
84-
try {
85-
Buffer result = new Buffer();
86-
GzipSource source = new GzipSource(new Buffer().write(body));
87-
while (source.read(result, Integer.MAX_VALUE) != -1) ;
88-
body = result.readByteArray();
89-
} catch (IOException e) {
90-
metrics.incrementMessagesDropped();
91-
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
92-
}
93-
}
9484
String type = request.getHeader("Content-Type");
95-
Codec codec = type != null && type.contains("/x-thrift") ? Codec.THRIFT : Codec.JSON;
96-
97-
final MockResponse result = new MockResponse();
98-
consumer.acceptSpans(body, codec, new Callback<Void>() {
99-
@Override public void onSuccess(Void value) {
100-
result.setResponseCode(202);
101-
}
102-
103-
@Override public void onError(Throwable t) {
104-
String message = t.getMessage();
105-
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
106-
}
107-
});
108-
return result;
85+
SpanDecoder decoder = type != null && type.contains("/x-thrift")
86+
? SpanDecoder.THRIFT_DECODER
87+
: SpanDecoder.JSON_DECODER;
88+
return acceptSpans(request, decoder);
89+
} else if (url.encodedPath().equals("/api/v2/spans")) {
90+
return acceptSpans(request, JSON2_DECODER);
10991
}
11092
} else { // unsupported method
11193
return new MockResponse().setResponseCode(405);
11294
}
11395
return new MockResponse().setResponseCode(404);
11496
}
11597

98+
MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
99+
metrics.incrementMessages();
100+
byte[] body = request.getBody().readByteArray();
101+
String encoding = request.getHeader("Content-Encoding");
102+
if (encoding != null && encoding.contains("gzip")) {
103+
try {
104+
Buffer result = new Buffer();
105+
GzipSource source = new GzipSource(new Buffer().write(body));
106+
while (source.read(result, Integer.MAX_VALUE) != -1) ;
107+
body = result.readByteArray();
108+
} catch (IOException e) {
109+
metrics.incrementMessagesDropped();
110+
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
111+
}
112+
}
113+
114+
final MockResponse result = new MockResponse();
115+
consumer.acceptSpans(body, decoder, new Callback<Void>() {
116+
@Override public void onSuccess(Void value) {
117+
result.setResponseCode(202);
118+
}
119+
120+
@Override public void onError(Throwable t) {
121+
String message = t.getMessage();
122+
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
123+
}
124+
});
125+
return result;
126+
}
127+
116128
static QueryRequest toQueryRequest(HttpUrl url) {
117129
return QueryRequest.builder().serviceName(url.queryParameter("serviceName"))
118130
.spanName(url.queryParameter("spanName"))

zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import zipkin.Annotation;
3131
import zipkin.Codec;
3232
import zipkin.Span;
33+
import zipkin.internal.ApplyTimestampAndDuration;
34+
import zipkin.internal.Span2Codec;
35+
import zipkin.internal.Span2Converter;
3336

3437
import static java.lang.String.format;
3538
import static java.util.Arrays.asList;
@@ -58,6 +61,30 @@ public void getTraces_storedViaPost() throws IOException {
5861
.containsOnly(trace);
5962
}
6063

64+
@Test
65+
public void getTraces_storedViaPostVersion2() throws IOException {
66+
List<Span> spans = Arrays.asList(
67+
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
68+
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
69+
);
70+
71+
byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
72+
Span2Converter.fromSpan(spans.get(0)).get(0),
73+
Span2Converter.fromSpan(spans.get(1)).get(0)
74+
));
75+
76+
// write the span to the zipkin using http api v2
77+
Response response = client.newCall(new Request.Builder()
78+
.url(zipkin.httpUrl() + "/api/v2/spans")
79+
.post(RequestBody.create(MediaType.parse("application/json"), bytes)).build()
80+
).execute();
81+
assertThat(response.code()).isEqualTo(202);
82+
83+
// read the traces directly
84+
assertThat(zipkin.getTraces())
85+
.containsOnly(asList(spans.get(0)), asList(spans.get(1)));
86+
}
87+
6188
/** The rule is here to help debugging. Even partial spans should be returned */
6289
@Test
6390
public void getTraces_whenMissingTimestamps() throws IOException {

0 commit comments

Comments
 (0)