4 | 4 | package adapter // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
5 | 5 |
6 | 6 | import (
7 |   | -    "context"
8 | 7 |     "encoding/binary"
9 | 8 |     "encoding/json"
10 |   | -    "errors"
11 | 9 |     "fmt"
12 |   | -    "math"
13 |   | -    "runtime"
14 | 10 |     "sort"
15 | 11 |     "sync"
16 | 12 |
17 | 13 |     "github.com/cespare/xxhash/v2"
18 |   | -    "go.opentelemetry.io/collector/component"
19 | 14 |     "go.opentelemetry.io/collector/pdata/pcommon"
20 | 15 |     "go.opentelemetry.io/collector/pdata/plog"
21 |   | -    "go.uber.org/zap"
22 | 16 |
23 | 17 |     "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
24 | 18 | )
25 | 19 |
26 |   | -// Converter converts a batch of entry.Entry into plog.Logs aggregating translated
27 |   | -// entries into logs coming from the same Resource.
28 |   | -//
29 |   | -// The diagram below illustrates the internal communication inside the Converter:
30 |   | -//
31 |   | -//              ┌─────────────────────────────────┐
32 |   | -//              │ Batch()                         │
33 |   | -//    ┌─────────┤ Ingests batches of log entries  │
34 |   | -//    │         │ and sends them onto workerChan  │
35 |   | -//    │         └─────────────────────────────────┘
36 |   | -//    │
37 |   | -//    │ ┌───────────────────────────────────────────────────┐
38 |   | -//    ├─► workerLoop()                                      │
39 |   | -//    │ │ ┌─────────────────────────────────────────────────┴─┐
40 |   | -//    ├─┼─► workerLoop()                                      │
41 |   | -//    │ │ │ ┌─────────────────────────────────────────────────┴─┐
42 |   | -//    └─┼─┼─► workerLoop()                                      │
43 |   | -//      └─┤ │ consumes sent log entries from workerChan,        │
44 |   | -//        │ │ translates received entries to plog.LogRecords,   │
45 |   | -//        └─┤ and sends them on flushChan                       │
46 |   | -//          └─────────────────────────┬─────────────────────────┘
47 |   | -//                                    │
48 |   | -//                                    ▼
49 |   | -//          ┌─────────────────────────────────────────────────────┐
50 |   | -//          │ flushLoop()                                         │
51 |   | -//          │ receives log records from flushChan and sends       │
52 |   | -//          │ them onto pLogsChan which is consumed by            │
53 |   | -//          │ downstream consumers via OutChannel()               │
54 |   | -//          └─────────────────────────────────────────────────────┘
55 |   | -type Converter struct {
56 |   | -    set component.TelemetrySettings
57 |   | -
58 |   | -    // pLogsChan is a channel on which aggregated logs will be sent to.
59 |   | -    pLogsChan chan plog.Logs
60 |   | -
61 |   | -    stopOnce sync.Once
62 |   | -
63 |   | -    // converterChan is an internal communication channel signaling stop was called
64 |   | -    // prevents sending to closed channels
65 |   | -    converterChan chan struct{}
66 |   | -
67 |   | -    // workerChan is an internal communication channel that gets the log
68 |   | -    // entries from Batch() calls and it receives the data in workerLoop().
69 |   | -    workerChan chan []*entry.Entry
70 |   | -    // workerCount configures the amount of workers started.
71 |   | -    workerCount int
72 |   | -
73 |   | -    // flushChan is an internal channel used for transporting batched plog.Logs.
74 |   | -    flushChan chan plog.Logs
75 |   | -
76 |   | -    // wg is a WaitGroup that makes sure that we wait for spun up goroutines exit
77 |   | -    // when Stop() is called.
78 |   | -    wg sync.WaitGroup
79 |   | -
80 |   | -    // flushWg is a WaitGroup that makes sure that we wait for flush loop to exit
81 |   | -    // when Stop() is called.
82 |   | -    flushWg sync.WaitGroup
83 |   | -}
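For reference, the removed Converter was driven through the Batch/OutChannel pair shown in the diagram above. A minimal sketch of a call site, reconstructed from the signatures in this diff (the componenttest helper for TelemetrySettings is an assumption, not part of this change):

    package main

    import (
    	"go.opentelemetry.io/collector/component/componenttest"

    	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
    	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
    )

    func main() {
    	c := adapter.NewConverter(componenttest.NewNopTelemetrySettings())
    	c.Start()
    	defer c.Stop()

    	// Producer side: Batch blocks until a worker picks the entries up.
    	go func() {
    		_ = c.Batch([]*entry.Entry{entry.New()})
    	}()

    	// Consumer side: aggregated plog.Logs arrive on OutChannel.
    	pLogs := <-c.OutChannel()
    	_ = pLogs.LogRecordCount()
    }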
84 |   | -
85 |   | -type converterOption interface {
86 |   | -    apply(*Converter)
87 |   | -}
88 |   | -
89 |   | -func withWorkerCount(workerCount int) converterOption {
90 |   | -    return workerCountOption{workerCount}
91 |   | -}
92 |   | -
93 |   | -type workerCountOption struct {
94 |   | -    workerCount int
95 |   | -}
96 |   | -
97 |   | -func (o workerCountOption) apply(c *Converter) {
98 |   | -    c.workerCount = o.workerCount
99 |   | -}
100 |   | -
101 |   | -func NewConverter(set component.TelemetrySettings, opts ...converterOption) *Converter {
102 |   | -    set.Logger = set.Logger.With(zap.String("component", "converter"))
103 |   | -    c := &Converter{
104 |   | -        set:           set,
105 |   | -        workerChan:    make(chan []*entry.Entry),
106 |   | -        workerCount:   int(math.Max(1, float64(runtime.NumCPU()/4))),
107 |   | -        pLogsChan:     make(chan plog.Logs),
108 |   | -        converterChan: make(chan struct{}),
109 |   | -        flushChan:     make(chan plog.Logs),
110 |   | -    }
111 |   | -    for _, opt := range opts {
112 |   | -        opt.apply(c)
113 |   | -    }
114 |   | -    return c
115 |   | -}
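The removed default worker count works out to one worker per four CPUs, floored at one: runtime.NumCPU()/4 truncates toward zero, and math.Max clamps the result back up, so an 8-CPU host got 2 workers while anything under 4 CPUs got 1. An integer-only equivalent without the float64 round-trip (illustrative; assumes the runtime import):

    func defaultWorkerCount() int {
    	// Same result as int(math.Max(1, float64(runtime.NumCPU()/4))).
    	if n := runtime.NumCPU() / 4; n > 1 {
    		return n
    	}
    	return 1
    }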
116 |   | -
117 |   | -func (c *Converter) Start() {
118 |   | -    c.set.Logger.Debug("Starting log converter", zap.Int("worker_count", c.workerCount))
119 |   | -
120 |   | -    c.wg.Add(c.workerCount)
121 |   | -    for i := 0; i < c.workerCount; i++ {
122 |   | -        go c.workerLoop()
123 |   | -    }
124 |   | -
125 |   | -    c.flushWg.Add(1)
126 |   | -    go c.flushLoop()
127 |   | -}
128 |   | -
129 |   | -func (c *Converter) Stop() {
130 |   | -    c.stopOnce.Do(func() {
131 |   | -        close(c.converterChan)
132 |   | -
133 |   | -        // close workerChan and wait for entries to be processed
134 |   | -        close(c.workerChan)
135 |   | -        c.wg.Wait()
136 |   | -
137 |   | -        // close flushChan and wait for flush loop to finish
138 |   | -        close(c.flushChan)
139 |   | -        c.flushWg.Wait()
140 |   | -
141 |   | -        // close pLogsChan so callers can stop processing
142 |   | -        close(c.pLogsChan)
143 |   | -    })
144 |   | -}
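The ordering in Stop is load-bearing: closing workerChan lets each workerLoop's range drain and exit, wg.Wait() then guarantees every accepted batch has been forwarded, and only afterwards are flushChan and pLogsChan closed so their readers terminate without losing data. The same close-then-wait shape in isolation (a sketch, not the collector API):

    // shutdown closes the input, waits for workers to drain it,
    // then closes the output so downstream range loops can exit.
    func shutdown(in chan int, out chan int, workers *sync.WaitGroup) {
    	close(in)      // workers' `for range in` loops finish draining
    	workers.Wait() // all in-flight items have been sent on out
    	close(out)     // now safe: no goroutine can still send on out
    }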
145 |   | -
146 |   | -// OutChannel returns the channel on which converted entries will be sent to.
147 |   | -func (c *Converter) OutChannel() <-chan plog.Logs {
148 |   | -    return c.pLogsChan
149 |   | -}
150 |   | -
151 |   | -// workerLoop is responsible for obtaining log entries from Batch() calls,
152 |   | -// converting them to plog.LogRecords batched by Resource, and sending them
153 |   | -// on flushChan.
154 |   | -func (c *Converter) workerLoop() {
155 |   | -    defer c.wg.Done()
156 |   | -
157 |   | -    for entries := range c.workerChan {
158 |   | -        // Send plogs directly to flushChan
159 |   | -        c.flushChan <- ConvertEntries(entries)
160 |   | -    }
161 |   | -}
162 |   | -
163 | 20 | func ConvertEntries(entries []*entry.Entry) plog.Logs {
164 | 21 |     resourceHashToIdx := make(map[uint64]int)
165 | 22 |     scopeIdxByResource := make(map[uint64]map[string]int)

@@ -197,47 +54,6 @@ func ConvertEntries(entries []*entry.Entry) plog.Logs {

197 | 54 |     return pLogs
198 | 55 | }
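With the worker pipeline gone, ConvertEntries is the surviving entry point and callers convert synchronously. A minimal sketch of a direct call (hypothetical call site; real entries would come from a stanza pipeline):

    package main

    import (
    	"fmt"

    	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
    	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
    )

    func main() {
    	e := entry.New()
    	e.Body = "hello world"
    	pLogs := adapter.ConvertEntries([]*entry.Entry{e})
    	fmt.Println(pLogs.LogRecordCount()) // 1
    }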
|
199 | 56 |
|
200 |
| -func (c *Converter) flushLoop() { |
201 |
| - defer c.flushWg.Done() |
202 |
| - ctx, cancel := context.WithCancel(context.Background()) |
203 |
| - defer cancel() |
204 |
| - |
205 |
| - for pLogs := range c.flushChan { |
206 |
| - if err := c.flush(ctx, pLogs); err != nil { |
207 |
| - c.set.Logger.Debug("Problem sending log entries", |
208 |
| - zap.Error(err), |
209 |
| - ) |
210 |
| - } |
211 |
| - } |
212 |
| -} |
213 |
| - |
214 |
| -// flush flushes provided plog.Logs entries onto a channel. |
215 |
| -func (c *Converter) flush(ctx context.Context, pLogs plog.Logs) error { |
216 |
| - doneChan := ctx.Done() |
217 |
| - |
218 |
| - select { |
219 |
| - case <-doneChan: |
220 |
| - return fmt.Errorf("flushing log entries interrupted, err: %w", ctx.Err()) |
221 |
| - |
222 |
| - case c.pLogsChan <- pLogs: |
223 |
| - } |
224 |
| - |
225 |
| - return nil |
226 |
| -} |
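flush is the standard cancellable-send idiom: block until either the downstream consumer accepts pLogs or the context is done. Note that in the removed code the context was created and cancelled inside flushLoop itself, so the interrupt branch could effectively never fire before the loop exited. The idiom in its general form (a sketch under that reading):

    // sendLogs blocks until out accepts pl or ctx is cancelled,
    // whichever happens first.
    func sendLogs(ctx context.Context, out chan<- plog.Logs, pl plog.Logs) error {
    	select {
    	case <-ctx.Done():
    		return fmt.Errorf("flushing log entries interrupted: %w", ctx.Err())
    	case out <- pl:
    		return nil
    	}
    }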
227 |   | -
228 |   | -// Batch takes in an entry.Entry and sends it to an available worker for processing.
229 |   | -func (c *Converter) Batch(e []*entry.Entry) error {
230 |   | -    // in case Stop was called do not process batch
231 |   | -    select {
232 |   | -    case <-c.converterChan:
233 |   | -        return errors.New("logs converter has been stopped")
234 |   | -    default:
235 |   | -    }
236 |   | -
237 |   | -    c.workerChan <- e
238 |   | -    return nil
239 |   | -}
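Batch probes for shutdown with a non-blocking receive: once Stop has closed converterChan, the receive succeeds immediately on every call, while the default case keeps the fast path unblocked beforehand. In isolation (a sketch; note that a Stop landing between the probe and the send could still panic on the closed work channel, a known limit of this pattern):

    func enqueue(stopped <-chan struct{}, work chan<- []int, batch []int) error {
    	select {
    	case <-stopped: // closed by Stop(): this receive always wins from then on
    		return errors.New("converter has been stopped")
    	default: // still running: fall through and enqueue
    	}
    	work <- batch
    	return nil
    }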
240 |   | -
241 | 57 | // convert converts one entry.Entry into plog.LogRecord allocating it.
242 | 58 | func convert(ent *entry.Entry) plog.LogRecord {
243 | 59 |     dest := plog.NewLogRecord()