Skip to content

Commit 0271db1

Browse files
author
Adrian Cole
committed
Spikes ES support
1 parent afb2872 commit 0271db1

30 files changed

+540
-811
lines changed

zipkin-autoconfigure/storage-elasticsearch-http/src/test/java/zipkin/storage/elasticsearch/http/ZipkinElasticsearchHttpStorageAutoConfigurationTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,8 +246,8 @@ public void dailyIndexFormat() {
246246
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
247247
context.refresh();
248248

249-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
250-
.isEqualTo("zipkin-1970-01-01");
249+
assertThat(es().indexNameFormatter().indexNameForTimestamp("span", 0))
250+
.isEqualTo("zipkin:span-1970-01-01");
251251
}
252252

253253
@Test
@@ -262,8 +262,8 @@ public void dailyIndexFormat_overridingPrefix() {
262262
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
263263
context.refresh();
264264

265-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
266-
.isEqualTo("zipkin_prod-1970-01-01");
265+
assertThat(es().indexNameFormatter().indexNameForTimestamp("span", 0))
266+
.isEqualTo("zipkin_prod:span-1970-01-01");
267267
}
268268

269269
@Test
@@ -278,8 +278,8 @@ public void dailyIndexFormat_overridingDateSeparator() {
278278
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
279279
context.refresh();
280280

281-
assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
282-
.isEqualTo("zipkin-1970.01.01");
281+
assertThat(es().indexNameFormatter().indexNameForTimestamp("span", 0))
282+
.isEqualTo("zipkin:span-1970.01.01");
283283
}
284284

285285
@Test

zipkin-storage/cassandra3/src/test/java/zipkin/storage/cassandra3/integration/CassandraTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class CassandraTest {
3030

3131
@ClassRule
3232
public static LazyCassandra3Storage storage =
33-
new LazyCassandra3Storage("cassandra:3.10", "test_zipkin3");
33+
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.0", "test_zipkin3");
3434

3535
public static class DependenciesTest extends CassandraDependenciesTest {
3636
@Override protected Cassandra3Storage storage() {

zipkin-storage/elasticsearch-http/README.md

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Here are some examples:
2323

2424
## Indexes
2525
Spans are stored into daily indices, for example spans with a timestamp
26-
falling on 2016/03/19 will be stored in the index named 'zipkin-2016-03-19'.
26+
falling on 2016/03/19 will be stored in the index named 'zipkin:span-2016-03-19'.
2727
There is no support for TTL through this SpanStore. It is recommended
2828
instead to use [Elastic Curator](https://www.elastic.co/guide/en/elasticsearch/client/curator/current/about.html)
2929
to remove indices older than the point you are interested in.
@@ -36,8 +36,8 @@ the date separator from '-' to something else.
3636
control the daily index format.
3737

3838
For example, spans with a timestamp falling on 2016/03/19 end up in the
39-
index 'zipkin-2016-03-19'. When the date separator is '.', the index
40-
would be 'zipkin-2016.03.19'.
39+
index 'zipkin:span-2016-03-19'. When the date separator is '.', the index
40+
would be 'zipkin:span-2016.03.19'.
4141

4242
### String Mapping
4343
The Zipkin api implies aggregation and exact match (keyword) on string
@@ -82,54 +82,14 @@ your indexes:
8282

8383
```bash
8484
# the output below shows which tokens will match on the trace id supplied.
85-
$ curl -s localhost:9200/test_zipkin_http-2016-10-26/_analyze -d '{
85+
$ curl -s localhost:9200/test_zipkin_http:span-2016-10-26/_analyze -d '{
8686
"text": "48485a3953bb61246b221d5bc9e6496c",
8787
"analyzer": "traceId_analyzer"
8888
}'|jq '.tokens|.[]|.token'
8989
"48485a3953bb61246b221d5bc9e6496c"
9090
"6b221d5bc9e6496c"
9191
```
9292

93-
### Span and service Names
94-
Zipkin defines span and service names as lowercase. At write time, any
95-
mixed case span or service names are downcased. If writing a custom
96-
collector in a different language, make sure you write span and service
97-
names in lowercase. Also, if there are any custom query tools, ensure
98-
inputs are downcased.
99-
100-
Span and service name queries default to look back 24hrs (2 index days).
101-
This can be controlled by `ElasticsearchHttpStorage.Builder.namesLookback`
102-
103-
#### Index format
104-
Starting with Zipkin 1.23, service and span names are written to the
105-
same daily indexes as spans and dependency links as the document type
106-
"servicespan". This was added for performance reasons as formerly search
107-
was using relatively expensive nested queries.
108-
109-
The documents themselves represent service and span name pairs. Only one
110-
document is present per daily index. This is to keep the documents from
111-
repeating at a multiplier of span count, which also simplifies query.
112-
This deduplication is enforced at write time by using an ID convention
113-
of the service and span name. Ex. `id = MyServiceName|MySpanName`
114-
115-
The document is a simple structure, like:
116-
```json
117-
{
118-
"serviceName": "MyServiceName",
119-
"spanName": "MySpanName",
120-
}
121-
```
122-
123-
The document does replicate data in the ID, but it is needed as you
124-
cannot query based on an ID expression.
125-
126-
#### Notes for data written prior to Zipkin 1.23
127-
Before Zipkin 1.23, service and span names were nested queries against
128-
the span type. This was an expensive operation, which resulted in high
129-
latency particularly when the UI loads. When the "servicespan" type is
130-
missing from an index, or there's no results returned, a fallback nested
131-
query is invoked.
132-
13393
## Customizing the ingest pipeline
13494

13595
When using Elasticsearch 5.x, you can setup an [ingest pipeline](https://www.elastic.co/guide/en/elasticsearch/reference/master/pipeline.html)

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,19 @@
1313
*/
1414
package zipkin.storage.elasticsearch.http;
1515

16-
import com.squareup.moshi.JsonWriter;
17-
import java.io.IOException;
18-
import java.util.LinkedHashMap;
19-
import java.util.LinkedHashSet;
2016
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Set;
2317
import java.util.concurrent.TimeUnit;
24-
import okio.Buffer;
25-
import zipkin.Codec;
2618
import zipkin.Span;
27-
import zipkin.internal.Pair;
19+
import zipkin.internal.Span2;
20+
import zipkin.internal.Span2Codec;
21+
import zipkin.internal.Span2Converter;
2822
import zipkin.storage.AsyncSpanConsumer;
2923
import zipkin.storage.Callback;
3024

3125
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
3226
import static zipkin.internal.Util.UTF_8;
3327
import static zipkin.internal.Util.propagateIfFatal;
34-
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
28+
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
3529

3630
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
3731

@@ -50,27 +44,22 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
5044
}
5145
try {
5246
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
53-
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
54-
if (!indexToServiceSpans.isEmpty()) {
55-
indexNames(indexer, indexToServiceSpans);
56-
}
47+
indexSpans(indexer, spans);
5748
indexer.execute(callback);
5849
} catch (Throwable t) {
5950
propagateIfFatal(t);
6051
callback.onError(t);
6152
}
6253
}
6354

64-
/** Indexes spans and returns a mapping of indexes that may need a names update */
65-
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
66-
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
55+
void indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
6756
for (Span span : spans) {
6857
Long timestamp = guessTimestamp(span);
6958
Long timestampMillis;
7059
String index; // which index to store this span into
7160
if (timestamp != null) {
7261
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
73-
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
62+
index = indexNameFormatter.indexNameForTimestamp(SPAN, timestampMillis);
7463
} else {
7564
timestampMillis = null;
7665
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
@@ -81,42 +70,14 @@ Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> sp
8170
break;
8271
}
8372
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
84-
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
73+
index = indexNameFormatter.indexNameForTimestamp(SPAN, indexTimestamp);
8574
}
86-
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
87-
byte[] document = Codec.JSON.writeSpan(span);
88-
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
89-
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
90-
}
91-
return indexToServiceSpans;
92-
}
93-
94-
void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
95-
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
96-
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
97-
for (String serviceName : s.serviceNames()) {
98-
serviceSpans.add(Pair.create(serviceName, s.name));
99-
}
100-
}
101-
102-
/**
103-
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104-
* a large order of duplicates ending up in the daily index. This also means queries do not need
105-
* to deduplicate.
106-
*/
107-
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
108-
throws IOException {
109-
Buffer buffer = new Buffer();
110-
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
111-
String index = entry.getKey();
112-
for (Pair<String> serviceSpan : entry.getValue()) {
113-
JsonWriter writer = JsonWriter.of(buffer);
114-
writer.beginObject();
115-
writer.name("serviceName").value(serviceSpan._1);
116-
writer.name("spanName").value(serviceSpan._2);
117-
writer.endObject();
118-
byte[] document = buffer.readByteArray();
119-
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
75+
for (Span2 span2 : Span2Converter.fromSpan(span)) {
76+
byte[] document = Span2Codec.JSON.writeSpan(span2);
77+
if (timestampMillis != null) {
78+
document = prefixWithTimestampMillis(document, timestampMillis);
79+
}
80+
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
12081
}
12182
}
12283
}

0 commit comments

Comments
 (0)