Skip to content

Commit bb74e49

Browse files
authored
Merge pull request #1538 from slaunay/feature/producer-perf-tls
Support TLS protocol in kafka-producer-performance
2 parents afedeca + 3e595ba commit bb74e49

File tree

1 file changed

+74
-4
lines changed
  • tools/kafka-producer-performance

1 file changed

+74
-4
lines changed

tools/kafka-producer-performance/main.go

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,19 @@ package main
33
import (
44
"context"
55
"crypto/rand"
6+
"crypto/x509"
67
"flag"
78
"fmt"
89
"io"
10+
"io/ioutil"
11+
"log"
912
"os"
1013
"strings"
1114
gosync "sync"
1215
"time"
1316

1417
"github.com/Shopify/sarama"
18+
"github.com/Shopify/sarama/tools/tls"
1519
metrics "github.com/rcrowley/go-metrics"
1620
)
1721

@@ -36,6 +40,31 @@ var (
3640
"",
3741
"REQUIRED: A comma separated list of broker addresses.",
3842
)
43+
securityProtocol = flag.String(
44+
"security-protocol",
45+
"PLAINTEXT",
46+
"The name of the security protocol to talk to Kafka (PLAINTEXT, SSL) (default: PLAINTEXT).",
47+
)
48+
tlsRootCACerts = flag.String(
49+
"tls-ca-certs",
50+
"",
51+
"The path to a file that contains a set of root certificate authorities in PEM format "+
52+
"to trust when verifying broker certificates when -security-protocol=SSL "+
53+
"(leave empty to use the host's root CA set).",
54+
)
55+
tlsClientCert = flag.String(
56+
"tls-client-cert",
57+
"",
58+
"The path to a file that contains the client certificate to send to the broker "+
59+
"in PEM format if client authentication is required when -security-protocol=SSL "+
60+
"(leave empty to disable client authentication).",
61+
)
62+
tlsClientKey = flag.String(
63+
"tls-client-key",
64+
"",
65+
"The path to a file that contains the client private key linked to the client certificate "+
66+
"in PEM format when -security-protocol=SSL (REQUIRED if tls-client-cert is provided).",
67+
)
3968
topic = flag.String(
4069
"topic",
4170
"",
@@ -126,6 +155,11 @@ var (
126155
"0.8.2.0",
127156
"The assumed version of Kafka.",
128157
)
158+
verbose = flag.Bool(
159+
"verbose",
160+
false,
161+
"Turn on sarama logging to stderr",
162+
)
129163
)
130164

131165
func parseCompression(scheme string) sarama.CompressionCodec {
@@ -205,6 +239,12 @@ func main() {
205239
if *routines < 1 || *routines > *messageLoad {
206240
printUsageErrorAndExit("-routines must be greater than 0 and less than or equal to -message-load")
207241
}
242+
if *securityProtocol != "PLAINTEXT" && *securityProtocol != "SSL" {
243+
printUsageErrorAndExit(fmt.Sprintf("-security-protocol %q is not supported", *securityProtocol))
244+
}
245+
if *verbose {
246+
sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)
247+
}
208248

209249
config := sarama.NewConfig()
210250

@@ -222,6 +262,30 @@ func main() {
222262
config.ChannelBufferSize = *channelBufferSize
223263
config.Version = parseVersion(*version)
224264

265+
if *securityProtocol == "SSL" {
266+
tlsConfig, err := tls.NewConfig(*tlsClientCert, *tlsClientKey)
267+
if err != nil {
268+
printErrorAndExit(69, "failed to load client certificate from: %s and private key from: %s: %v",
269+
*tlsClientCert, *tlsClientKey, err)
270+
}
271+
272+
if *tlsRootCACerts != "" {
273+
rootCAsBytes, err := ioutil.ReadFile(*tlsRootCACerts)
274+
if err != nil {
275+
printErrorAndExit(69, "failed to read root CA certificates: %v", err)
276+
}
277+
certPool := x509.NewCertPool()
278+
if !certPool.AppendCertsFromPEM(rootCAsBytes) {
279+
printErrorAndExit(69, "failed to load root CA certificates from file: %s", *tlsRootCACerts)
280+
}
281+
// Use specific root CA set vs the host's set
282+
tlsConfig.RootCAs = certPool
283+
}
284+
285+
config.Net.TLS.Enable = true
286+
config.Net.TLS.Config = tlsConfig
287+
}
288+
225289
if err := config.Validate(); err != nil {
226290
printErrorAndExit(69, "Invalid configuration: %s", err)
227291
}
@@ -363,18 +427,24 @@ func runSyncProducer(topic string, partition, messageLoad, messageSize, routines
363427
}
364428

365429
func printMetrics(w io.Writer, r metrics.Registry) {
366-
if r.Get("record-send-rate") == nil || r.Get("request-latency-in-ms") == nil {
430+
recordSendRateMetric := r.Get("record-send-rate")
431+
requestLatencyMetric := r.Get("request-latency-in-ms")
432+
outgoingByteRateMetric := r.Get("outgoing-byte-rate")
433+
434+
if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil {
367435
return
368436
}
369-
recordSendRate := r.Get("record-send-rate").(metrics.Meter).Snapshot()
370-
requestLatency := r.Get("request-latency-in-ms").(metrics.Histogram).Snapshot()
437+
recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
438+
requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
371439
requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
372-
fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MB/sec), "+
440+
outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
441+
fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
373442
"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
374443
"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
375444
recordSendRate.Count(),
376445
recordSendRate.RateMean(),
377446
recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
447+
outgoingByteRate.RateMean()/1024/1024,
378448
requestLatency.Mean(),
379449
requestLatency.StdDev(),
380450
requestLatencyPercentiles[0],

0 commit comments

Comments
 (0)