Skip to content

Commit 33c4cf7

Browse files
author
Adrian Cole
committed
Spikes ES support
1 parent 5a9bc3f commit 33c4cf7

14 files changed

+168
-518
lines changed

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,18 @@
1313
*/
1414
package zipkin.storage.elasticsearch.http;
1515

16-
import com.squareup.moshi.JsonWriter;
17-
import java.io.IOException;
18-
import java.util.LinkedHashMap;
19-
import java.util.LinkedHashSet;
2016
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Set;
2317
import java.util.concurrent.TimeUnit;
24-
import okio.Buffer;
25-
import zipkin.Codec;
2618
import zipkin.Span;
27-
import zipkin.internal.Pair;
19+
import zipkin.internal.Span2;
20+
import zipkin.internal.Span2Codec;
21+
import zipkin.internal.Span2Converter;
2822
import zipkin.storage.AsyncSpanConsumer;
2923
import zipkin.storage.Callback;
3024

3125
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
3226
import static zipkin.internal.Util.UTF_8;
3327
import static zipkin.internal.Util.propagateIfFatal;
34-
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
3528

3629
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
3730

@@ -50,20 +43,15 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
5043
}
5144
try {
5245
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
53-
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
54-
if (!indexToServiceSpans.isEmpty()) {
55-
indexNames(indexer, indexToServiceSpans);
56-
}
46+
indexSpans(indexer, spans);
5747
indexer.execute(callback);
5848
} catch (Throwable t) {
5949
propagateIfFatal(t);
6050
callback.onError(t);
6151
}
6252
}
6353

64-
/** Indexes spans and returns a mapping of indexes that may need a names update */
65-
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
66-
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
54+
void indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
6755
for (Span span : spans) {
6856
Long timestamp = guessTimestamp(span);
6957
Long timestampMillis;
@@ -83,40 +71,11 @@ Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> sp
8371
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
8472
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
8573
}
86-
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
87-
byte[] document = Codec.JSON.writeSpan(span);
88-
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
89-
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
90-
}
91-
return indexToServiceSpans;
92-
}
93-
94-
void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
95-
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
96-
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
97-
for (String serviceName : s.serviceNames()) {
98-
serviceSpans.add(Pair.create(serviceName, s.name));
99-
}
100-
}
101-
102-
/**
103-
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104-
* a large order of duplicates ending up in the daily index. This also means queries do not need
105-
* to deduplicate.
106-
*/
107-
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
108-
throws IOException {
109-
Buffer buffer = new Buffer();
110-
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
111-
String index = entry.getKey();
112-
for (Pair<String> serviceSpan : entry.getValue()) {
113-
JsonWriter writer = JsonWriter.of(buffer);
114-
writer.beginObject();
115-
writer.name("serviceName").value(serviceSpan._1);
116-
writer.name("spanName").value(serviceSpan._2);
117-
writer.endObject();
118-
byte[] document = buffer.readByteArray();
119-
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
74+
for (Span2 span2 : Span2Converter.fromSpan(span)) {
75+
byte[] document = Span2Codec.JSON.writeSpan(span2);
76+
if (timestampMillis != null)
77+
document = prefixWithTimestampMillis(document, timestampMillis);
78+
indexer.add(index, ElasticsearchHttpSpanStore.SPAN2, document, null /* Allow ES to choose an ID */);
12079
}
12180
}
12281
}

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java

Lines changed: 26 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import java.util.Collections;
1717
import java.util.Iterator;
18-
import java.util.LinkedHashMap;
1918
import java.util.List;
2019
import java.util.Locale;
2120
import java.util.Map;
@@ -38,9 +37,8 @@
3837

3938
final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
4039

41-
static final String SPAN = "span";
4240
static final String DEPENDENCY_LINK = "dependencylink";
43-
static final String SERVICE_SPAN = "servicespan";
41+
static final String SPAN2 = "span2";
4442

4543
final SearchCallFactory search;
4644
final String[] allIndices;
@@ -63,39 +61,21 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
6361
SearchRequest.Filters filters = new SearchRequest.Filters();
6462
filters.addRange("timestamp_millis", beginMillis, endMillis);
6563
if (request.serviceName != null) {
66-
filters.addNestedTerms(asList(
67-
"annotations.endpoint.serviceName",
68-
"binaryAnnotations.endpoint.serviceName"
69-
), request.serviceName);
64+
filters.addTerm("localEndpoint.serviceName", request.serviceName);
7065
}
7166

7267
if (request.spanName != null) {
7368
filters.addTerm("name", request.spanName);
7469
}
7570

7671
for (String annotation : request.annotations) {
77-
Map<String, String> annotationValues = new LinkedHashMap<>();
78-
annotationValues.put("annotations.value", annotation);
79-
Map<String, String> binaryAnnotationKeys = new LinkedHashMap<>();
80-
binaryAnnotationKeys.put("binaryAnnotations.key", annotation);
81-
if (request.serviceName != null) {
82-
annotationValues.put("annotations.endpoint.serviceName", request.serviceName);
83-
binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
84-
}
85-
filters.addNestedTerms(annotationValues, binaryAnnotationKeys);
72+
filters.should()
73+
.addTerm("annotations.value", annotation)
74+
.addExists("tags." + annotation);
8675
}
8776

8877
for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
89-
// In our index template, we make sure the binaryAnnotation value is indexed as string,
90-
// meaning non-string values won't even be indexed at all. This means that we can only
91-
// match string values here, which happens to be exactly what we want.
92-
Map<String, String> nestedTerms = new LinkedHashMap<>();
93-
nestedTerms.put("binaryAnnotations.key", kv.getKey());
94-
nestedTerms.put("binaryAnnotations.value", kv.getValue());
95-
if (request.serviceName != null) {
96-
nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
97-
}
98-
filters.addNestedTerms(nestedTerms);
78+
filters.addTerm("tags." + kv.getKey(), kv.getValue());
9979
}
10080

10181
if (request.minDuration != null) {
@@ -114,7 +94,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
11494
.orderBy("timestamp_millis", "desc");
11595

11696
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
117-
SearchRequest esRequest = SearchRequest.forIndicesAndType(indices, SPAN)
97+
SearchRequest esRequest = SearchRequest.forIndicesAndType(indices, SPAN2)
11898
.filters(filters).addAggregation(traceIdTimestamp);
11999

120100
HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.SORTED_KEYS);
@@ -146,7 +126,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
146126
callback.onSuccess(Collections.emptyList());
147127
return;
148128
}
149-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
129+
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN2)
150130
.terms("traceId", traceIds);
151131
search.newCall(request, BodyConverters.SPANS).submit(successCallback);
152132
}
@@ -182,7 +162,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
182162
public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
183163
String traceIdHex = Util.toLowerHex(strictTraceId ? traceIdHigh : 0L, traceIdLow);
184164

185-
SearchRequest request = SearchRequest.forIndicesAndType(asList(allIndices), SPAN)
165+
SearchRequest request = SearchRequest.forIndicesAndType(asList(allIndices), SPAN2)
186166
.term("traceId", traceIdHex);
187167

188168
search.newCall(request, BodyConverters.NULLABLE_SPANS).submit(callback);
@@ -193,28 +173,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
193173
long beginMillis = endMillis - namesLookback;
194174

195175
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
196-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
197-
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));
198-
199-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
200-
@Override public void onSuccess(List<String> value) {
201-
if (!value.isEmpty()) callback.onSuccess(value);
202-
203-
// Special cased code until sites update their collectors. What this does is do a more
204-
// expensive nested query to get service names when the servicespan type returns nothing.
205-
SearchRequest.Filters filters = new SearchRequest.Filters();
206-
filters.addRange("timestamp_millis", beginMillis, endMillis);
207-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
208-
.filters(filters)
209-
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
210-
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
211-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
212-
}
213-
214-
@Override public void onError(Throwable t) {
215-
callback.onError(t);
216-
}
217-
});
176+
// Service name queries include both local and remote endpoints. This is different than
177+
// Span name, as a span name can only be on a local endpoint.
178+
SearchRequest.Filters filters = new SearchRequest.Filters();
179+
filters.addRange("timestamp_millis", beginMillis, endMillis);
180+
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN2)
181+
.filters(filters)
182+
.addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
183+
.addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
184+
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
218185
}
219186

220187
@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
@@ -228,32 +195,15 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
228195

229196
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
230197

231-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
232-
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
233-
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));
234-
235-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
236-
@Override public void onSuccess(List<String> value) {
237-
if (!value.isEmpty()) callback.onSuccess(value);
238-
239-
// Special cased code until sites update their collectors. What this does is do a more
240-
// expensive nested query to get span names when the servicespan type returns nothing.
241-
SearchRequest.Filters filters = new SearchRequest.Filters();
242-
filters.addRange("timestamp_millis", beginMillis, endMillis);
243-
filters.addNestedTerms(asList(
244-
"annotations.endpoint.serviceName",
245-
"binaryAnnotations.endpoint.serviceName"
246-
), serviceName.toLowerCase(Locale.ROOT));
247-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
248-
.filters(filters)
249-
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
250-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
251-
}
198+
// A span name is only valid on a local endpoint, as a span name is defined locally
199+
SearchRequest.Filters filters = new SearchRequest.Filters()
200+
.addRange("timestamp_millis", beginMillis, endMillis)
201+
.addTerm("localEndpoint.serviceName", serviceName.toLowerCase(Locale.ROOT));
252202

253-
@Override public void onError(Throwable t) {
254-
callback.onError(t);
255-
}
256-
});
203+
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN2)
204+
.filters(filters)
205+
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
206+
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
257207
}
258208

259209
@Override public void getDependencies(long endTs, @Nullable Long lookback,

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/JsonAdapters.java

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import zipkin.DependencyLink;
2626
import zipkin.Endpoint;
2727
import zipkin.Span;
28-
import zipkin.internal.Util;
28+
import zipkin.internal.Span2;
29+
import zipkin.internal.Span2Converter;
2930

3031
import static zipkin.internal.Util.UTF_8;
31-
import static zipkin.internal.Util.lowerHexToUnsignedLong;
3232

3333
/**
3434
* Read-only json adapters resurrected from before we switched to Java 6 as storage components can
@@ -38,7 +38,7 @@ final class JsonAdapters {
3838
static final JsonAdapter<Span> SPAN_ADAPTER = new JsonAdapter<Span>() {
3939
@Override
4040
public Span fromJson(JsonReader reader) throws IOException {
41-
Span.Builder result = Span.builder();
41+
Span2.Builder result = Span2.builder();
4242
reader.beginObject();
4343
while (reader.hasNext()) {
4444
String nextName = reader.nextName();
@@ -48,50 +48,59 @@ public Span fromJson(JsonReader reader) throws IOException {
4848
}
4949
switch (nextName) {
5050
case "traceId":
51-
String traceId = reader.nextString();
52-
if (traceId.length() == 32) {
53-
result.traceIdHigh(lowerHexToUnsignedLong(traceId, 0));
54-
}
55-
result.traceId(lowerHexToUnsignedLong(traceId));
51+
result.traceId(reader.nextString());
5652
break;
57-
case "name":
58-
result.name(reader.nextString());
53+
case "parentId":
54+
result.parentId(reader.nextString());
5955
break;
6056
case "id":
61-
result.id(Util.lowerHexToUnsignedLong(reader.nextString()));
57+
result.id(reader.nextString());
6258
break;
63-
case "parentId":
64-
result.parentId(Util.lowerHexToUnsignedLong(reader.nextString()));
59+
case "kind":
60+
result.kind(Span2.Kind.valueOf(reader.nextString()));
61+
break;
62+
case "name":
63+
result.name(reader.nextString());
6564
break;
6665
case "timestamp":
6766
result.timestamp(reader.nextLong());
6867
break;
6968
case "duration":
7069
result.duration(reader.nextLong());
7170
break;
71+
case "localEndpoint":
72+
result.localEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
73+
break;
74+
case "remoteEndpoint":
75+
result.remoteEndpoint(ENDPOINT_ADAPTER.fromJson(reader));
76+
break;
7277
case "annotations":
7378
reader.beginArray();
7479
while (reader.hasNext()) {
75-
result.addAnnotation(ANNOTATION_ADAPTER.fromJson(reader));
80+
Annotation a = ANNOTATION_ADAPTER.fromJson(reader);
81+
result.addAnnotation(a.timestamp, a.value);
7682
}
7783
reader.endArray();
7884
break;
79-
case "binaryAnnotations":
80-
reader.beginArray();
85+
case "tags":
86+
reader.beginObject();
8187
while (reader.hasNext()) {
82-
result.addBinaryAnnotation(BINARY_ANNOTATION_ADAPTER.fromJson(reader));
88+
result.putTag(reader.nextName(), reader.nextString());
8389
}
84-
reader.endArray();
90+
reader.endObject();
8591
break;
8692
case "debug":
8793
result.debug(reader.nextBoolean());
8894
break;
95+
case "shared":
96+
result.shared(reader.nextBoolean());
97+
break;
8998
default:
9099
reader.skipValue();
91100
}
92101
}
93102
reader.endObject();
94-
return result.build();
103+
return Span2Converter.toSpan(result.build());
95104
}
96105

97106
@Override

0 commit comments

Comments
 (0)