Skip to content

Accepts Zipkin v2 Span format in all current transports #1684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,10 +13,8 @@
*/
package zipkin.collector.kafka;

import java.util.Collections;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -47,24 +45,7 @@ public void run() {
continue;
}

// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -13,6 +13,7 @@
*/
package zipkin.collector.kafka;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -29,12 +30,16 @@
import zipkin.TestObjects;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -130,6 +135,32 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TestObjects.TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

producer.send(new KeyedMessage<>(builder.topic, bytes));

try (KafkaCollector collector = newKafkaTransport(builder, consumer)) {
assertThat(recvdSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin.Codec;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;

Expand Down Expand Up @@ -74,24 +73,7 @@ public void run() {
if (bytes.length == 0) {
metrics.incrementMessagesDropped();
} else {
// In TBinaryProtocol encoding, the first byte is the TType, in a range 0-16
// .. If the first byte isn't in that range, it isn't a thrift.
//
// When byte(0) == '[' (91), assume it is a list of json-encoded spans
//
// When byte(0) <= 16, assume it is a TBinaryProtocol-encoded thrift
// .. When serializing a Span (Struct), the first byte will be the type of a field
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT(12)
// .. As ThriftSpan has no STRUCT fields: so, if the first byte is TType.STRUCT(12), it is a list.
if (bytes[0] == '[') {
collector.acceptSpans(bytes, Codec.JSON, NOOP);
} else {
if (bytes[0] == 12 /* TType.STRUCT */) {
collector.acceptSpans(bytes, Codec.THRIFT, NOOP);
} else {
collector.acceptSpans(Collections.singletonList(bytes), Codec.THRIFT, NOOP);
}
}
collector.acceptSpans(bytes, NOOP);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.github.charithe.kafka.EphemeralKafkaBroker;
import com.github.charithe.kafka.KafkaJunitRule;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -36,12 +37,16 @@
import zipkin.Span;
import zipkin.collector.InMemoryCollectorMetrics;
import zipkin.collector.kafka10.KafkaCollector.Builder;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.AsyncSpanStore;
import zipkin.storage.SpanStore;
import zipkin.storage.StorageComponent;

import static org.assertj.core.api.Assertions.assertThat;
import static zipkin.TestObjects.LOTS_OF_SPANS;
import static zipkin.TestObjects.TRACE;

public class KafkaCollectorTest {
Expand Down Expand Up @@ -184,6 +189,33 @@ public void messageWithMultipleSpans_json() throws Exception {
assertThat(kafkaMetrics.spans()).isEqualTo(TRACE.size());
}

/** Ensures list encoding works: a version 2 json encoded list of spans */
@Test
public void messageWithMultipleSpans_json2() throws Exception {
Builder builder = builder("multiple_spans_json2");

List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

produceSpans(bytes, builder.topic);

try (KafkaCollector collector = builder.build()) {
collector.start();
assertThat(receivedSpans.take()).containsAll(spans);
}

assertThat(kafkaMetrics.messages()).isEqualTo(1);
assertThat(kafkaMetrics.bytes()).isEqualTo(bytes.length);
assertThat(kafkaMetrics.spans()).isEqualTo(spans.size());
}

/** Ensures malformed spans don't hang the collector */
@Test
public void skipsMalformedData() throws Exception {
Expand Down
70 changes: 41 additions & 29 deletions zipkin-junit/src/main/java/zipkin/junit/ZipkinDispatcher.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright 2015-2016 The OpenZipkin Authors
* Copyright 2015-2017 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -25,8 +25,10 @@
import zipkin.Codec;
import zipkin.DependencyLink;
import zipkin.Span;
import zipkin.SpanDecoder;
import zipkin.collector.Collector;
import zipkin.collector.CollectorMetrics;
import zipkin.internal.Span2JsonDecoder;
import zipkin.storage.Callback;
import zipkin.storage.QueryRequest;
import zipkin.storage.SpanStore;
Expand All @@ -35,6 +37,8 @@
import static zipkin.internal.Util.lowerHexToUnsignedLong;

final class ZipkinDispatcher extends Dispatcher {
static final SpanDecoder JSON2_DECODER = new Span2JsonDecoder();

private final SpanStore store;
private final Collector consumer;
private final CollectorMetrics metrics;
Expand Down Expand Up @@ -77,42 +81,50 @@ public MockResponse dispatch(RecordedRequest request) {
}
} else if (request.getMethod().equals("POST")) {
if (url.encodedPath().equals("/api/v1/spans")) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}
String type = request.getHeader("Content-Type");
Codec codec = type != null && type.contains("/x-thrift") ? Codec.THRIFT : Codec.JSON;

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, codec, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
SpanDecoder decoder = type != null && type.contains("/x-thrift")
? SpanDecoder.THRIFT_DECODER
: SpanDecoder.JSON_DECODER;
return acceptSpans(request, decoder);
} else if (url.encodedPath().equals("/api/v2/spans")) {
return acceptSpans(request, JSON2_DECODER);
}
} else { // unsupported method
return new MockResponse().setResponseCode(405);
}
return new MockResponse().setResponseCode(404);
}

MockResponse acceptSpans(RecordedRequest request, SpanDecoder decoder) {
metrics.incrementMessages();
byte[] body = request.getBody().readByteArray();
String encoding = request.getHeader("Content-Encoding");
if (encoding != null && encoding.contains("gzip")) {
try {
Buffer result = new Buffer();
GzipSource source = new GzipSource(new Buffer().write(body));
while (source.read(result, Integer.MAX_VALUE) != -1) ;
body = result.readByteArray();
} catch (IOException e) {
metrics.incrementMessagesDropped();
return new MockResponse().setResponseCode(400).setBody("Cannot gunzip spans");
}
}

final MockResponse result = new MockResponse();
consumer.acceptSpans(body, decoder, new Callback<Void>() {
@Override public void onSuccess(Void value) {
result.setResponseCode(202);
}

@Override public void onError(Throwable t) {
String message = t.getMessage();
result.setBody(message).setResponseCode(message.startsWith("Cannot store") ? 500 : 400);
}
});
return result;
}

static QueryRequest toQueryRequest(HttpUrl url) {
return QueryRequest.builder().serviceName(url.queryParameter("serviceName"))
.spanName(url.queryParameter("spanName"))
Expand Down
27 changes: 27 additions & 0 deletions zipkin-junit/src/test/java/zipkin/junit/ZipkinRuleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import zipkin.Annotation;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.ApplyTimestampAndDuration;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;

import static java.lang.String.format;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -58,6 +61,30 @@ public void getTraces_storedViaPost() throws IOException {
.containsOnly(trace);
}

@Test
public void getTraces_storedViaPostVersion2() throws IOException {
List<Span> spans = Arrays.asList(
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[0]),
ApplyTimestampAndDuration.apply(LOTS_OF_SPANS[1])
);

byte[] bytes = Span2Codec.JSON.writeSpans(Arrays.asList(
Span2Converter.fromSpan(spans.get(0)).get(0),
Span2Converter.fromSpan(spans.get(1)).get(0)
));

// write the span to the zipkin using http api v2
Response response = client.newCall(new Request.Builder()
.url(zipkin.httpUrl() + "/api/v2/spans")
.post(RequestBody.create(MediaType.parse("application/json"), bytes)).build()
).execute();
assertThat(response.code()).isEqualTo(202);

// read the traces directly
assertThat(zipkin.getTraces())
.containsOnly(asList(spans.get(0)), asList(spans.get(1)));
}

/** The rule is here to help debugging. Even partial spans should be returned */
@Test
public void getTraces_whenMissingTimestamps() throws IOException {
Expand Down
Loading