13
13
*/
14
14
package zipkin .storage .elasticsearch .http ;
15
15
16
- import com .squareup .moshi .JsonWriter ;
17
16
import java .io .IOException ;
18
- import java .util .LinkedHashMap ;
19
- import java .util .LinkedHashSet ;
20
17
import java .util .List ;
21
- import java .util .Map ;
22
- import java .util .Set ;
23
18
import java .util .concurrent .TimeUnit ;
24
- import okio .Buffer ;
25
- import zipkin .Codec ;
26
19
import zipkin .Span ;
27
- import zipkin .internal .Pair ;
20
+ import zipkin .internal .Nullable ;
21
+ import zipkin .internal .Span2 ;
22
+ import zipkin .internal .Span2Codec ;
23
+ import zipkin .internal .Span2Converter ;
28
24
import zipkin .storage .AsyncSpanConsumer ;
29
25
import zipkin .storage .Callback ;
30
26
31
27
import static zipkin .internal .ApplyTimestampAndDuration .guessTimestamp ;
32
28
import static zipkin .internal .Util .UTF_8 ;
33
29
import static zipkin .internal .Util .propagateIfFatal ;
34
- import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SERVICE_SPAN ;
30
+ import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SPAN ;
35
31
36
32
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
37
33
38
34
final ElasticsearchHttpStorage es ;
39
35
final IndexNameFormatter indexNameFormatter ;
40
36
41
37
ElasticsearchHttpSpanConsumer (ElasticsearchHttpStorage es ) {
38
+ this (es , es .indexNameFormatter ());
39
+ }
40
+
41
+ ElasticsearchHttpSpanConsumer (ElasticsearchHttpStorage es , IndexNameFormatter indexNameFormatter ) {
42
42
this .es = es ;
43
- this .indexNameFormatter = es . indexNameFormatter () ;
43
+ this .indexNameFormatter = indexNameFormatter ;
44
44
}
45
45
46
46
@ Override public void accept (List <Span > spans , Callback <Void > callback ) {
@@ -49,76 +49,64 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
49
49
return ;
50
50
}
51
51
try {
52
- HttpBulkIndexer indexer = new HttpBulkIndexer ("index-span" , es );
53
- Map <String , Set <Pair <String >>> indexToServiceSpans = indexSpans (indexer , spans );
54
- if (!indexToServiceSpans .isEmpty ()) {
55
- indexNames (indexer , indexToServiceSpans );
56
- }
52
+ BulkSpanIndexer indexer = newBulkSpanIndexer (es );
53
+ indexSpans (indexer , spans );
57
54
indexer .execute (callback );
58
55
} catch (Throwable t ) {
59
56
propagateIfFatal (t );
60
57
callback .onError (t );
61
58
}
62
59
}
63
60
64
- /** Indexes spans and returns a mapping of indexes that may need a names update */
65
- Map <String , Set <Pair <String >>> indexSpans (HttpBulkIndexer indexer , List <Span > spans ) {
66
- Map <String , Set <Pair <String >>> indexToServiceSpans = new LinkedHashMap <>();
61
+ void indexSpans (BulkSpanIndexer indexer , List <Span > spans ) throws IOException {
67
62
for (Span span : spans ) {
68
63
Long timestamp = guessTimestamp (span );
69
- Long timestampMillis ;
70
- String index ; // which index to store this span into
64
+ long indexTimestamp = 0L ; // which index to store this span into
65
+ Long spanTimestamp ;
71
66
if (timestamp != null ) {
72
- timestampMillis = TimeUnit .MICROSECONDS .toMillis (timestamp );
73
- index = indexNameFormatter .indexNameForTimestamp (timestampMillis );
67
+ indexTimestamp = spanTimestamp = TimeUnit .MICROSECONDS .toMillis (timestamp );
74
68
} else {
75
- timestampMillis = null ;
69
+ spanTimestamp = null ;
76
70
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
77
71
// the index bucket, any annotation is better than using current time.
78
- Long indexTimestamp = null ;
79
72
for (int i = 0 , length = span .annotations .size (); i < length ; i ++) {
80
73
indexTimestamp = span .annotations .get (i ).timestamp / 1000 ;
81
74
break ;
82
75
}
83
- if (indexTimestamp == null ) indexTimestamp = System .currentTimeMillis ();
84
- index = indexNameFormatter .indexNameForTimestamp (indexTimestamp );
76
+ if (indexTimestamp == 0L ) indexTimestamp = System .currentTimeMillis ();
85
77
}
86
- if (!span .name .isEmpty ()) putServiceSpans (indexToServiceSpans , index , span );
87
- byte [] document = Codec .JSON .writeSpan (span );
88
- if (timestampMillis != null ) document = prefixWithTimestampMillis (document , timestampMillis );
89
- indexer .add (index , ElasticsearchHttpSpanStore .SPAN , document , null /* Allow ES to choose an ID */ );
78
+ indexer .add (indexTimestamp , span , spanTimestamp );
90
79
}
91
- return indexToServiceSpans ;
92
80
}
93
81
94
- void putServiceSpans (Map <String , Set <Pair <String >>> indexToServiceSpans , String index , Span s ) {
95
- Set <Pair <String >> serviceSpans = indexToServiceSpans .get (index );
96
- if (serviceSpans == null ) indexToServiceSpans .put (index , serviceSpans = new LinkedHashSet <>());
97
- for (String serviceName : s .serviceNames ()) {
98
- serviceSpans .add (Pair .create (serviceName , s .name ));
99
- }
82
+
83
+ BulkSpanIndexer newBulkSpanIndexer (ElasticsearchHttpStorage es ) {
84
+ return new BulkSpanIndexer (es );
100
85
}
101
86
102
- /**
103
- * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104
- * a large order of duplicates ending up in the daily index. This also means queries do not need
105
- * to deduplicate.
106
- */
107
- void indexNames (HttpBulkIndexer indexer , Map <String , Set <Pair <String >>> indexToServiceSpans )
108
- throws IOException {
109
- Buffer buffer = new Buffer ();
110
- for (Map .Entry <String , Set <Pair <String >>> entry : indexToServiceSpans .entrySet ()) {
111
- String index = entry .getKey ();
112
- for (Pair <String > serviceSpan : entry .getValue ()) {
113
- JsonWriter writer = JsonWriter .of (buffer );
114
- writer .beginObject ();
115
- writer .name ("serviceName" ).value (serviceSpan ._1 );
116
- writer .name ("spanName" ).value (serviceSpan ._2 );
117
- writer .endObject ();
118
- byte [] document = buffer .readByteArray ();
119
- indexer .add (index , SERVICE_SPAN , document , serviceSpan ._1 + "|" + serviceSpan ._2 );
87
+ static class BulkSpanIndexer {
88
+ final HttpBulkIndexer indexer ;
89
+ final IndexNameFormatter indexNameFormatter ;
90
+
91
+ BulkSpanIndexer (ElasticsearchHttpStorage es ) {
92
+ this .indexer = new HttpBulkIndexer ("index-span" , es );
93
+ this .indexNameFormatter = es .indexNameFormatter ();
94
+ }
95
+
96
+ void add (long indexTimestamp , Span span , @ Nullable Long timestampMillis ) {
97
+ String index = indexNameFormatter .formatTypeAndTimestamp (SPAN , indexTimestamp );
98
+ for (Span2 span2 : Span2Converter .fromSpan (span )) {
99
+ byte [] document = Span2Codec .JSON .writeSpan (span2 );
100
+ if (timestampMillis != null ) {
101
+ document = prefixWithTimestampMillis (document , timestampMillis );
102
+ }
103
+ indexer .add (index , SPAN , document , null /* Allow ES to choose an ID */ );
120
104
}
121
105
}
106
+
107
+ void execute (Callback <Void > callback ) throws IOException {
108
+ indexer .execute (callback );
109
+ }
122
110
}
123
111
124
112
private static final byte [] TIMESTAMP_MILLIS_PREFIX = "{\" timestamp_millis\" :" .getBytes (UTF_8 );
@@ -131,7 +119,7 @@ void indexNames(HttpBulkIndexer indexer, Map<String, Set<Pair<String>>> indexToS
131
119
static byte [] prefixWithTimestampMillis (byte [] input , long timestampMillis ) {
132
120
String dateAsString = Long .toString (timestampMillis );
133
121
byte [] newSpanBytes =
134
- new byte [TIMESTAMP_MILLIS_PREFIX .length + dateAsString .length () + input .length ];
122
+ new byte [TIMESTAMP_MILLIS_PREFIX .length + dateAsString .length () + input .length ];
135
123
int pos = 0 ;
136
124
System .arraycopy (TIMESTAMP_MILLIS_PREFIX , 0 , newSpanBytes , pos , TIMESTAMP_MILLIS_PREFIX .length );
137
125
pos += TIMESTAMP_MILLIS_PREFIX .length ;
0 commit comments