Skip to content

Commit 05328a8

Browse files
author
Adrian Cole
committed
WIP: ES_EXPERIMENTAL_SPAN2=true to enable
1 parent 4f99f63 commit 05328a8

38 files changed

+2359
-870
lines changed

zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434

3535
import static org.assertj.core.api.Assertions.assertThat;
3636
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
37+
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
3738

3839
public class ZipkinElasticsearchHttpStorageAutoConfigurationTest {
3940

@@ -246,8 +247,8 @@ public void dailyIndexFormat() {
246247
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
247248
context.refresh();
248249

249-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
250-
.isEqualTo("zipkin-1970-01-01");
250+
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
251+
.isEqualTo("zipkin:span-1970-01-01");
251252
}
252253

253254
@Test
@@ -262,8 +263,8 @@ public void dailyIndexFormat_overridingPrefix() {
262263
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
263264
context.refresh();
264265

265-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
266-
.isEqualTo("zipkin_prod-1970-01-01");
266+
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
267+
.isEqualTo("zipkin_prod:span-1970-01-01");
267268
}
268269

269270
@Test
@@ -278,8 +279,8 @@ public void dailyIndexFormat_overridingDateSeparator() {
278279
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
279280
context.refresh();
280281

281-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
282-
.isEqualTo("zipkin-1970.01.01");
282+
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
283+
.isEqualTo("zipkin:span-1970.01.01");
283284
}
284285

285286
@Test

zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class CassandraTest {
3030

3131
@ClassRule
3232
public static LazyCassandra3Storage storage =
33-
new LazyCassandra3Storage("cassandra:3.10", "test_zipkin3");
33+
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.0", "test_zipkin3");
3434

3535
public static class DependenciesTest extends CassandraDependenciesTest {
3636
@Override protected Cassandra3Storage storage() {

zipkin-storage/elasticsearch-http/README.md

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Here are some examples:
2323

2424
## Indexes
2525
Spans are stored in daily indices. For example, spans with a timestamp
26-
falling on 2016/03/19 will be stored in the index named 'zipkin-2016-03-19'.
26+
falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'.
2727
There is no support for TTL through this SpanStore. It is recommended
2828
instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html)
2929
to remove indices older than the point you are interested in.
@@ -36,8 +36,8 @@ the date separator from '-' to something else.
3636
control the daily index format.
3737

3838
For example, spans with a timestamp falling on 2016/03/19 end up in the
39-
index 'zipkin-2016-03-19'. When the date separator is '.', the index
40-
would be 'zipkin-2016.03.19'.
39+
index 'zipkin:span-2016-03-19'. When the date separator is '.', the index
40+
would be 'zipkin:span-2016.03.19'.
4141

4242
### String Mapping
4343
The Zipkin api implies aggregation and exact match (keyword) on string
@@ -82,54 +82,14 @@ your indexes:
8282

8383
```bash
8484
# the output below shows which tokens will match on the trace id supplied.
85-
$ curl -s localhost:9200/test_zipkin_http-2016-10-26/_analyze -d '{
85+
$ curl -s localhost:9200/test_zipkin_http:span-2016-10-26/_analyze -d '{
8686
"text": "48485a3953bb61246b221d5bc9e6496c",
8787
"analyzer": "traceId_analyzer"
8888
}'|jq '.tokens|.[]|.token'
8989
"48485a3953bb61246b221d5bc9e6496c"
9090
"6b221d5bc9e6496c"
9191
```
9292

93-
### Span and service Names
94-
Zipkin defines span and service names as lowercase. At write time, any
95-
mixed case span or service names are downcased. If writing a custom
96-
collector in a different language, make sure you write span and service
97-
names in lowercase. Also, if there are any custom query tools, ensure
98-
inputs are downcased.
99-
100-
Span and service name queries default to look back 24hrs (2 index days).
101-
This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback`
102-
103-
#### Index format
104-
Starting with Zipkin 1.23, service and span names are written to the
105-
same daily indexes as spans and dependency links as the document type
106-
"servicespan". This was added for performance reasons as formerly search
107-
was using relatively expensive nested queries.
108-
109-
The documents themselves represent service and span name pairs. Only one
110-
document is present per daily index. This is to keep the documents from
111-
repeating at a multiplier of span count, which also simplifies query.
112-
This deduplication is enforced at write time by using an ID convention
113-
of the service and span name. Ex. `id = MyServiceName|MySpanName`
114-
115-
The document is a simple structure, like:
116-
```json
117-
{
118-
"serviceName": "MyServiceName",
119-
"spanName": "MySpanName",
120-
}
121-
```
122-
123-
The document does replicate data in the ID, but it is needed as you
124-
cannot query based on an ID expression.
125-
126-
#### Notes for data written prior to Zipkin 1.23
127-
Before Zipkin 1.23, service and span names were nested queries against
128-
the span type. This was an expensive operation, which resulted in high
129-
latency particularly when the UI loads. When the "servicespan" type is
130-
missing from an index, or there's no results returned, a fallback nested
131-
query is invoked.
132-
13393
## Customizing the ingest pipeline
13494

13595
When using Elasticsearch 5.x, you can set up an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/BodyConverters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ final class BodyConverters {
3434
};
3535
static final BodyConverter<List<Span>> SPANS =
3636
SearchResultConverter.create(JsonAdapters.SPAN_ADAPTER);
37-
static final BodyConverter<List<Span>> NULLABLE_SPANS =
37+
static final BodyConverter<List<Span>> NULLABLE_SPAN2 =
3838
SearchResultConverter.create(JsonAdapters.SPAN_ADAPTER).defaultToNull();
3939
static final BodyConverter<List<DependencyLink>> DEPENDENCY_LINKS =
4040
new SearchResultConverter<DependencyLink>(JsonAdapters.DEPENDENCY_LINK_ADAPTER) {

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java

Lines changed: 44 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,34 @@
1313
*/
1414
package zipkin.storage.elasticsearch.http;
1515

16-
import com.squareup.moshi.JsonWriter;
1716
import java.io.IOException;
18-
import java.util.LinkedHashMap;
19-
import java.util.LinkedHashSet;
2017
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Set;
2318
import java.util.concurrent.TimeUnit;
24-
import okio.Buffer;
25-
import zipkin.Codec;
2619
import zipkin.Span;
27-
import zipkin.internal.Pair;
20+
import zipkin.internal.Nullable;
21+
import zipkin.internal.Span2;
22+
import zipkin.internal.Span2Codec;
23+
import zipkin.internal.Span2Converter;
2824
import zipkin.storage.AsyncSpanConsumer;
2925
import zipkin.storage.Callback;
3026

3127
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
3228
import static zipkin.internal.Util.UTF_8;
3329
import static zipkin.internal.Util.propagateIfFatal;
34-
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
30+
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
3531

3632
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
3733

3834
final ElasticsearchHttpStorage es;
3935
final IndexNameFormatter indexNameFormatter;
4036

4137
ElasticsearchHttpSpanConsumer(ElasticsearchHttpStorage es) {
38+
this(es, es.indexNameFormatter());
39+
}
40+
41+
ElasticsearchHttpSpanConsumer(ElasticsearchHttpStorage es, IndexNameFormatter indexNameFormatter) {
4242
this.es = es;
43-
this.indexNameFormatter = es.indexNameFormatter();
43+
this.indexNameFormatter = indexNameFormatter;
4444
}
4545

4646
@Override public void accept(List<Span> spans, Callback<Void> callback) {
@@ -49,76 +49,64 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
4949
return;
5050
}
5151
try {
52-
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
53-
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
54-
if (!indexToServiceSpans.isEmpty()) {
55-
indexNames(indexer, indexToServiceSpans);
56-
}
52+
BulkSpanIndexer indexer = newBulkSpanIndexer(es);
53+
indexSpans(indexer, spans);
5754
indexer.execute(callback);
5855
} catch (Throwable t) {
5956
propagateIfFatal(t);
6057
callback.onError(t);
6158
}
6259
}
6360

64-
/** Indexes spans and returns a mapping of indexes that may need a names update */
65-
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
66-
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
61+
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
6762
for (Span span : spans) {
6863
Long timestamp = guessTimestamp(span);
69-
Long timestampMillis;
70-
String index; // which index to store this span into
64+
long indexTimestamp = 0L; // which index to store this span into
65+
Long spanTimestamp;
7166
if (timestamp != null) {
72-
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
73-
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
67+
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
7468
} else {
75-
timestampMillis = null;
69+
spanTimestamp = null;
7670
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
7771
// the index bucket, any annotation is better than using current time.
78-
Long indexTimestamp = null;
7972
for (int i = 0, length = span.annotations.size(); i < length; i++) {
8073
indexTimestamp = span.annotations.get(i).timestamp / 1000;
8174
break;
8275
}
83-
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
84-
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
76+
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
8577
}
86-
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
87-
byte[] document = Codec.JSON.writeSpan(span);
88-
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
89-
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
78+
indexer.add(indexTimestamp, span, spanTimestamp);
9079
}
91-
return indexToServiceSpans;
9280
}
9381

94-
void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
95-
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
96-
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
97-
for (String serviceName : s.serviceNames()) {
98-
serviceSpans.add(Pair.create(serviceName, s.name));
99-
}
82+
83+
BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
84+
return new BulkSpanIndexer(es);
10085
}
10186

102-
/**
103-
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104-
* a large order of duplicates ending up in the daily index. This also means queries do not need
105-
* to deduplicate.
106-
*/
107-
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
108-
throws IOException {
109-
Buffer buffer = new Buffer();
110-
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
111-
String index = entry.getKey();
112-
for (Pair<String> serviceSpan : entry.getValue()) {
113-
JsonWriter writer = JsonWriter.of(buffer);
114-
writer.beginObject();
115-
writer.name("serviceName").value(serviceSpan._1);
116-
writer.name("spanName").value(serviceSpan._2);
117-
writer.endObject();
118-
byte[] document = buffer.readByteArray();
119-
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
87+
static class BulkSpanIndexer {
88+
final HttpBulkIndexer indexer;
89+
final IndexNameFormatter indexNameFormatter;
90+
91+
BulkSpanIndexer(ElasticsearchHttpStorage es) {
92+
this.indexer = new HttpBulkIndexer("index-span", es);
93+
this.indexNameFormatter = es.indexNameFormatter();
94+
}
95+
96+
void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
97+
String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
98+
for (Span2 span2 : Span2Converter.fromSpan(span)) {
99+
byte[] document = Span2Codec.JSON.writeSpan(span2);
100+
if (timestampMillis != null) {
101+
document = prefixWithTimestampMillis(document, timestampMillis);
102+
}
103+
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
120104
}
121105
}
106+
107+
void execute(Callback<Void> callback) throws IOException {
108+
indexer.execute(callback);
109+
}
122110
}
123111

124112
private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8);
@@ -131,7 +119,7 @@ void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToS
131119
static byte[] prefixWithTimestampMillis(byte[] input, long timestampMillis) {
132120
String dateAsString = Long.toString(timestampMillis);
133121
byte[] newSpanBytes =
134-
new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length];
122+
new byte[TIMESTAMP_MILLIS_PREFIX.length + dateAsString.length() + input.length];
135123
int pos = 0;
136124
System.arraycopy(TIMESTAMP_MILLIS_PREFIX, 0, newSpanBytes, pos, TIMESTAMP_MILLIS_PREFIX.length);
137125
pos += TIMESTAMP_MILLIS_PREFIX.length;

0 commit comments

Comments
 (0)