Skip to content

Commit 6f5a378

Browse files
author
Adrian Cole
committed
Supports reading multiple spans per Kafka message
Kafka messages have binary payloads and no key. The binary contents are serialized TBinaryProtocol thrift messages. This change peeks at thei first bytes to see if it is a List of structs or not, reading accordingly. This approach would need a revision if we ever add a Struct field to Span. However, that is unlikely. At the point we change the structure of Span, we'd likely change other aspects which would make it a different struct completely (see #939). In such case, we'd add a key to the kafka message of the span version, and not hit the code affected in this change. Fixes #979
1 parent 95c017e commit 6f5a378

File tree

5 files changed

+74
-36
lines changed

5 files changed

+74
-36
lines changed

zipkin-receiver-kafka/src/main/scala/com/twitter/zipkin/receiver/kafka/KafkaSpanReceiver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ trait KafkaSpanReceiverFactory { self: App =>
5050
process: Seq[ThriftSpan] => Future[Unit],
5151
stats: StatsReceiver = DefaultStatsReceiver.scope("KafkaSpanReceiver"),
5252
keyDecoder: Decoder[T] = KafkaProcessor.defaultKeyDecoder,
53-
valueDecoder: KafkaProcessor.KafkaDecoder = new SpanCodec()
53+
valueDecoder: KafkaProcessor.KafkaDecoder = new SpanDecoder()
5454
): SpanReceiver = new SpanReceiver {
5555

5656

zipkin-receiver-kafka/src/main/scala/com/twitter/zipkin/receiver/kafka/SpanCodec.scala

Lines changed: 0 additions & 27 deletions
This file was deleted.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package com.twitter.zipkin.receiver.kafka
2+
3+
import com.twitter.scrooge.BinaryThriftStructSerializer
4+
import com.twitter.zipkin.conversions.thrift
5+
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
6+
import org.apache.thrift.protocol.TType
7+
8+
class SpanDecoder extends KafkaProcessor.KafkaDecoder {
9+
val deserializer = new BinaryThriftStructSerializer[ThriftSpan] {
10+
def codec = ThriftSpan
11+
}
12+
13+
// Given the thrift encoding is TBinaryProtocol..
14+
// .. When serializing a Span (Struct), the first byte will be the type of a field
15+
// .. When serializing a List[ThriftSpan], the first byte is the member type, TType.STRUCT
16+
// Span has no STRUCT fields: we assume that if the first byte is TType.STRUCT is a list.
17+
def fromBytes(bytes: Array[Byte]) =
18+
if (bytes(0) == TType.STRUCT) {
19+
thrift.thriftListToThriftSpans(bytes)
20+
} else {
21+
List(deserializer.fromBytes(bytes))
22+
}
23+
}

zipkin-receiver-kafka/src/test/scala/com/twitter/zipkin/receiver/kafka/KafkaProcessorSpec.scala

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package com.twitter.zipkin.receiver.kafka
33
import java.util.concurrent.LinkedBlockingQueue
44

55
import com.github.charithe.kafka.KafkaJunitRule
6+
import com.twitter.io.Buf
67
import com.twitter.util.{Await, Future, Promise}
78
import com.twitter.zipkin.common._
89
import com.twitter.zipkin.conversions.thrift._
910
import com.twitter.zipkin.thriftscala.{Span => ThriftSpan}
1011
import kafka.producer._
12+
import org.apache.thrift.protocol.{TType, TList, TBinaryProtocol}
13+
import org.apache.thrift.transport.TMemoryBuffer
1114
import org.junit.{ClassRule, Test}
1215
import org.scalatest.junit.JUnitSuite
1316

@@ -26,7 +29,7 @@ class KafkaProcessorSpec extends JUnitSuite {
2629
val annotation = Annotation(1, "value", Some(Endpoint(1, 2, "service")))
2730
// Intentionally leaving timestamp and duration unset, as legacy instrumentation don't set this.
2831
val span = Span(1234, "methodname", 4567, annotations = List(annotation))
29-
val codec = new SpanCodec()
32+
val codec = new SpanDecoder()
3033

3134
@Test def messageWithSingleSpan() {
3235
val topic = "single_span"
@@ -38,14 +41,33 @@ class KafkaProcessorSpec extends JUnitSuite {
3841
}, codec, codec)
3942

4043
val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())
41-
producer.send(new KeyedMessage(topic, codec.encode(span)))
44+
producer.send(new KeyedMessage(topic, encode(span)))
4245
producer.close()
4346

4447
assert(Await.result(recvdSpan) == Seq(span.toThrift))
4548

4649
Await.result(service.close())
4750
}
4851

52+
@Test def messageWithMultipleSpans() {
53+
val topic = "multiple_spans"
54+
val recvdSpan = new Promise[Seq[ThriftSpan]]
55+
56+
val service = KafkaProcessor(Map(topic -> 1), kafkaRule.consumerConfig(), { s =>
57+
recvdSpan.setValue(s)
58+
Future.value(true)
59+
}, codec, codec)
60+
61+
val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())
62+
producer.send(new KeyedMessage(topic, encode(Seq(span, span)))) // 2 spans in one message
63+
producer.close()
64+
65+
// make sure we decoded both spans from the same message
66+
assert(Await.result(recvdSpan) == Seq(span.toThrift, span.toThrift))
67+
68+
Await.result(service.close())
69+
}
70+
4971
@Test def skipsMalformedData() {
5072
val topic = "malformed"
5173
val recvdSpans = new LinkedBlockingQueue[Seq[ThriftSpan]](3)
@@ -57,14 +79,32 @@ class KafkaProcessorSpec extends JUnitSuite {
5779

5880
val producer = new Producer[Array[Byte], Array[Byte]](kafkaRule.producerConfigWithDefaultEncoder())
5981

60-
producer.send(new KeyedMessage(topic, codec.encode(span)))
82+
producer.send(new KeyedMessage(topic, encode(span)))
6183
producer.send(new KeyedMessage(topic, "malformed".getBytes()))
62-
producer.send(new KeyedMessage(topic, codec.encode(span)))
84+
producer.send(new KeyedMessage(topic, encode(span)))
6385
producer.close()
6486

6587
for (elem <- 1 until 2)
6688
assert(recvdSpans.take() == Seq(span.toThrift))
6789

6890
Await.result(service.close())
6991
}
92+
93+
def encode(span: Span) = {
94+
val transport = new TMemoryBuffer(0)
95+
val oproto = new TBinaryProtocol(transport)
96+
val tspan = spanToThriftSpan(span)
97+
tspan.toThrift.write(oproto)
98+
transport.getArray()
99+
}
100+
101+
def encode(spans: Seq[Span]) = {
102+
// serialize all spans as a thrift list
103+
val transport = new TMemoryBuffer(0)
104+
val oproto = new TBinaryProtocol(transport)
105+
oproto.writeListBegin(new TList(TType.STRUCT, spans.size))
106+
spans.map(spanToThriftSpan).foreach(_.toThrift.write(oproto))
107+
oproto.writeListEnd()
108+
transport.getArray()
109+
}
70110
}

zipkin-scrooge/src/main/scala/com/twitter/zipkin/conversions/thrift.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,21 +132,23 @@ object thrift {
132132
implicit def spanToThriftSpan(s: Span) = new ThriftSpan(s)
133133
implicit def thriftSpanToSpan(s: thriftscala.Span) = new WrappedSpan(s)
134134

135-
def thriftListToSpans(bytes: Array[Byte]) = {
135+
def thriftListToThriftSpans(bytes: Array[Byte]) = {
136136
val proto = new TBinaryProtocol(TArrayByteTransport(bytes))
137137
val _list = proto.readListBegin()
138138
if (_list.size > 10000) {
139139
throw new IllegalArgumentException(_list.size + " > 10000: possibly malformed thrift")
140140
}
141-
val result = new ArrayBuffer[Span](_list.size)
141+
val result = new ArrayBuffer[thriftscala.Span](_list.size)
142142
for (i <- 1 to _list.size) {
143-
val thrift = thriftscala.Span.decode(proto)
144-
result += thriftSpanToSpan(thrift).toSpan
143+
result += thriftscala.Span.decode(proto)
145144
}
146145
proto.readListEnd()
147146
result.toList
148147
}
149148

149+
def thriftListToSpans(bytes: Array[Byte]) =
150+
thriftListToThriftSpans(bytes).map(thriftSpanToSpan(_).toSpan)
151+
150152
class WrappedDependencyLink(dl: DependencyLink) {
151153
lazy val toThrift = thriftscala.DependencyLink(dl.parent, dl.child, dl.callCount)
152154
}

0 commit comments

Comments
 (0)