Skip to content

Commit 5b6b522

Browse files
vidam-ioatoulme
andauthored
[exporter/kafka] add compression level to kafka configuration (#39647)
#### Description Add support for configuring Kafka producer compression level. Both Kafka and the Sarama library support setting compression level, but the OpenTelemetry Collector currently does not expose this option. This change adds a `compression_level` configuration field to the Kafka exporter, allowing users to control compression level explicitly. #### Link to tracking issue _No tracking issue provided._ #### Testing - Added unit tests #### Documentation - Updated configuration documentation alongside the existing `compression` field to describe the usage of `compression_level`. --------- Signed-off-by: vidam.io <[email protected]> Co-authored-by: Antoine Toulme <[email protected]>
1 parent 61f8c78 commit 5b6b522

File tree

16 files changed

+119
-7
lines changed

16 files changed

+119
-7
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: kafkaexporter
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add compression level in kafka producer.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [39772]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/kafkaexporter/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,22 @@ The following settings can be optionally configured:
8989
- `max_message_bytes` (default = 1000000) the maximum permitted size of a message in bytes
9090
- `required_acks` (default = 1) controls when a message is regarded as transmitted. https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#acks
9191
- `compression` (default = 'none') the compression used when producing messages to kafka. The options are: `none`, `gzip`, `snappy`, `lz4`, and `zstd` https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#compression-type
92+
- `compression_params`
93+
- `level` (default = -1) the compression level used when producing messages to kafka.
94+
- The following are valid combinations of `compression` and `level`
95+
- `gzip`
96+
- BestSpeed: `1`
97+
- BestCompression: `9`
98+
- DefaultCompression: `-1`
99+
- `zstd`
100+
- SpeedFastest: `1`
101+
- SpeedDefault: `3`
102+
- SpeedBetterCompression: `6`
103+
- SpeedBestCompression: `11`
104+
- `lz4`
105+
Only supports fast level
106+
- `snappy`
107+
No compression levels supported yet
92108
- `flush_max_messages` (default = 0) The maximum number of messages the producer will send in a single broker request.
93109

94110
### Supported encodings

exporter/kafkaexporter/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ require (
9090
github.com/xdg-go/scram v1.1.2 // indirect
9191
github.com/xdg-go/stringprep v1.0.4 // indirect
9292
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
93+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
9394
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
9495
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
9596
go.opentelemetry.io/collector/consumer/consumertest v0.125.1-0.20250505155216-829157cef7bb // indirect

exporter/kafkaexporter/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

extension/observer/kafkatopicsobserver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ require (
6969
github.com/xdg-go/scram v1.1.2 // indirect
7070
github.com/xdg-go/stringprep v1.0.4 // indirect
7171
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
72+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
7273
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
7374
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
7475
go.opentelemetry.io/collector/featuregate v1.31.1-0.20250505152726-56c7da210783 // indirect

extension/observer/kafkatopicsobserver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/kafka/client.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/config/configcompression"
1213

1314
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/configkafka"
1415
)
@@ -21,6 +22,13 @@ var saramaCompressionCodecs = map[string]sarama.CompressionCodec{
2122
"zstd": sarama.CompressionZSTD,
2223
}
2324

25+
func convertToSaramaCompressionLevel(p configcompression.Level) int {
26+
if p == configcompression.DefaultCompressionLevel {
27+
return sarama.CompressionLevelDefault
28+
}
29+
return int(p)
30+
}
31+
2432
var saramaInitialOffsets = map[string]int64{
2533
configkafka.EarliestOffset: sarama.OffsetOldest,
2634
configkafka.LatestOffset: sarama.OffsetNewest,
@@ -96,6 +104,7 @@ func NewSaramaSyncProducer(
96104
saramaConfig.Producer.RequiredAcks = sarama.RequiredAcks(producerConfig.RequiredAcks)
97105
saramaConfig.Producer.Timeout = producerTimeout
98106
saramaConfig.Producer.Compression = saramaCompressionCodecs[producerConfig.Compression]
107+
saramaConfig.Producer.CompressionLevel = convertToSaramaCompressionLevel(producerConfig.CompressionParams.Level)
99108
return sarama.NewSyncProducer(clientConfig.Brokers, saramaConfig)
100109
}
101110

internal/kafka/configkafka/config.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/IBM/sarama"
12+
"go.opentelemetry.io/collector/config/configcompression"
1213
"go.opentelemetry.io/collector/config/configtls"
1314
"go.opentelemetry.io/collector/confmap"
1415
)
@@ -184,6 +185,9 @@ type ProducerConfig struct {
184185
// The options are: 'none' (default), 'gzip', 'snappy', 'lz4', and 'zstd'
185186
Compression string `mapstructure:"compression"`
186187

188+
// CompressionParams defines compression parameters for the producer.
189+
CompressionParams configcompression.CompressionParams `mapstructure:"compression_params"`
190+
187191
// The maximum number of messages the producer will send in a single
188192
// broker request. Defaults to 0 for unlimited. Similar to
189193
// `queue.buffering.max.messages` in the JVM producer.
@@ -192,17 +196,26 @@ type ProducerConfig struct {
192196

193197
func NewDefaultProducerConfig() ProducerConfig {
194198
return ProducerConfig{
195-
MaxMessageBytes: 1000000,
196-
RequiredAcks: WaitForLocal,
197-
Compression: "none",
199+
MaxMessageBytes: 1000000,
200+
RequiredAcks: WaitForLocal,
201+
Compression: "none",
202+
CompressionParams: configcompression.CompressionParams{
203+
Level: configcompression.DefaultCompressionLevel,
204+
},
198205
FlushMaxMessages: 0,
199206
}
200207
}
201208

202209
func (c ProducerConfig) Validate() error {
203210
switch c.Compression {
204211
case "none", "gzip", "snappy", "lz4", "zstd":
205-
// Valid compression
212+
ct := configcompression.Type(c.Compression)
213+
if !ct.IsCompressed() {
214+
return nil
215+
}
216+
if err := ct.ValidateParams(c.CompressionParams); err != nil {
217+
return err
218+
}
206219
default:
207220
return fmt.Errorf(
208221
"compression should be one of 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. configured value is %q",

internal/kafka/configkafka/config_test.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/stretchr/testify/require"
1212
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/configcompression"
1314
"go.opentelemetry.io/collector/config/configtls"
1415
"go.opentelemetry.io/collector/confmap/confmaptest"
1516
"go.opentelemetry.io/collector/confmap/xconfmap"
@@ -162,12 +163,29 @@ func TestProducerConfig(t *testing.T) {
162163
},
163164
"full": {
164165
expected: ProducerConfig{
165-
MaxMessageBytes: 1,
166-
RequiredAcks: 0,
167-
Compression: "gzip",
166+
MaxMessageBytes: 1,
167+
RequiredAcks: 0,
168+
Compression: "gzip",
169+
CompressionParams: configcompression.CompressionParams{
170+
Level: 1,
171+
},
168172
FlushMaxMessages: 2,
169173
},
170174
},
175+
"default_compression_level": {
176+
expected: ProducerConfig{
177+
MaxMessageBytes: 1,
178+
RequiredAcks: 0,
179+
Compression: "zstd",
180+
CompressionParams: configcompression.CompressionParams{
181+
Level: configcompression.DefaultCompressionLevel,
182+
},
183+
FlushMaxMessages: 2,
184+
},
185+
},
186+
"invalid_compression_level": {
187+
expectedErr: `unsupported parameters {Level:-123} for compression type "gzip"`,
188+
},
171189
"required_acks_all": {
172190
expected: func() ProducerConfig {
173191
cfg := NewDefaultProducerConfig()

internal/kafka/configkafka/testdata/producer_config.yaml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,20 @@ kafka/full:
33
max_message_bytes: 1
44
required_acks: 0
55
compression: gzip
6+
compression_params:
7+
level: 1
8+
flush_max_messages: 2
9+
kafka/default_compression_level:
10+
max_message_bytes: 1
11+
required_acks: 0
12+
compression: zstd
13+
flush_max_messages: 2
14+
kafka/invalid_compression_level:
15+
max_message_bytes: 1
16+
required_acks: 0
17+
compression: gzip
18+
compression_params:
19+
level: -123
620
flush_max_messages: 2
721
kafka/required_acks_all:
822
required_acks: all

internal/kafka/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/twmb/franz-go/pkg/kfake v0.0.0-20250320172111-35ab5e5f5327
1313
github.com/xdg-go/scram v1.1.2
1414
go.opentelemetry.io/collector/component v1.31.1-0.20250505152726-56c7da210783
15+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783
1516
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783
1617
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783
1718
go.opentelemetry.io/collector/confmap v1.31.1-0.20250505152726-56c7da210783

internal/kafka/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkametricsreceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ require (
7878
github.com/xdg-go/scram v1.1.2 // indirect
7979
github.com/xdg-go/stringprep v1.0.4 // indirect
8080
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
81+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
8182
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
8283
go.opentelemetry.io/collector/config/configtls v1.31.1-0.20250505152726-56c7da210783 // indirect
8384
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250505155216-829157cef7bb // indirect

receiver/kafkametricsreceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

receiver/kafkareceiver/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ require (
101101
github.com/xdg-go/scram v1.1.2 // indirect
102102
github.com/xdg-go/stringprep v1.0.4 // indirect
103103
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
104+
go.opentelemetry.io/collector/config/configcompression v1.31.1-0.20250505152726-56c7da210783 // indirect
104105
go.opentelemetry.io/collector/config/configopaque v1.31.1-0.20250505152726-56c7da210783 // indirect
105106
go.opentelemetry.io/collector/consumer/consumererror v0.125.1-0.20250505155216-829157cef7bb // indirect
106107
go.opentelemetry.io/collector/consumer/xconsumer v0.125.1-0.20250505155216-829157cef7bb // indirect

receiver/kafkareceiver/go.sum

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)