diff --git a/CHANGELOG.md b/CHANGELOG.md index f6d256d5f7..13dbe33bcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,8 @@ Main (unreleased) - Fixed issue where text labels displayed outside of component node's boundary. (@hainenber) +- Fix a bug where a topic was claimed by the wrong consumer type in `otelcol.receiver.kafka`. (@wildum) + - Fix an issue where nested import.git config blocks could conflict if they had the same labels. (@wildum) - In `mimir.rules.kubernetes`, fix an issue where unrecoverable errors from the Mimir API were retried. (@56quarters) diff --git a/docs/sources/reference/components/otelcol.receiver.kafka.md b/docs/sources/reference/components/otelcol.receiver.kafka.md index 6448ca58dd..e0664d1ced 100644 --- a/docs/sources/reference/components/otelcol.receiver.kafka.md +++ b/docs/sources/reference/components/otelcol.receiver.kafka.md @@ -40,13 +40,23 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `brokers` | `array(string)` | Kafka brokers to connect to. | | yes `protocol_version` | `string` | Kafka protocol version to use. | | yes -`topic` | `string` | Kafka topic to read from. | `"otlp_spans"` | no +`topic` | `string` | Kafka topic to read from. | | no `encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no `group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no `client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no `initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no `resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"` | no +If `topic` is not set, different topics will be used for different telemetry signals: + +* Metrics will be received from an `otlp_metrics` topic. +* Traces will be received from an `otlp_spans` topic. +* Logs will be received from an `otlp_logs` topic. + +If `topic` is set to a specific value, then only the signal type that corresponds to the data stored in the topic must be set in the output block. +For example, if `topic` is set to `"my_telemetry"`, then the `"my_telemetry"` topic can only contain either metrics, logs, or traces. +If it contains only metrics, then `otelcol.receiver.kafka` should be configured to output only metrics. + The `encoding` argument determines how to decode messages read from Kafka. `encoding` must be one of the following strings: diff --git a/internal/component/otelcol/receiver/kafka/kafka.go b/internal/component/otelcol/receiver/kafka/kafka.go index 650bf581d3..ec16f00326 100644 --- a/internal/component/otelcol/receiver/kafka/kafka.go +++ b/internal/component/otelcol/receiver/kafka/kafka.go @@ -2,6 +2,8 @@ package kafka import ( + "fmt" + "strings" "time" "github.com/grafana/alloy/internal/component" @@ -63,7 +65,6 @@ func (args *Arguments) SetToDefault() { // for compatibility, even though that means using a client and group ID of // "otel-collector". - Topic: "otlp_spans", Encoding: "otlp_proto", Brokers: []string{"localhost:9092"}, ClientID: "otel-collector", @@ -77,6 +78,27 @@ func (args *Arguments) SetToDefault() { args.DebugMetrics.SetToDefault() } +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + var signals []string + + if len(args.Topic) > 0 { + if len(args.Output.Logs) > 0 { + signals = append(signals, "logs") + } + if len(args.Output.Metrics) > 0 { + signals = append(signals, "metrics") + } + if len(args.Output.Traces) > 0 { + signals = append(signals, "traces") + } + if len(signals) > 1 { + return fmt.Errorf("only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: %s", strings.Join(signals, ", ")) + } + } + return nil +} + // Convert implements receiver.Arguments. func (args Arguments) Convert() (otelcomponent.Config, error) { input := make(map[string]interface{}) diff --git a/internal/component/otelcol/receiver/kafka/kafka_test.go b/internal/component/otelcol/receiver/kafka/kafka_test.go index 6433bdf677..0558949345 100644 --- a/internal/component/otelcol/receiver/kafka/kafka_test.go +++ b/internal/component/otelcol/receiver/kafka/kafka_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" "github.com/grafana/alloy/syntax" "github.com/mitchellh/mapstructure" @@ -29,7 +30,6 @@ func TestArguments_UnmarshalAlloy(t *testing.T) { expected: kafkareceiver.Config{ Brokers: []string{"10.10.10.10:9092"}, ProtocolVersion: "2.0.0", - Topic: "otlp_spans", Encoding: "otlp_proto", GroupID: "otel-collector", ClientID: "otel-collector", @@ -153,7 +153,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -205,7 +204,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -263,7 +261,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -320,7 +317,6 @@ func TestArguments_Auth(t *testing.T) { expected: map[string]interface{}{ "brokers": []string{"10.10.10.10:9092"}, "protocol_version": "2.0.0", - "topic": "otlp_spans", "encoding": "otlp_proto", "group_id": "otel-collector", "client_id": "otel-collector", @@ -433,3 +429,28 @@ func TestDebugMetricsConfig(t *testing.T) { }) } } + +func TestArguments_Validate(t *testing.T) { + cfg := ` + brokers = ["10.10.10.10:9092"] + protocol_version = "2.0.0" + topic = "traces" + output { + } + ` + var args kafka.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + + // Adding two traces consumer, expect no error + args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{}) + args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{}) + require.NoError(t, args.Validate()) + + // Adding another signal type + args.Output.Logs = append(args.Output.Logs, &fakeconsumer.Consumer{}) + require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, traces") + + // Adding another signal type + args.Output.Metrics = append(args.Output.Metrics, &fakeconsumer.Consumer{}) + require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, metrics, traces") +} diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 301ffc09ac..30c136ecfc 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -149,36 +149,40 @@ func (r *Receiver) Update(args component.Arguments) error { return err } - var ( - next = rargs.NextConsumers() - nextTraces = fanoutconsumer.Traces(next.Traces) - nextMetrics = fanoutconsumer.Metrics(next.Metrics) - nextLogs = fanoutconsumer.Logs(next.Logs) - ) + next := rargs.NextConsumers() // Create instances of the receiver from our factory for each of our // supported telemetry signals. var components []otelcomponent.Component - tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if tracesReceiver != nil { - components = append(components, tracesReceiver) + if len(next.Traces) > 0 { + nextTraces := fanoutconsumer.Traces(next.Traces) + tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if tracesReceiver != nil { + components = append(components, tracesReceiver) + } } - metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if metricsReceiver != nil { - components = append(components, metricsReceiver) + if len(next.Metrics) > 0 { + nextMetrics := fanoutconsumer.Metrics(next.Metrics) + metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if metricsReceiver != nil { + components = append(components, metricsReceiver) + } } - logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) - if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { - return err - } else if logsReceiver != nil { - components = append(components, logsReceiver) + if len(next.Logs) > 0 { + nextLogs := fanoutconsumer.Logs(next.Logs) + logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs) + if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) { + return err + } else if logsReceiver != nil { + components = append(components, logsReceiver) + } } // Schedule the components to run once our component is running. diff --git a/internal/component/otelcol/receiver/receiver_test.go b/internal/component/otelcol/receiver/receiver_test.go index 3b4f72d4e1..31eba3fd84 100644 --- a/internal/component/otelcol/receiver/receiver_test.go +++ b/internal/component/otelcol/receiver/receiver_test.go @@ -57,6 +57,74 @@ func TestReceiver(t *testing.T) { require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") } +func TestReceiverNotStarted(t *testing.T) { + var ( + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + waitConsumerTrigger.Trigger() + } + ) + te := newTestEnvironment(t, onTracesConsumer) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that no trace receiver was started because it's not needed by the output. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") +} + +func TestReceiverUpdate(t *testing.T) { + var ( + consumer otelconsumer.Traces + + waitConsumerTrigger = util.NewWaitTrigger() + onTracesConsumer = func(t otelconsumer.Traces) { + consumer = t + waitConsumerTrigger.Trigger() + } + + waitTracesTrigger = util.NewWaitTrigger() + nextConsumer = &fakeconsumer.Consumer{ + ConsumeTracesFunc: func(context.Context, ptrace.Traces) error { + waitTracesTrigger.Trigger() + return nil + }, + } + ) + + te := newTestEnvironment(t, onTracesConsumer) + te.Start(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that no trace receiver was started because it's not needed by the output. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") + + te.Controller.Update(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{ + Traces: []otelcol.Consumer{nextConsumer}, + }, + }) + + // Now the trace receiver is started. + require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no traces consumer sent") + + err := consumer.ConsumeTraces(context.Background(), ptrace.NewTraces()) + require.NoError(t, err) + + require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked") + + waitConsumerTrigger = util.NewWaitTrigger() + + // Remove the trace receiver. + te.Controller.Update(fakeReceiverArgs{ + Output: &otelcol.ConsumerArguments{}, + }) + + // Check that after the update no trace receiver is started. + require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded") +} + type testEnvironment struct { t *testing.T diff --git a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy index c2f11b594b..b98eabaf2f 100644 --- a/internal/converter/internal/otelcolconvert/testdata/kafka.alloy +++ b/internal/converter/internal/otelcolconvert/testdata/kafka.alloy @@ -1,7 +1,6 @@ otelcol.receiver.kafka "default" { brokers = ["broker:9092"] protocol_version = "2.0.0" - topic = "" authentication { plaintext {