13
13
*/
14
14
package zipkin .storage .elasticsearch .http ;
15
15
16
- import com .squareup .moshi .JsonWriter ;
17
16
import java .io .IOException ;
18
- import java .util .LinkedHashMap ;
19
- import java .util .LinkedHashSet ;
20
17
import java .util .List ;
21
- import java .util .Map ;
22
- import java .util .Set ;
23
18
import java .util .concurrent .TimeUnit ;
24
- import okio .Buffer ;
25
- import zipkin .Codec ;
26
19
import zipkin .Span ;
27
- import zipkin .internal .Pair ;
20
+ import zipkin .internal .Nullable ;
21
+ import zipkin .internal .Span2 ;
22
+ import zipkin .internal .Span2Codec ;
23
+ import zipkin .internal .Span2Converter ;
28
24
import zipkin .storage .AsyncSpanConsumer ;
29
25
import zipkin .storage .Callback ;
30
26
31
27
import static zipkin .internal .ApplyTimestampAndDuration .guessTimestamp ;
32
28
import static zipkin .internal .Util .UTF_8 ;
33
29
import static zipkin .internal .Util .propagateIfFatal ;
34
- import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SERVICE_SPAN ;
30
+ import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SPAN ;
35
31
36
32
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
37
33
@@ -49,76 +45,64 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
49
45
return ;
50
46
}
51
47
try {
52
- HttpBulkIndexer indexer = new HttpBulkIndexer ("index-span" , es );
53
- Map <String , Set <Pair <String >>> indexToServiceSpans = indexSpans (indexer , spans );
54
- if (!indexToServiceSpans .isEmpty ()) {
55
- indexNames (indexer , indexToServiceSpans );
56
- }
48
+ BulkSpanIndexer indexer = newBulkSpanIndexer (es );
49
+ indexSpans (indexer , spans );
57
50
indexer .execute (callback );
58
51
} catch (Throwable t ) {
59
52
propagateIfFatal (t );
60
53
callback .onError (t );
61
54
}
62
55
}
63
56
64
- /** Indexes spans and returns a mapping of indexes that may need a names update */
65
- Map <String , Set <Pair <String >>> indexSpans (HttpBulkIndexer indexer , List <Span > spans ) {
66
- Map <String , Set <Pair <String >>> indexToServiceSpans = new LinkedHashMap <>();
57
+ void indexSpans (BulkSpanIndexer indexer , List <Span > spans ) throws IOException {
67
58
for (Span span : spans ) {
68
59
Long timestamp = guessTimestamp (span );
69
- Long timestampMillis ;
70
- String index ; // which index to store this span into
60
+ long indexTimestamp = 0L ; // which index to store this span into
61
+ Long spanTimestamp ;
71
62
if (timestamp != null ) {
72
- timestampMillis = TimeUnit .MICROSECONDS .toMillis (timestamp );
73
- index = indexNameFormatter .indexNameForTimestamp (timestampMillis );
63
+ indexTimestamp = spanTimestamp = TimeUnit .MICROSECONDS .toMillis (timestamp );
74
64
} else {
75
- timestampMillis = null ;
65
+ spanTimestamp = null ;
76
66
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
77
67
// the index bucket, any annotation is better than using current time.
78
- Long indexTimestamp = null ;
79
68
for (int i = 0 , length = span .annotations .size (); i < length ; i ++) {
80
69
indexTimestamp = span .annotations .get (i ).timestamp / 1000 ;
81
70
break ;
82
71
}
83
- if (indexTimestamp == null ) indexTimestamp = System .currentTimeMillis ();
84
- index = indexNameFormatter .indexNameForTimestamp (indexTimestamp );
72
+ if (indexTimestamp == 0L ) indexTimestamp = System .currentTimeMillis ();
85
73
}
86
- if (!span .name .isEmpty ()) putServiceSpans (indexToServiceSpans , index , span );
87
- byte [] document = Codec .JSON .writeSpan (span );
88
- if (timestampMillis != null ) document = prefixWithTimestampMillis (document , timestampMillis );
89
- indexer .add (index , ElasticsearchHttpSpanStore .SPAN , document , null /* Allow ES to choose an ID */ );
74
+ indexer .add (indexTimestamp , span , spanTimestamp );
90
75
}
91
- return indexToServiceSpans ;
92
76
}
93
77
94
- void putServiceSpans (Map <String , Set <Pair <String >>> indexToServiceSpans , String index , Span s ) {
95
- Set <Pair <String >> serviceSpans = indexToServiceSpans .get (index );
96
- if (serviceSpans == null ) indexToServiceSpans .put (index , serviceSpans = new LinkedHashSet <>());
97
- for (String serviceName : s .serviceNames ()) {
98
- serviceSpans .add (Pair .create (serviceName , s .name ));
99
- }
78
+
79
+ BulkSpanIndexer newBulkSpanIndexer (ElasticsearchHttpStorage es ) {
80
+ return new BulkSpanIndexer (es );
100
81
}
101
82
102
- /**
103
- * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104
- * a large order of duplicates ending up in the daily index. This also means queries do not need
105
- * to deduplicate.
106
- */
107
- void indexNames (HttpBulkIndexer indexer , Map <String , Set <Pair <String >>> indexToServiceSpans )
108
- throws IOException {
109
- Buffer buffer = new Buffer ();
110
- for (Map .Entry <String , Set <Pair <String >>> entry : indexToServiceSpans .entrySet ()) {
111
- String index = entry .getKey ();
112
- for (Pair <String > serviceSpan : entry .getValue ()) {
113
- JsonWriter writer = JsonWriter .of (buffer );
114
- writer .beginObject ();
115
- writer .name ("serviceName" ).value (serviceSpan ._1 );
116
- writer .name ("spanName" ).value (serviceSpan ._2 );
117
- writer .endObject ();
118
- byte [] document = buffer .readByteArray ();
119
- indexer .add (index , SERVICE_SPAN , document , serviceSpan ._1 + "|" + serviceSpan ._2 );
83
+ static class BulkSpanIndexer {
84
+ final HttpBulkIndexer indexer ;
85
+ final IndexNameFormatter indexNameFormatter ;
86
+
87
+ BulkSpanIndexer (ElasticsearchHttpStorage es ) {
88
+ this .indexer = new HttpBulkIndexer ("index-span" , es );
89
+ this .indexNameFormatter = es .indexNameFormatter ();
90
+ }
91
+
92
+ void add (long indexTimestamp , Span span , @ Nullable Long timestampMillis ) {
93
+ String index = indexNameFormatter .formatTypeAndTimestamp (SPAN , indexTimestamp );
94
+ for (Span2 span2 : Span2Converter .fromSpan (span )) {
95
+ byte [] document = Span2Codec .JSON .writeSpan (span2 );
96
+ if (timestampMillis != null ) {
97
+ document = prefixWithTimestampMillis (document , timestampMillis );
98
+ }
99
+ indexer .add (index , SPAN , document , null /* Allow ES to choose an ID */ );
120
100
}
121
101
}
102
+
103
+ void execute (Callback <Void > callback ) throws IOException {
104
+ indexer .execute (callback );
105
+ }
122
106
}
123
107
124
108
private static final byte [] TIMESTAMP_MILLIS_PREFIX = "{\" timestamp_millis\" :" .getBytes (UTF_8 );
0 commit comments