Skip to content

Commit 4751608

Browse files
wildumptodev
authored andcommitted
Otel receivers spawn on need (grafana#687)
* Spawn otelcol receivers on need. Spawning a receiver for every signal for every receiver component was not needed and created errors with the kafka receivers. Now the otelcol receiver checks which signals are needed and spawn the corresponding receivers. * changelog entry * add warning when more than one signal is set for the output of the kafka receiver * cleanup * use validate instead of logging * update doc * add test to checkt that the receiver spawns the correct receivers on update * doc typo * remove default value in code * update tests * Update docs/sources/reference/components/otelcol.receiver.kafka.md Co-authored-by: Paulin Todev <[email protected]> * Update internal/component/otelcol/receiver/kafka/kafka.go Co-authored-by: Paulin Todev <[email protected]> * Update docs/sources/reference/components/otelcol.receiver.kafka.md Co-authored-by: Paulin Todev <[email protected]> * adapt tests * make test more readable --------- Co-authored-by: Paulin Todev <[email protected]>
1 parent 7e860dc commit 4751608

File tree

7 files changed

+155
-29
lines changed

7 files changed

+155
-29
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ Main (unreleased)
4343

4444
- Fixed issue where text labels displayed outside of component node's boundary. (@hainenber)
4545

46+
- Fix a bug where a topic was claimed by the wrong consumer type in `otelcol.receiver.kafka`. (@wildum)
47+
4648
- Fix an issue where nested import.git config blocks could conflict if they had the same labels. (@wildum)
4749

4850
- In `mimir.rules.kubernetes`, fix an issue where unrecoverable errors from the Mimir API were retried. (@56quarters)

docs/sources/reference/components/otelcol.receiver.kafka.md

+11-1
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,23 @@ Name | Type | Description | Default | Required
4040
---- | ---- | ----------- | ------- | --------
4141
`brokers` | `array(string)` | Kafka brokers to connect to. | | yes
4242
`protocol_version` | `string` | Kafka protocol version to use. | | yes
43-
`topic` | `string` | Kafka topic to read from. | `"otlp_spans"` | no
43+
`topic` | `string` | Kafka topic to read from. | | no
4444
`encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no
4545
`group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no
4646
`client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no
4747
`initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no
4848
`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"` | no
4949

50+
If `topic` is not set, different topics will be used for different telemetry signals:
51+
52+
* Metrics will be received from an `otlp_metrics` topic.
53+
* Traces will be received from an `otlp_spans` topic.
54+
* Logs will be received from an `otlp_logs` topic.
55+
56+
If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block.
57+
For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces.
58+
If it contains only metrics, then `otelcol.receiver.kafka` should be configured to output only metrics.
59+
5060
The `encoding` argument determines how to decode messages read from Kafka.
5161
`encoding` must be one of the following strings:
5262

internal/component/otelcol/receiver/kafka/kafka.go

+23-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
package kafka
33

44
import (
5+
"fmt"
6+
"strings"
57
"time"
68

79
"github.com/grafana/alloy/internal/component"
@@ -63,7 +65,6 @@ func (args *Arguments) SetToDefault() {
6365
// for compatibility, even though that means using a client and group ID of
6466
// "otel-collector".
6567

66-
Topic: "otlp_spans",
6768
Encoding: "otlp_proto",
6869
Brokers: []string{"localhost:9092"},
6970
ClientID: "otel-collector",
@@ -77,6 +78,27 @@ func (args *Arguments) SetToDefault() {
7778
args.DebugMetrics.SetToDefault()
7879
}
7980

81+
// Validate implements syntax.Validator.
82+
func (args *Arguments) Validate() error {
83+
var signals []string
84+
85+
if len(args.Topic) > 0 {
86+
if len(args.Output.Logs) > 0 {
87+
signals = append(signals, "logs")
88+
}
89+
if len(args.Output.Metrics) > 0 {
90+
signals = append(signals, "metrics")
91+
}
92+
if len(args.Output.Traces) > 0 {
93+
signals = append(signals, "traces")
94+
}
95+
if len(signals) > 1 {
96+
return fmt.Errorf("only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: %s", strings.Join(signals, ", "))
97+
}
98+
}
99+
return nil
100+
}
101+
80102
// Convert implements receiver.Arguments.
81103
func (args Arguments) Convert() (otelcomponent.Config, error) {
82104
input := make(map[string]interface{})

internal/component/otelcol/receiver/kafka/kafka_test.go

+26-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/grafana/alloy/internal/component/otelcol"
8+
"github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer"
89
"github.com/grafana/alloy/internal/component/otelcol/receiver/kafka"
910
"github.com/grafana/alloy/syntax"
1011
"github.com/mitchellh/mapstructure"
@@ -29,7 +30,6 @@ func TestArguments_UnmarshalAlloy(t *testing.T) {
2930
expected: kafkareceiver.Config{
3031
Brokers: []string{"10.10.10.10:9092"},
3132
ProtocolVersion: "2.0.0",
32-
Topic: "otlp_spans",
3333
Encoding: "otlp_proto",
3434
GroupID: "otel-collector",
3535
ClientID: "otel-collector",
@@ -153,7 +153,6 @@ func TestArguments_Auth(t *testing.T) {
153153
expected: map[string]interface{}{
154154
"brokers": []string{"10.10.10.10:9092"},
155155
"protocol_version": "2.0.0",
156-
"topic": "otlp_spans",
157156
"encoding": "otlp_proto",
158157
"group_id": "otel-collector",
159158
"client_id": "otel-collector",
@@ -205,7 +204,6 @@ func TestArguments_Auth(t *testing.T) {
205204
expected: map[string]interface{}{
206205
"brokers": []string{"10.10.10.10:9092"},
207206
"protocol_version": "2.0.0",
208-
"topic": "otlp_spans",
209207
"encoding": "otlp_proto",
210208
"group_id": "otel-collector",
211209
"client_id": "otel-collector",
@@ -263,7 +261,6 @@ func TestArguments_Auth(t *testing.T) {
263261
expected: map[string]interface{}{
264262
"brokers": []string{"10.10.10.10:9092"},
265263
"protocol_version": "2.0.0",
266-
"topic": "otlp_spans",
267264
"encoding": "otlp_proto",
268265
"group_id": "otel-collector",
269266
"client_id": "otel-collector",
@@ -320,7 +317,6 @@ func TestArguments_Auth(t *testing.T) {
320317
expected: map[string]interface{}{
321318
"brokers": []string{"10.10.10.10:9092"},
322319
"protocol_version": "2.0.0",
323-
"topic": "otlp_spans",
324320
"encoding": "otlp_proto",
325321
"group_id": "otel-collector",
326322
"client_id": "otel-collector",
@@ -433,3 +429,28 @@ func TestDebugMetricsConfig(t *testing.T) {
433429
})
434430
}
435431
}
432+
433+
func TestArguments_Validate(t *testing.T) {
434+
cfg := `
435+
brokers = ["10.10.10.10:9092"]
436+
protocol_version = "2.0.0"
437+
topic = "traces"
438+
output {
439+
}
440+
`
441+
var args kafka.Arguments
442+
require.NoError(t, syntax.Unmarshal([]byte(cfg), &args))
443+
444+
// Adding two traces consumer, expect no error
445+
args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{})
446+
args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{})
447+
require.NoError(t, args.Validate())
448+
449+
// Adding another signal type
450+
args.Output.Logs = append(args.Output.Logs, &fakeconsumer.Consumer{})
451+
require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, traces")
452+
453+
// Adding another signal type
454+
args.Output.Metrics = append(args.Output.Metrics, &fakeconsumer.Consumer{})
455+
require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, metrics, traces")
456+
}

internal/component/otelcol/receiver/receiver.go

+25-21
Original file line numberDiff line numberDiff line change
@@ -149,36 +149,40 @@ func (r *Receiver) Update(args component.Arguments) error {
149149
return err
150150
}
151151

152-
var (
153-
next = rargs.NextConsumers()
154-
nextTraces = fanoutconsumer.Traces(next.Traces)
155-
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
156-
nextLogs = fanoutconsumer.Logs(next.Logs)
157-
)
152+
next := rargs.NextConsumers()
158153

159154
// Create instances of the receiver from our factory for each of our
160155
// supported telemetry signals.
161156
var components []otelcomponent.Component
162157

163-
tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
164-
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
165-
return err
166-
} else if tracesReceiver != nil {
167-
components = append(components, tracesReceiver)
158+
if len(next.Traces) > 0 {
159+
nextTraces := fanoutconsumer.Traces(next.Traces)
160+
tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
161+
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
162+
return err
163+
} else if tracesReceiver != nil {
164+
components = append(components, tracesReceiver)
165+
}
168166
}
169167

170-
metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
171-
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
172-
return err
173-
} else if metricsReceiver != nil {
174-
components = append(components, metricsReceiver)
168+
if len(next.Metrics) > 0 {
169+
nextMetrics := fanoutconsumer.Metrics(next.Metrics)
170+
metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
171+
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
172+
return err
173+
} else if metricsReceiver != nil {
174+
components = append(components, metricsReceiver)
175+
}
175176
}
176177

177-
logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
178-
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
179-
return err
180-
} else if logsReceiver != nil {
181-
components = append(components, logsReceiver)
178+
if len(next.Logs) > 0 {
179+
nextLogs := fanoutconsumer.Logs(next.Logs)
180+
logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
181+
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
182+
return err
183+
} else if logsReceiver != nil {
184+
components = append(components, logsReceiver)
185+
}
182186
}
183187

184188
// Schedule the components to run once our component is running.

internal/component/otelcol/receiver/receiver_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,74 @@ func TestReceiver(t *testing.T) {
5757
require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
5858
}
5959

60+
func TestReceiverNotStarted(t *testing.T) {
61+
var (
62+
waitConsumerTrigger = util.NewWaitTrigger()
63+
onTracesConsumer = func(t otelconsumer.Traces) {
64+
waitConsumerTrigger.Trigger()
65+
}
66+
)
67+
te := newTestEnvironment(t, onTracesConsumer)
68+
te.Start(fakeReceiverArgs{
69+
Output: &otelcol.ConsumerArguments{},
70+
})
71+
72+
// Check that no trace receiver was started because it's not needed by the output.
73+
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
74+
}
75+
76+
func TestReceiverUpdate(t *testing.T) {
77+
var (
78+
consumer otelconsumer.Traces
79+
80+
waitConsumerTrigger = util.NewWaitTrigger()
81+
onTracesConsumer = func(t otelconsumer.Traces) {
82+
consumer = t
83+
waitConsumerTrigger.Trigger()
84+
}
85+
86+
waitTracesTrigger = util.NewWaitTrigger()
87+
nextConsumer = &fakeconsumer.Consumer{
88+
ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
89+
waitTracesTrigger.Trigger()
90+
return nil
91+
},
92+
}
93+
)
94+
95+
te := newTestEnvironment(t, onTracesConsumer)
96+
te.Start(fakeReceiverArgs{
97+
Output: &otelcol.ConsumerArguments{},
98+
})
99+
100+
// Check that no trace receiver was started because it's not needed by the output.
101+
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
102+
103+
te.Controller.Update(fakeReceiverArgs{
104+
Output: &otelcol.ConsumerArguments{
105+
Traces: []otelcol.Consumer{nextConsumer},
106+
},
107+
})
108+
109+
// Now the trace receiver is started.
110+
require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no traces consumer sent")
111+
112+
err := consumer.ConsumeTraces(context.Background(), ptrace.NewTraces())
113+
require.NoError(t, err)
114+
115+
require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
116+
117+
waitConsumerTrigger = util.NewWaitTrigger()
118+
119+
// Remove the trace receiver.
120+
te.Controller.Update(fakeReceiverArgs{
121+
Output: &otelcol.ConsumerArguments{},
122+
})
123+
124+
// Check that after the update no trace receiver is started.
125+
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
126+
}
127+
60128
type testEnvironment struct {
61129
t *testing.T
62130

internal/converter/internal/otelcolconvert/testdata/kafka.alloy

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
otelcol.receiver.kafka "default" {
22
brokers = ["broker:9092"]
33
protocol_version = "2.0.0"
4-
topic = ""
54

65
authentication {
76
plaintext {

0 commit comments

Comments
 (0)