4
4
package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver"
5
5
6
6
import (
7
+ "encoding/hex"
7
8
"fmt"
8
9
"strings"
9
10
"time"
@@ -13,15 +14,14 @@ import (
13
14
"go.opentelemetry.io/collector/pdata/pcommon"
14
15
"go.opentelemetry.io/collector/pdata/plog"
15
16
"go.opentelemetry.io/collector/pdata/pmetric"
16
- "go.opentelemetry.io/otel/trace"
17
17
)
18
18
19
19
const (
20
20
attributeNamePrefix = "org.cloudfoundry."
21
21
envelopeSourceTypeTag = "org.cloudfoundry.source_type"
22
22
envelopeSourceTypeValueRTR = "RTR"
23
- logLineRTRTraceIDKey = "x_b3_traceid "
24
- logLineRTRSpanIDKey = "x_b3_spanid "
23
+ logLineRTRZipkinKey = "b3 "
24
+ logLineRTRW3CKey = "traceparent "
25
25
)
26
26
27
27
func convertEnvelopeToMetrics (envelope * loggregator_v2.Envelope , metricSlice pmetric.MetricSlice , startTime time.Time ) {
@@ -50,55 +50,86 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
50
50
}
51
51
52
52
func convertEnvelopeToLogs (envelope * loggregator_v2.Envelope , logSlice plog.LogRecordSlice , startTime time.Time ) error {
53
- if _ , isLog := envelope .Message .(* loggregator_v2.Envelope_Log ); isLog {
54
- log := logSlice .AppendEmpty ()
55
- log .SetTimestamp (pcommon .Timestamp (envelope .GetTimestamp ()))
56
- log .SetObservedTimestamp (pcommon .NewTimestampFromTime (startTime ))
57
- log .Body ().SetStr (string (envelope .GetLog ().GetPayload ()))
58
- switch envelope .GetLog ().GetType () {
59
- case loggregator_v2 .Log_OUT :
60
- log .SetSeverityText (plog .SeverityNumberInfo .String ())
61
- log .SetSeverityNumber (plog .SeverityNumberInfo )
62
- case loggregator_v2 .Log_ERR :
63
- log .SetSeverityText (plog .SeverityNumberError .String ())
64
- log .SetSeverityNumber (plog .SeverityNumberError )
53
+ log := logSlice .AppendEmpty ()
54
+ log .SetTimestamp (pcommon .Timestamp (envelope .GetTimestamp ()))
55
+ log .SetObservedTimestamp (pcommon .NewTimestampFromTime (startTime ))
56
+ logLine := string (envelope .GetLog ().GetPayload ())
57
+ log .Body ().SetStr (logLine )
58
+ switch envelope .GetLog ().GetType () {
59
+ case loggregator_v2 .Log_OUT :
60
+ log .SetSeverityText (plog .SeverityNumberInfo .String ())
61
+ log .SetSeverityNumber (plog .SeverityNumberInfo )
62
+ case loggregator_v2 .Log_ERR :
63
+ log .SetSeverityText (plog .SeverityNumberError .String ())
64
+ log .SetSeverityNumber (plog .SeverityNumberError )
65
+ }
66
+ copyEnvelopeAttributes (log .Attributes (), envelope )
67
+ if envelope .SourceId == envelopeSourceTypeValueRTR {
68
+ traceID , spanID , err := getTracingIDs (logLine )
69
+ if err != nil {
70
+ return err
65
71
}
66
- copyEnvelopeAttributes (log .Attributes (), envelope )
67
- if value , found := log .Attributes ().Get (envelopeSourceTypeTag ); found && value .AsString () == envelopeSourceTypeValueRTR {
68
- _ , wordsMap := parseLogLine (log .Body ().AsString ())
69
- traceIDStr , found := wordsMap [logLineRTRTraceIDKey ]
70
- if ! found {
71
- return fmt .Errorf ("traceid key %s not found in log" , logLineRTRTraceIDKey )
72
- }
73
- spanIDStr , found := wordsMap [logLineRTRSpanIDKey ]
74
- if ! found {
75
- return fmt .Errorf ("spanid key %s not found in log" , logLineRTRSpanIDKey )
76
- }
77
- traceID , err := trace .TraceIDFromHex (traceIDStr )
78
- if err != nil {
79
- return err
80
- }
81
- spanID , err := trace .SpanIDFromHex (spanIDStr )
82
- if err != nil {
83
- return err
84
- }
85
- log .SetTraceID ([16 ]byte (traceID ))
86
- log .SetSpanID ([8 ]byte (spanID ))
72
+ if ! pcommon .TraceID (traceID ).IsEmpty () {
73
+ log .SetTraceID (traceID )
74
+ log .SetSpanID (spanID )
87
75
}
88
- return nil
89
76
}
90
- return fmt .Errorf ("envelope is not a log" )
77
+ return nil
78
+ }
79
+
80
+ func getTracingIDs (logLine string ) (traceID [16 ]byte , spanID [8 ]byte , err error ) {
81
+ var trace []byte
82
+ var span []byte
83
+ _ , wordsMap := parseLogLine (logLine )
84
+ traceIDStr , foundW3C := wordsMap [logLineRTRW3CKey ]
85
+ if foundW3C {
86
+ // Use W3C headers
87
+ traceW3C := strings .Split (traceIDStr , "-" )
88
+ if len (traceW3C ) != 4 || traceW3C [0 ] != "00" {
89
+ err = fmt .Errorf (
90
+ "traceId W3C key %s with format %s not valid in log" ,
91
+ logLineRTRW3CKey , traceW3C [0 ])
92
+ return
93
+ }
94
+ trace = []byte (traceW3C [1 ])
95
+ span = []byte (traceW3C [2 ])
96
+ } else {
97
+ // try Zipkin headers
98
+ traceIDStr , foundZk := wordsMap [logLineRTRZipkinKey ]
99
+ if ! foundZk {
100
+ // log line has no tracing headers
101
+ return
102
+ }
103
+ traceZk := strings .Split (traceIDStr , "-" )
104
+ if len (traceZk ) != 2 {
105
+ err = fmt .Errorf (
106
+ "traceId Zipkin key %s not valid in log" ,
107
+ logLineRTRZipkinKey )
108
+ return
109
+ }
110
+ trace = []byte (traceZk [0 ])
111
+ span = []byte (traceZk [1 ])
112
+ }
113
+ traceDecoded := make ([]byte , 16 )
114
+ spanDecoded := make ([]byte , 8 )
115
+ if _ , err = hex .Decode (traceDecoded , trace ); err != nil {
116
+ return
117
+ }
118
+ if _ , err = hex .Decode (spanDecoded , span ); err != nil {
119
+ return
120
+ }
121
+ copy (traceID [:], traceDecoded )
122
+ copy (spanID [:], spanDecoded )
123
+ return
91
124
}
92
125
93
126
func copyEnvelopeAttributes (attributes pcommon.Map , envelope * loggregator_v2.Envelope ) {
94
127
for key , value := range envelope .Tags {
95
128
attributes .PutStr (attributeNamePrefix + key , value )
96
129
}
97
-
98
130
if envelope .SourceId != "" {
99
131
attributes .PutStr (attributeNamePrefix + "source_id" , envelope .SourceId )
100
132
}
101
-
102
133
if envelope .InstanceId != "" {
103
134
attributes .PutStr (attributeNamePrefix + "instance_id" , envelope .InstanceId )
104
135
}
0 commit comments