4
4
package cloudfoundryreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/cloudfoundryreceiver"
5
5
6
6
import (
7
+ "fmt"
7
8
"strings"
8
9
"time"
10
+ "unicode"
9
11
10
12
"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
11
13
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -15,7 +17,11 @@ import (
15
17
)
16
18
17
19
const (
18
- attributeNamePrefix = "org.cloudfoundry."
20
+ attributeNamePrefix = "org.cloudfoundry."
21
+ envelopeSourceTypeTag = "org.cloudfoundry.source_type"
22
+ envelopeSourceTypeValueRTR = "RTR"
23
+ logLineRTRTraceIDKey = "x_b3_traceid"
24
+ logLineRTRSpanIDKey = "x_b3_spanid"
19
25
)
20
26
21
27
func convertEnvelopeToMetrics (envelope * loggregator_v2.Envelope , metricSlice pmetric.MetricSlice , startTime time.Time ) {
@@ -43,9 +49,8 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
43
49
}
44
50
}
45
51
46
- func convertEnvelopeToLogs (envelope * loggregator_v2.Envelope , logSlice plog.LogRecordSlice , startTime time.Time ) {
47
- switch envelope .Message .(type ) {
48
- case * loggregator_v2.Envelope_Log :
52
+ func convertEnvelopeToLogs (envelope * loggregator_v2.Envelope , logSlice plog.LogRecordSlice , startTime time.Time ) error {
53
+ if _ , isLog := envelope .Message .(* loggregator_v2.Envelope_Log ); isLog {
49
54
log := logSlice .AppendEmpty ()
50
55
log .SetTimestamp (pcommon .Timestamp (envelope .GetTimestamp ()))
51
56
log .SetObservedTimestamp (pcommon .NewTimestampFromTime (startTime ))
@@ -59,9 +64,30 @@ func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogR
59
64
log .SetSeverityNumber (plog .SeverityNumberError )
60
65
}
61
66
copyEnvelopeAttributes (log .Attributes (), envelope )
62
- _ = parseLogTracingFields (log )
63
- default :
67
+ if value , found := log .Attributes ().Get (envelopeSourceTypeTag ); found && value .AsString () == envelopeSourceTypeValueRTR {
68
+ _ , wordsMap := parseLogLine (log .Body ().AsString ())
69
+ traceIDStr , found := wordsMap [logLineRTRTraceIDKey ]
70
+ if ! found {
71
+ return fmt .Errorf ("traceid key %s not found in log" , logLineRTRTraceIDKey )
72
+ }
73
+ spanIDStr , found := wordsMap [logLineRTRSpanIDKey ]
74
+ if ! found {
75
+ return fmt .Errorf ("spanid key %s not found in log" , logLineRTRSpanIDKey )
76
+ }
77
+ traceID , err := trace .TraceIDFromHex (traceIDStr )
78
+ if err != nil {
79
+ return err
80
+ }
81
+ spanID , err := trace .SpanIDFromHex (spanIDStr )
82
+ if err != nil {
83
+ return err
84
+ }
85
+ log .SetTraceID ([16 ]byte (traceID ))
86
+ log .SetSpanID ([8 ]byte (spanID ))
87
+ }
88
+ return nil
64
89
}
90
+ return fmt .Errorf ("envelope is not a log" )
65
91
}
66
92
67
93
func copyEnvelopeAttributes (attributes pcommon.Map , envelope * loggregator_v2.Envelope ) {
@@ -78,37 +104,73 @@ func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Env
78
104
}
79
105
}
80
106
81
- func parseLogTracingFields (log plog.LogRecord ) error {
82
- if value , found := log .Attributes ().Get ("org.cloudfoundry.source_type" ); ! found || value .AsString () != "RTR" {
83
- return nil
84
- }
85
- s := log .Body ().AsString ()
86
- quoted := false
87
- a := strings .FieldsFunc (s , func (r rune ) bool {
88
- if r == '"' {
89
- quoted = ! quoted
107
+ func parseLogLine (s string ) ([]string , map [string ]string ) {
108
+ wordList := make ([]string , 0 , 20 )
109
+ sb := & strings.Builder {}
110
+ mapValue := & strings.Builder {}
111
+ timestamp := & strings.Builder {}
112
+ isTimeStamp := false
113
+ mapKey := ""
114
+ isMap := false
115
+ isQuoted := false
116
+ wordMap := make (map [string ]string )
117
+ for _ , ch := range s {
118
+ if ch == '"' {
119
+ isQuoted = ! isQuoted
120
+ sb .WriteRune (ch )
121
+ continue
90
122
}
91
- return ! quoted && r == ' '
92
- })
93
-
94
- traceIDStr := strings .Split (a [21 ], ":" )[1 ]
95
- traceIDStr = strings .Trim (traceIDStr , "\" " )
96
-
97
- spanIDStr := strings .Split (a [22 ], ":" )[1 ]
98
- spanIDStr = strings .Trim (spanIDStr , "\" " )
99
-
100
- traceID , err := trace .TraceIDFromHex (traceIDStr )
101
- if err != nil {
102
- return err
123
+ if isQuoted {
124
+ sb .WriteRune (ch )
125
+ if isMap {
126
+ mapValue .WriteRune (ch )
127
+ }
128
+ continue
129
+ }
130
+ if ch == '[' && sb .Len () == 0 {
131
+ // first char after space
132
+ isTimeStamp = true
133
+ continue
134
+ }
135
+ if ch == ']' && isTimeStamp {
136
+ wordList = append (wordList , timestamp .String ())
137
+ timestamp .Reset ()
138
+ isTimeStamp = false
139
+ continue
140
+ }
141
+ if isTimeStamp {
142
+ timestamp .WriteRune (ch )
143
+ continue
144
+ }
145
+ if unicode .IsSpace (ch ) {
146
+ if sb .Len () > 0 {
147
+ word := sb .String ()
148
+ if isMap {
149
+ wordMap [mapKey ] = mapValue .String ()
150
+ } else if strings .HasPrefix (word , `"` ) && strings .HasSuffix (word , `"` ) {
151
+ // remove " if the item is not a keyMap and starts and ends with it
152
+ word = strings .Trim (word , `"` )
153
+ }
154
+ wordList = append (wordList , word )
155
+ }
156
+ isMap = false
157
+ mapValue .Reset ()
158
+ sb .Reset ()
159
+ continue
160
+ }
161
+ if isMap {
162
+ mapValue .WriteRune (ch )
163
+ } else if ch == ':' {
164
+ mapKey = sb .String ()
165
+ isMap = true
166
+ }
167
+ sb .WriteRune (ch )
103
168
}
104
-
105
- spanID , err := trace .SpanIDFromHex (spanIDStr )
106
- if err != nil {
107
- return err
169
+ if sb .Len () > 0 {
170
+ wordList = append (wordList , sb .String ())
171
+ if isMap {
172
+ wordMap [mapKey ] = mapValue .String ()
173
+ }
108
174
}
109
-
110
- log .SetTraceID ([16 ]byte (traceID ))
111
- log .SetSpanID ([8 ]byte (spanID ))
112
-
113
- return nil
175
+ return wordList , wordMap
114
176
}
0 commit comments