Skip to content

Commit c313af1

Browse files
Ormodsparrc
authored andcommitted
kafka: Add support for using TLS authentication for the kafka output
With the advent of Kafka 0.9.0+ it is possible to set up TLS client certificate based authentication to limit access to Kafka. Four new configuration variables are specified for setting up the authentication. If they're not set the behavior stays the same as before the change. closes #541
1 parent 1388b1b commit c313af1

File tree

2 files changed

+65
-3
lines changed

2 files changed

+65
-3
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
- AMQP SSL support. Thanks @ekini!
1212
- [#539](https://github.com/influxdata/telegraf/pull/539): Reload config on SIGHUP. Thanks @titilambert!
1313
- [#522](https://github.com/influxdata/telegraf/pull/522): Phusion passenger input plugin. Thanks @kureikain!
14+
- [#541](https://github.com/influxdata/telegraf/pull/541): Kafka output TLS cert support. Thanks @Ormod!
1415

1516
### Bugfixes
1617
- [#506](https://github.com/influxdb/telegraf/pull/506): Ping input doesn't return response time metric when timeout. Thanks @titilambert!

plugins/outputs/kafka/kafka.go

+64-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package kafka
22

33
import (
4+
"crypto/tls"
5+
"crypto/x509"
46
"errors"
57
"fmt"
6-
78
"github.com/Shopify/sarama"
89
"github.com/influxdb/influxdb/client/v2"
910
"github.com/influxdb/telegraf/plugins/outputs"
11+
"io/ioutil"
1012
)
1113

1214
type Kafka struct {
@@ -16,8 +18,17 @@ type Kafka struct {
1618
Topic string
1719
// Routing Key Tag
1820
RoutingTag string `toml:"routing_tag"`
21+
// TLS client certificate
22+
Certificate string
23+
// TLS client key
24+
Key string
25+
// TLS certificate authority
26+
CA string
27+
// Verfiy SSL certificate chain
28+
VerifySsl bool
1929

20-
producer sarama.SyncProducer
30+
tlsConfig tls.Config
31+
producer sarama.SyncProducer
2132
}
2233

2334
var sampleConfig = `
@@ -28,10 +39,60 @@ var sampleConfig = `
2839
# Telegraf tag to use as a routing key
2940
# ie, if this tag exists, it's value will be used as the routing key
3041
routing_tag = "host"
42+
43+
# Optional TLS configuration:
44+
# Client certificate
45+
certificate = ""
46+
# Client key
47+
key = ""
48+
# Certificate authority file
49+
ca = ""
50+
# Verify SSL certificate chain
51+
verify_ssl = false
3152
`
3253

54+
func createTlsConfiguration(k *Kafka) (t *tls.Config, err error) {
55+
if k.Certificate != "" && k.Key != "" && k.CA != "" {
56+
cert, err := tls.LoadX509KeyPair(k.Certificate, k.Key)
57+
if err != nil {
58+
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS client key/certificate: %s",
59+
err))
60+
}
61+
62+
caCert, err := ioutil.ReadFile(k.CA)
63+
if err != nil {
64+
return nil, errors.New(fmt.Sprintf("Cout not load Kafka TLS CA: %s",
65+
err))
66+
}
67+
68+
caCertPool := x509.NewCertPool()
69+
caCertPool.AppendCertsFromPEM(caCert)
70+
71+
t = &tls.Config{
72+
Certificates: []tls.Certificate{cert},
73+
RootCAs: caCertPool,
74+
InsecureSkipVerify: k.VerifySsl,
75+
}
76+
}
77+
// will be nil by default if nothing is provided
78+
return t, nil
79+
}
80+
3381
func (k *Kafka) Connect() error {
34-
producer, err := sarama.NewSyncProducer(k.Brokers, nil)
82+
config := sarama.NewConfig()
83+
config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
84+
config.Producer.Retry.Max = 10 // Retry up to 10 times to produce the message
85+
tlsConfig, err := createTlsConfiguration(k)
86+
if err != nil {
87+
return err
88+
}
89+
90+
if tlsConfig != nil {
91+
config.Net.TLS.Config = tlsConfig
92+
config.Net.TLS.Enable = true
93+
}
94+
95+
producer, err := sarama.NewSyncProducer(k.Brokers, config)
3596
if err != nil {
3697
return err
3798
}

0 commit comments

Comments
 (0)