Skip to content

Adds Elasticsearch 6.x support using Span2 model #1674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

public class ZipkinElasticsearchHttpStorageAutoConfigurationTest {

Expand Down Expand Up @@ -246,8 +247,8 @@ public void dailyIndexFormat() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin-1970-01-01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin:span-1970-01-01");
}

@Test
Expand All @@ -262,8 +263,8 @@ public void dailyIndexFormat_overridingPrefix() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin_prod-1970-01-01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin_prod:span-1970-01-01");
}

@Test
Expand All @@ -278,8 +279,8 @@ public void dailyIndexFormat_overridingDateSeparator() {
ZipkinElasticsearchHttpStorageAutoConfiguration.class);
context.refresh();

assertThat(es().indexNameFormatter().indexNameForTimestamp(0))
.isEqualTo("zipkin-1970.01.01");
assertThat(es().indexNameFormatter().formatTypeAndTimestamp(SPAN, 0))
.isEqualTo("zipkin:span-1970.01.01");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class CassandraTest {

@ClassRule
public static LazyCassandra3Storage storage =
new LazyCassandra3Storage("cassandra:3.10", "test_zipkin3");
new LazyCassandra3Storage("openzipkin/zipkin-cassandra:1.29.1", "test_zipkin3");

public static class DependenciesTest extends CassandraDependenciesTest {
@Override protected Cassandra3Storage storage() {
Expand Down
3 changes: 2 additions & 1 deletion zipkin-storage/elasticsearch-http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ That said, all integration tests run on pull request via Travis.

### Debugging tests
To see each http message sent to elasticsearch during testing, export the
environment variable `ES_DEBUG=true`.
environment variable `ES_DEBUG=true`. This will also show output from the
docker container.

Note: this will produce a lot of output!

Expand Down
7 changes: 7 additions & 0 deletions zipkin-storage/elasticsearch-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,12 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,21 @@
*/
package zipkin.storage.elasticsearch.http;

import com.squareup.moshi.JsonWriter;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import okio.Buffer;
import zipkin.Codec;
import zipkin.Span;
import zipkin.internal.Pair;
import zipkin.internal.Nullable;
import zipkin.internal.Span2;
import zipkin.internal.Span2Codec;
import zipkin.internal.Span2Converter;
import zipkin.storage.AsyncSpanConsumer;
import zipkin.storage.Callback;

import static zipkin.internal.ApplyTimestampAndDuration.guessTimestamp;
import static zipkin.internal.Util.UTF_8;
import static zipkin.internal.Util.propagateIfFatal;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SERVICE_SPAN;
import static zipkin.storage.elasticsearch.http.ElasticsearchHttpSpanStore.SPAN;

class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing

Expand All @@ -49,76 +45,64 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
return;
}
try {
HttpBulkIndexer indexer = new HttpBulkIndexer("index-span", es);
Map<String, Set<Pair<String>>> indexToServiceSpans = indexSpans(indexer, spans);
if (!indexToServiceSpans.isEmpty()) {
indexNames(indexer, indexToServiceSpans);
}
BulkSpanIndexer indexer = newBulkSpanIndexer(es);
indexSpans(indexer, spans);
indexer.execute(callback);
} catch (Throwable t) {
propagateIfFatal(t);
callback.onError(t);
}
}

/** Indexes spans and returns a mapping of indexes that may need a names update */
Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> spans) {
Map<String, Set<Pair<String>>> indexToServiceSpans = new LinkedHashMap<>();
void indexSpans(BulkSpanIndexer indexer, List<Span> spans) throws IOException {
for (Span span : spans) {
Long timestamp = guessTimestamp(span);
Long timestampMillis;
String index; // which index to store this span into
long indexTimestamp = 0L; // which index to store this span into
Long spanTimestamp;
if (timestamp != null) {
timestampMillis = TimeUnit.MICROSECONDS.toMillis(timestamp);
index = indexNameFormatter.indexNameForTimestamp(timestampMillis);
indexTimestamp = spanTimestamp = TimeUnit.MICROSECONDS.toMillis(timestamp);
} else {
timestampMillis = null;
spanTimestamp = null;
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
// the index bucket, any annotation is better than using current time.
Long indexTimestamp = null;
for (int i = 0, length = span.annotations.size(); i < length; i++) {
indexTimestamp = span.annotations.get(i).timestamp / 1000;
break;
}
if (indexTimestamp == null) indexTimestamp = System.currentTimeMillis();
index = indexNameFormatter.indexNameForTimestamp(indexTimestamp);
if (indexTimestamp == 0L) indexTimestamp = System.currentTimeMillis();
}
if (!span.name.isEmpty()) putServiceSpans(indexToServiceSpans, index, span);
byte[] document = Codec.JSON.writeSpan(span);
if (timestampMillis != null) document = prefixWithTimestampMillis(document, timestampMillis);
indexer.add(index, ElasticsearchHttpSpanStore.SPAN, document, null /* Allow ES to choose an ID */);
indexer.add(indexTimestamp, span, spanTimestamp);
}
return indexToServiceSpans;
}

void putServiceSpans(Map<String, Set<Pair<String>>> indexToServiceSpans, String index, Span s) {
Set<Pair<String>> serviceSpans = indexToServiceSpans.get(index);
if (serviceSpans == null) indexToServiceSpans.put(index, serviceSpans = new LinkedHashSet<>());
for (String serviceName : s.serviceNames()) {
serviceSpans.add(Pair.create(serviceName, s.name));
}

BulkSpanIndexer newBulkSpanIndexer(ElasticsearchHttpStorage es) {
return new BulkSpanIndexer(es);
}

/**
* Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
* a large order of duplicates ending up in the daily index. This also means queries do not need
* to deduplicate.
*/
void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToServiceSpans)
throws IOException {
Buffer buffer = new Buffer();
for (Map.Entry<String, Set<Pair<String>>> entry : indexToServiceSpans.entrySet()) {
String index = entry.getKey();
for (Pair<String> serviceSpan : entry.getValue()) {
JsonWriter writer = JsonWriter.of(buffer);
writer.beginObject();
writer.name("serviceName").value(serviceSpan._1);
writer.name("spanName").value(serviceSpan._2);
writer.endObject();
byte[] document = buffer.readByteArray();
indexer.add(index, SERVICE_SPAN, document, serviceSpan._1 + "|" + serviceSpan._2);
static class BulkSpanIndexer {
final HttpBulkIndexer indexer;
final IndexNameFormatter indexNameFormatter;

BulkSpanIndexer(ElasticsearchHttpStorage es) {
this.indexer = new HttpBulkIndexer("index-span", es);
this.indexNameFormatter = es.indexNameFormatter();
}

void add(long indexTimestamp, Span span, @Nullable Long timestampMillis) {
String index = indexNameFormatter.formatTypeAndTimestamp(SPAN, indexTimestamp);
for (Span2 span2 : Span2Converter.fromSpan(span)) {
byte[] document = Span2Codec.JSON.writeSpan(span2);
if (timestampMillis != null) {
document = prefixWithTimestampMillis(document, timestampMillis);
}
indexer.add(index, SPAN, document, null /* Allow ES to choose an ID */);
}
}

void execute(Callback<Void> callback) throws IOException {
indexer.execute(callback);
}
}

private static final byte[] TIMESTAMP_MILLIS_PREFIX = "{\"timestamp_millis\":".getBytes(UTF_8);
Expand Down
Loading