Skip to content

Commit c53ebd0

Browse files
authored
Merge pull request #1303 from iyedbennour/master
Add SCRAM authentication example.
2 parents 99ce937 + 23982ed commit c53ebd0

File tree

8 files changed

+177
-1
lines changed

8 files changed

+177
-1
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ install_errcheck:
2929
go get github.com/kisielk/errcheck
3030

3131
get:
32-
go get -t
32+
go get -t -v ./...

examples/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ In these examples, we use `github.com/Shopify/sarama` as import path. We do this
77
#### HTTP server
88

99
[http_server](./http_server) is a simple HTTP server uses both the sync producer to produce data as part of the request handling cycle, as well as the async producer to maintain an access log. It also uses the [mocks subpackage](https://godoc.org/github.com/Shopify/sarama/mocks) to test both.
10+
11+
#### SASL SCRAM Authentication
12+
[sasl_scram_authentication](./sasl_scram_authentication) is an example of how to authenticate to a Kafka cluster using SASL SCRAM-SHA-256 or SCRAM-SHA-512 mechanisms.

examples/sasl_scram_client/.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
sasl_scram_client
2+

examples/sasl_scram_client/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Example commande line:
2+
3+
```./sasl_scram_client -brokers localhost:9094 -username foo -passwd a_password -topic topic_name -tls -algorithm [sha256|sha512]```
4+

examples/sasl_scram_client/main.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package main
2+
3+
import (
4+
"crypto/tls"
5+
"crypto/x509"
6+
"flag"
7+
"io/ioutil"
8+
"log"
9+
"os"
10+
"strings"
11+
12+
"github.com/Shopify/sarama"
13+
)
14+
15+
func init() {
16+
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
17+
}
18+
19+
var (
20+
brokers = flag.String("brokers", os.Getenv("KAFKA_PEERS"), "The Kafka brokers to connect to, as a comma separated list")
21+
userName = flag.String("username", "", "The SASL username")
22+
passwd = flag.String("passwd", "", "The SASL password")
23+
algorithm = flag.String("algorithm", "", "The SASL SCRAM SHA algorithm sha256 or sha512 as mechanism")
24+
topic = flag.String("topic", "default_topic", "The Kafka topic to use")
25+
certFile = flag.String("certificate", "", "The optional certificate file for client authentication")
26+
keyFile = flag.String("key", "", "The optional key file for client authentication")
27+
caFile = flag.String("ca", "", "The optional certificate authority file for TLS client authentication")
28+
verifySSL = flag.Bool("verify", false, "Optional verify ssl certificates chain")
29+
useTLS = flag.Bool("tls", false, "Use TLS to communicate with the cluster")
30+
31+
logger = log.New(os.Stdout, "[Producer] ", log.LstdFlags)
32+
)
33+
34+
func createTLSConfiguration() (t *tls.Config) {
35+
t = &tls.Config{
36+
InsecureSkipVerify: *verifySSL,
37+
}
38+
if *certFile != "" && *keyFile != "" && *caFile != "" {
39+
cert, err := tls.LoadX509KeyPair(*certFile, *keyFile)
40+
if err != nil {
41+
log.Fatal(err)
42+
}
43+
44+
caCert, err := ioutil.ReadFile(*caFile)
45+
if err != nil {
46+
log.Fatal(err)
47+
}
48+
49+
caCertPool := x509.NewCertPool()
50+
caCertPool.AppendCertsFromPEM(caCert)
51+
52+
t = &tls.Config{
53+
Certificates: []tls.Certificate{cert},
54+
RootCAs: caCertPool,
55+
InsecureSkipVerify: *verifySSL,
56+
}
57+
}
58+
return t
59+
}
60+
61+
func main() {
62+
flag.Parse()
63+
64+
if *brokers == "" {
65+
log.Fatalln("at least one brocker is required")
66+
}
67+
68+
if *userName == "" {
69+
log.Fatalln("SASL username is required")
70+
}
71+
72+
if *passwd == "" {
73+
log.Fatalln("SASL password is required")
74+
}
75+
76+
conf := sarama.NewConfig()
77+
conf.Producer.Retry.Max = 1
78+
conf.Producer.RequiredAcks = sarama.WaitForAll
79+
conf.Producer.Return.Successes = true
80+
conf.Metadata.Full = true
81+
conf.Version = sarama.V0_10_0_0
82+
conf.ClientID = "sasl_scram_client"
83+
conf.Metadata.Full = true
84+
conf.Net.SASL.Enable = true
85+
conf.Net.SASL.User = *userName
86+
conf.Net.SASL.Password = *passwd
87+
conf.Net.SASL.Handshake = true
88+
if *algorithm == "sha512" {
89+
conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA512}
90+
conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
91+
} else if *algorithm == "sha256" {
92+
conf.Net.SASL.SCRAMClient = &XDGSCRAMClient{HashGeneratorFcn: SHA256}
93+
conf.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
94+
95+
} else {
96+
log.Fatalf("invalid SHA algorithm \"%s\": can be either \"sha256\" or \"sha512\"", *algorithm)
97+
}
98+
99+
if *useTLS {
100+
conf.Net.TLS.Enable = true
101+
conf.Net.TLS.Config = createTLSConfiguration()
102+
}
103+
104+
syncProcuder, err := sarama.NewSyncProducer(strings.Split(*brokers, ","), conf)
105+
if err != nil {
106+
logger.Fatalln("failed to create producer: ", err)
107+
}
108+
partition, offset, err := syncProcuder.SendMessage(&sarama.ProducerMessage{
109+
Topic: *topic,
110+
Value: sarama.StringEncoder("test_message"),
111+
})
112+
if err != nil {
113+
logger.Fatalln("failed to send message to ", *topic, err)
114+
}
115+
logger.Printf("wrote message at partition: %d, offset: %d", partition, offset)
116+
_ = syncProcuder.Close()
117+
logger.Println("Bye now !")
118+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package main
2+
3+
import (
4+
"crypto/sha256"
5+
"crypto/sha512"
6+
"hash"
7+
8+
"github.com/xdg/scram"
9+
)
10+
11+
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
12+
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
13+
14+
type XDGSCRAMClient struct {
15+
*scram.Client
16+
*scram.ClientConversation
17+
scram.HashGeneratorFcn
18+
}
19+
20+
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
21+
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
22+
if err != nil {
23+
return err
24+
}
25+
x.ClientConversation = x.Client.NewConversation()
26+
return nil
27+
}
28+
29+
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
30+
response, err = x.ClientConversation.Step(challenge)
31+
return
32+
}
33+
34+
func (x *XDGSCRAMClient) Done() bool {
35+
return x.ClientConversation.Done()
36+
}

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@ require (
1010
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
1111
github.com/pierrec/lz4 v2.0.5+incompatible
1212
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
13+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
14+
github.com/xdg/stringprep v1.0.0 // indirect
15+
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 // indirect
16+
golang.org/x/text v0.3.0 // indirect
1317
)

go.sum

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,12 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
1616
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
1717
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
1818
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
19+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
20+
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
21+
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
22+
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
23+
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
24+
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
25+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
26+
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
27+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

0 commit comments

Comments
 (0)