Skip to content

Commit 4e1c55a

Browse files
author
Adrian Cole
committed
Spikes ES support
1 parent afb2872 commit 4e1c55a

24 files changed

+462
-758
lines changed

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanConsumer.java

Lines changed: 14 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,19 @@
1313
*/
1414
package zipkin.storage.elasticsearch.http;
1515

16-
import com.squareup.moshi.JsonWriter;
17-
import java.io.IOException;
18-
import java.util.LinkedHashMap;
19-
import java.util.LinkedHashSet;
2016
import java.util.List;
21-
import java.util.Map;
22-
import java.util.Set;
2317
import java.util.concurrent.TimeUnit;
24-
import okio.Buffer;
25-
import zipkin.Codec;
2618
import zipkin.Span;
27-
import zipkin.internal.Pair;
19+
import zipkin.internal.Span2;
20+
import zipkin.internal.Span2Codec;
21+
import zipkin.internal.Span2Converter;
2822
import zipkin.storage.AsyncSpanConsumer;
2923
import zipkin.storage.Callback;
3024

3125
import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
3226
import static zipkin.internal.Util.UTF_8;
3327
import static zipkin.internal.Util.propagateIfFatal;
34-
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
28+
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;
3529

3630
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
3731

@@ -50,27 +44,22 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
5044
}
5145
try {
5246
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
53-
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
54-
if (!indexToServiceSpans.isEmpty()) {
55-
indexNames(indexer, indexToServiceSpans);
56-
}
47+
indexSpans(indexer, spans);
5748
indexer.execute(callback);
5849
} catch (Throwable t) {
5950
propagateIfFatal(t);
6051
callback.onError(t);
6152
}
6253
}
6354

64-
/** Indexes spans and returns a mapping of indexes that may need a names update */
65-
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
66-
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
55+
void indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
6756
for (Span span : spans) {
6857
Long timestamp = guessTimestamp(span);
6958
Long timestampMillis;
7059
String index; // which index to store this span into
7160
if (timestamp != null) {
7261
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
73-
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
62+
index = indexNameFormatter.indexNameForTimestamp(SPAN, timestampMillis);
7463
} else {
7564
timestampMillis = null;
7665
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
@@ -81,42 +70,14 @@ Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> sp
8170
break;
8271
}
8372
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
84-
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
73+
index = indexNameFormatter.indexNameForTimestamp(SPAN, indexTimestamp);
8574
}
86-
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
87-
byte[] document = Codec.JSON.writeSpan(span);
88-
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
89-
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
90-
}
91-
return indexToServiceSpans;
92-
}
93-
94-
void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
95-
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
96-
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
97-
for (String serviceName : s.serviceNames()) {
98-
serviceSpans.add(Pair.create(serviceName, s.name));
99-
}
100-
}
101-
102-
/**
103-
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104-
* a large order of duplicates ending up in the daily index. This also means queries do not need
105-
* to deduplicate.
106-
*/
107-
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
108-
throws IOException {
109-
Buffer buffer = new Buffer();
110-
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
111-
String index = entry.getKey();
112-
for (Pair<String> serviceSpan : entry.getValue()) {
113-
JsonWriter writer = JsonWriter.of(buffer);
114-
writer.beginObject();
115-
writer.name("serviceName").value(serviceSpan._1);
116-
writer.name("spanName").value(serviceSpan._2);
117-
writer.endObject();
118-
byte[] document = buffer.readByteArray();
119-
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
75+
for (Span2 span2 : Span2Converter.fromSpan(span)) {
76+
byte[] document = Span2Codec.JSON.writeSpan(span2);
77+
if (timestampMillis != null) {
78+
document = prefixWithTimestampMillis(document, timestampMillis);
79+
}
80+
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
12081
}
12182
}
12283
}

zipkin-storage/elasticsearch-http/src/main/java/zipkin/storage/elasticsearch/http/ElasticsearchHttpSpanStore.java

Lines changed: 38 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import java.util.Collections;
1717
import java.util.Iterator;
18-
import java.util.LinkedHashMap;
1918
import java.util.List;
2019
import java.util.Locale;
2120
import java.util.Map;
@@ -38,19 +37,18 @@
3837

3938
final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
4039

40+
static final String DEPENDENCY = "dependency";
4141
static final String SPAN = "span";
42-
static final String DEPENDENCY_LINK = "dependencylink";
43-
static final String SERVICE_SPAN = "servicespan";
4442

4543
final SearchCallFactory search;
46-
final String[] allIndices;
44+
final String[] allSpanIndices;
4745
final IndexNameFormatter indexNameFormatter;
4846
final boolean strictTraceId;
4947
final int namesLookback;
5048

5149
ElasticsearchHttpSpanStore(ElasticsearchHttpStorage es) {
5250
this.search = new SearchCallFactory(es.http());
53-
this.allIndices = new String[] {es.indexNameFormatter().allIndices()};
51+
this.allSpanIndices = new String[] {es.indexNameFormatter().allSpanIndices()};
5452
this.indexNameFormatter = es.indexNameFormatter();
5553
this.strictTraceId = es.strictTraceId();
5654
this.namesLookback = es.namesLookback();
@@ -63,39 +61,21 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
6361
SearchRequest.Filters filters = new SearchRequest.Filters();
6462
filters.addRange("timestamp_millis", beginMillis, endMillis);
6563
if (request.serviceName != null) {
66-
filters.addNestedTerms(asList(
67-
"annotations.endpoint.serviceName",
68-
"binaryAnnotations.endpoint.serviceName"
69-
), request.serviceName);
64+
filters.addTerm("localEndpoint.serviceName", request.serviceName);
7065
}
7166

7267
if (request.spanName != null) {
7368
filters.addTerm("name", request.spanName);
7469
}
7570

7671
for (String annotation : request.annotations) {
77-
Map<String, String> annotationValues = new LinkedHashMap<>();
78-
annotationValues.put("annotations.value", annotation);
79-
Map<String, String> binaryAnnotationKeys = new LinkedHashMap<>();
80-
binaryAnnotationKeys.put("binaryAnnotations.key", annotation);
81-
if (request.serviceName != null) {
82-
annotationValues.put("annotations.endpoint.serviceName", request.serviceName);
83-
binaryAnnotationKeys.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
84-
}
85-
filters.addNestedTerms(annotationValues, binaryAnnotationKeys);
72+
filters.should()
73+
.addTerm("annotations.value", annotation)
74+
.addExists("tags." + annotation);
8675
}
8776

8877
for (Map.Entry<String, String> kv : request.binaryAnnotations.entrySet()) {
89-
// In our index template, we make sure the binaryAnnotation value is indexed as string,
90-
// meaning non-string values won't even be indexed at all. This means that we can only
91-
// match string values here, which happens to be exactly what we want.
92-
Map<String, String> nestedTerms = new LinkedHashMap<>();
93-
nestedTerms.put("binaryAnnotations.key", kv.getKey());
94-
nestedTerms.put("binaryAnnotations.value", kv.getValue());
95-
if (request.serviceName != null) {
96-
nestedTerms.put("binaryAnnotations.endpoint.serviceName", request.serviceName);
97-
}
98-
filters.addNestedTerms(nestedTerms);
78+
filters.addTerm("tags." + kv.getKey(), kv.getValue());
9979
}
10080

10181
if (request.minDuration != null) {
@@ -113,8 +93,9 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
11393
.addSubAggregation(Aggregation.min("timestamp_millis"))
11494
.orderBy("timestamp_millis", "desc");
11595

116-
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
117-
SearchRequest esRequest = SearchRequest.forIndicesAndType(indices, SPAN)
96+
List<String> indices =
97+
indexNameFormatter.indexNamePatternsForRange(SPAN, beginMillis, endMillis);
98+
SearchRequest esRequest = SearchRequest.create(indices)
11899
.filters(filters).addAggregation(traceIdTimestamp);
119100

120101
HttpCall<List<String>> traceIdsCall = search.newCall(esRequest, BodyConverters.SORTED_KEYS);
@@ -146,7 +127,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
146127
callback.onSuccess(Collections.emptyList());
147128
return;
148129
}
149-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
130+
SearchRequest request = SearchRequest.create(indices)
150131
.terms("traceId", traceIds);
151132
search.newCall(request, BodyConverters.SPANS).submit(successCallback);
152133
}
@@ -182,7 +163,7 @@ final class ElasticsearchHttpSpanStore implements AsyncSpanStore {
182163
public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>> callback) {
183164
String traceIdHex = Util.toLowerHex(strictTraceId ? traceIdHigh : 0L, traceIdLow);
184165

185-
SearchRequest request = SearchRequest.forIndicesAndType(asList(allIndices), SPAN)
166+
SearchRequest request = SearchRequest.create(asList(allSpanIndices))
186167
.term("traceId", traceIdHex);
187168

188169
search.newCall(request, BodyConverters.NULLABLE_SPANS).submit(callback);
@@ -192,29 +173,17 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
192173
long endMillis = System.currentTimeMillis();
193174
long beginMillis = endMillis - namesLookback;
194175

195-
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
196-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
197-
.addAggregation(Aggregation.terms("serviceName", Integer.MAX_VALUE));
198-
199-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
200-
@Override public void onSuccess(List<String> value) {
201-
if (!value.isEmpty()) callback.onSuccess(value);
202-
203-
// Special cased code until sites update their collectors. What this does is do a more
204-
// expensive nested query to get service names when the servicespan type returns nothing.
205-
SearchRequest.Filters filters = new SearchRequest.Filters();
206-
filters.addRange("timestamp_millis", beginMillis, endMillis);
207-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
208-
.filters(filters)
209-
.addAggregation(Aggregation.nestedTerms("annotations.endpoint.serviceName"))
210-
.addAggregation(Aggregation.nestedTerms("binaryAnnotations.endpoint.serviceName"));
211-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
212-
}
213-
214-
@Override public void onError(Throwable t) {
215-
callback.onError(t);
216-
}
217-
});
176+
List<String> indices =
177+
indexNameFormatter.indexNamePatternsForRange(SPAN, beginMillis, endMillis);
178+
// Service name queries include both local and remote endpoints. This is different than
179+
// Span name, as a span name can only be on a local endpoint.
180+
SearchRequest.Filters filters = new SearchRequest.Filters();
181+
filters.addRange("timestamp_millis", beginMillis, endMillis);
182+
SearchRequest request = SearchRequest.create(indices)
183+
.filters(filters)
184+
.addAggregation(Aggregation.terms("localEndpoint.serviceName", Integer.MAX_VALUE))
185+
.addAggregation(Aggregation.terms("remoteEndpoint.serviceName", Integer.MAX_VALUE));
186+
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
218187
}
219188

220189
@Override public void getSpanNames(String serviceName, Callback<List<String>> callback) {
@@ -226,34 +195,18 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
226195
long endMillis = System.currentTimeMillis();
227196
long beginMillis = endMillis - namesLookback;
228197

229-
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endMillis);
230-
231-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SERVICE_SPAN)
232-
.term("serviceName", serviceName.toLowerCase(Locale.ROOT))
233-
.addAggregation(Aggregation.terms("spanName", Integer.MAX_VALUE));
234-
235-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(new Callback<List<String>>() {
236-
@Override public void onSuccess(List<String> value) {
237-
if (!value.isEmpty()) callback.onSuccess(value);
238-
239-
// Special cased code until sites update their collectors. What this does is do a more
240-
// expensive nested query to get span names when the servicespan type returns nothing.
241-
SearchRequest.Filters filters = new SearchRequest.Filters();
242-
filters.addRange("timestamp_millis", beginMillis, endMillis);
243-
filters.addNestedTerms(asList(
244-
"annotations.endpoint.serviceName",
245-
"binaryAnnotations.endpoint.serviceName"
246-
), serviceName.toLowerCase(Locale.ROOT));
247-
SearchRequest request = SearchRequest.forIndicesAndType(indices, SPAN)
248-
.filters(filters)
249-
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
250-
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
251-
}
198+
List<String> indices =
199+
indexNameFormatter.indexNamePatternsForRange(SPAN, beginMillis, endMillis);
252200

253-
@Override public void onError(Throwable t) {
254-
callback.onError(t);
255-
}
256-
});
201+
// A span name is only valid on a local endpoint, as a span name is defined locally
202+
SearchRequest.Filters filters = new SearchRequest.Filters()
203+
.addRange("timestamp_millis", beginMillis, endMillis)
204+
.addTerm("localEndpoint.serviceName", serviceName.toLowerCase(Locale.ROOT));
205+
206+
SearchRequest request = SearchRequest.create(indices)
207+
.filters(filters)
208+
.addAggregation(Aggregation.terms("name", Integer.MAX_VALUE));
209+
search.newCall(request, BodyConverters.SORTED_KEYS).submit(callback);
257210
}
258211

259212
@Override public void getDependencies(long endTs, @Nullable Long lookback,
@@ -262,13 +215,8 @@ public void getRawTrace(long traceIdHigh, long traceIdLow, Callback<List<Span>>
262215
long beginMillis = lookback != null ? endTs - lookback : 0;
263216
// We just return all dependencies in the days that fall within endTs and lookback as
264217
// dependency links themselves don't have timestamps.
265-
List<String> indices = indexNameFormatter.indexNamePatternsForRange(beginMillis, endTs);
266-
getDependencies(indices, callback);
267-
}
268-
269-
void getDependencies(List<String> indices, Callback<List<DependencyLink>> callback) {
270-
SearchRequest request = SearchRequest.forIndicesAndType(indices, DEPENDENCY_LINK);
271-
272-
search.newCall(request, BodyConverters.DEPENDENCY_LINKS).submit(callback);
218+
List<String> indices =
219+
indexNameFormatter.indexNamePatternsForRange(DEPENDENCY, beginMillis, endTs);
220+
search.newCall(SearchRequest.create(indices), BodyConverters.DEPENDENCY_LINKS).submit(callback);
273221
}
274222
}

0 commit comments

Comments
 (0)