13
13
*/
14
14
package zipkin .storage .elasticsearch .http ;
15
15
16
- import com .squareup .moshi .JsonWriter ;
17
- import java .io .IOException ;
18
- import java .util .LinkedHashMap ;
19
- import java .util .LinkedHashSet ;
20
16
import java .util .List ;
21
- import java .util .Map ;
22
- import java .util .Set ;
23
17
import java .util .concurrent .TimeUnit ;
24
- import okio .Buffer ;
25
- import zipkin .Codec ;
26
18
import zipkin .Span ;
27
- import zipkin .internal .Pair ;
19
+ import zipkin .internal .Span2 ;
20
+ import zipkin .internal .Span2Codec ;
21
+ import zipkin .internal .Span2Converter ;
28
22
import zipkin .storage .AsyncSpanConsumer ;
29
23
import zipkin .storage .Callback ;
30
24
31
25
import static zipkin .internal .ApplyTimestampAndDuration .guessTimestamp ;
32
26
import static zipkin .internal .Util .UTF_8 ;
33
27
import static zipkin .internal .Util .propagateIfFatal ;
34
- import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SERVICE_SPAN ;
28
+ import static zipkin .storage .elasticsearch .http .ElasticsearchHttpSpanStore .SPAN ;
35
29
36
30
class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final for testing
37
31
@@ -50,27 +44,22 @@ class ElasticsearchHttpSpanConsumer implements AsyncSpanConsumer { // not final
50
44
}
51
45
try {
52
46
HttpBulkIndexer indexer = new HttpBulkIndexer ("index-span" , es );
53
- Map <String , Set <Pair <String >>> indexToServiceSpans = indexSpans (indexer , spans );
54
- if (!indexToServiceSpans .isEmpty ()) {
55
- indexNames (indexer , indexToServiceSpans );
56
- }
47
+ indexSpans (indexer , spans );
57
48
indexer .execute (callback );
58
49
} catch (Throwable t ) {
59
50
propagateIfFatal (t );
60
51
callback .onError (t );
61
52
}
62
53
}
63
54
64
- /** Indexes spans and returns a mapping of indexes that may need a names update */
65
- Map <String , Set <Pair <String >>> indexSpans (HttpBulkIndexer indexer , List <Span > spans ) {
66
- Map <String , Set <Pair <String >>> indexToServiceSpans = new LinkedHashMap <>();
55
+ void indexSpans (HttpBulkIndexer indexer , List <Span > spans ) {
67
56
for (Span span : spans ) {
68
57
Long timestamp = guessTimestamp (span );
69
58
Long timestampMillis ;
70
59
String index ; // which index to store this span into
71
60
if (timestamp != null ) {
72
61
timestampMillis = TimeUnit .MICROSECONDS .toMillis (timestamp );
73
- index = indexNameFormatter .indexNameForTimestamp (timestampMillis );
62
+ index = indexNameFormatter .indexNameForTimestamp (SPAN , timestampMillis );
74
63
} else {
75
64
timestampMillis = null ;
76
65
// guessTimestamp is made for determining the span's authoritative timestamp. When choosing
@@ -81,42 +70,14 @@ Map<String, Set<Pair<String>>> indexSpans(HttpBulkIndexer indexer, List<Span> sp
81
70
break ;
82
71
}
83
72
if (indexTimestamp == null ) indexTimestamp = System .currentTimeMillis ();
84
- index = indexNameFormatter .indexNameForTimestamp (indexTimestamp );
73
+ index = indexNameFormatter .indexNameForTimestamp (SPAN , indexTimestamp );
85
74
}
86
- if (!span .name .isEmpty ()) putServiceSpans (indexToServiceSpans , index , span );
87
- byte [] document = Codec .JSON .writeSpan (span );
88
- if (timestampMillis != null ) document = prefixWithTimestampMillis (document , timestampMillis );
89
- indexer .add (index , ElasticsearchHttpSpanStore .SPAN , document , null /* Allow ES to choose an ID */ );
90
- }
91
- return indexToServiceSpans ;
92
- }
93
-
94
- void putServiceSpans (Map <String , Set <Pair <String >>> indexToServiceSpans , String index , Span s ) {
95
- Set <Pair <String >> serviceSpans = indexToServiceSpans .get (index );
96
- if (serviceSpans == null ) indexToServiceSpans .put (index , serviceSpans = new LinkedHashSet <>());
97
- for (String serviceName : s .serviceNames ()) {
98
- serviceSpans .add (Pair .create (serviceName , s .name ));
99
- }
100
- }
101
-
102
- /**
103
- * Adds service and span names to the pending batch. The id is "serviceName|spanName" to prevent
104
- * a large order of duplicates ending up in the daily index. This also means queries do not need
105
- * to deduplicate.
106
- */
107
- void indexNames (HttpBulkIndexer indexer , Map <String , Set <Pair <String >>> indexToServiceSpans )
108
- throws IOException {
109
- Buffer buffer = new Buffer ();
110
- for (Map .Entry <String , Set <Pair <String >>> entry : indexToServiceSpans .entrySet ()) {
111
- String index = entry .getKey ();
112
- for (Pair <String > serviceSpan : entry .getValue ()) {
113
- JsonWriter writer = JsonWriter .of (buffer );
114
- writer .beginObject ();
115
- writer .name ("serviceName" ).value (serviceSpan ._1 );
116
- writer .name ("spanName" ).value (serviceSpan ._2 );
117
- writer .endObject ();
118
- byte [] document = buffer .readByteArray ();
119
- indexer .add (index , SERVICE_SPAN , document , serviceSpan ._1 + "|" + serviceSpan ._2 );
75
+ for (Span2 span2 : Span2Converter .fromSpan (span )) {
76
+ byte [] document = Span2Codec .JSON .writeSpan (span2 );
77
+ if (timestampMillis != null ) {
78
+ document = prefixWithTimestampMillis (document , timestampMillis );
79
+ }
80
+ indexer .add (index , SPAN , document , null /* Allow ES to choose an ID */ );
120
81
}
121
82
}
122
83
}
0 commit comments