Skip to content

Commit 1d89026

Browse files
committed
Add kerberos support
Signed-off-by: Ruben Vargas <[email protected]>
1 parent ea9ab1c commit 1d89026

File tree

6 files changed

+282
-5
lines changed

6 files changed

+282
-5
lines changed

broker.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"crypto/tls"
55
"encoding/binary"
66
"fmt"
7+
metrics "github.com/rcrowley/go-metrics"
78
"io"
89
"net"
910
"sort"
@@ -12,8 +13,6 @@ import (
1213
"sync"
1314
"sync/atomic"
1415
"time"
15-
16-
metrics "github.com/rcrowley/go-metrics"
1716
)
1817

1918
// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
@@ -61,6 +60,7 @@ const (
6160
SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
6261
// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
6362
SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
63+
SASLTypeGSSAPI = "GSSAPI"
6464
// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
6565
// server negotiate SASL auth using opaque packets.
6666
SASLHandshakeV0 = int16(0)
@@ -844,11 +844,17 @@ func (b *Broker) authenticateViaSASL() error {
844844
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
845845
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
846846
return b.sendAndReceiveSASLSCRAMv1()
847+
case SASLTypeGSSAPI:
848+
return b.sendAndReceiveKerberos()
847849
default:
848850
return b.sendAndReceiveSASLPlainAuth()
849851
}
850852
}
851853

854+
func (b *Broker) sendAndReceiveKerberos() error {
855+
return NewGSSAPIKerberosAuthenticator(&b.conf.Net.SASL.GSSAPI).Authorize(b)
856+
}
857+
852858
func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
853859
rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
854860

config.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type Config struct {
7575
// AccessTokenProvider interface docs for proper implementation
7676
// guidelines.
7777
TokenProvider AccessTokenProvider
78+
79+
GSSAPI GSSAPIConfig
7880
}
7981

8082
// KeepAlive specifies the keep-alive period for an active network connection.
@@ -520,9 +522,25 @@ func (c *Config) Validate() error {
520522
if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
521523
return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
522524
}
525+
case SASLTypeGSSAPI:
526+
if c.Net.SASL.GSSAPI.ServiceName == "" {
527+
return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
528+
}
529+
if c.Net.SASL.GSSAPI.KeyTabPath == "" {
530+
return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used")
531+
}
532+
if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
533+
return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
534+
}
535+
if c.Net.SASL.GSSAPI.Username == "" {
536+
return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
537+
}
538+
if c.Net.SASL.GSSAPI.Realm == "" {
539+
return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
540+
}
523541
default:
524-
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`",
525-
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512)
542+
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
543+
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
526544
return ConfigurationError(msg)
527545
}
528546
}

config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func TestNetConfigValidates(t *testing.T) {
9191
cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
9292
cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
9393
},
94-
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"},
94+
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
9595
{"SASL.Mechanism.OAUTHBEARER - Missing token provider",
9696
func(cfg *Config) {
9797
cfg.Net.SASL.Enable = true

go.mod

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,17 @@ require (
88
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
99
github.com/eapache/queue v1.1.0
1010
github.com/golang/snappy v0.0.1 // indirect
11+
github.com/hashicorp/go-uuid v1.0.1 // indirect
12+
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
1113
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41
1214
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
1315
github.com/stretchr/testify v1.3.0
1416
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
1517
github.com/xdg/stringprep v1.0.0 // indirect
1618
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
1719
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
20+
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
21+
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
22+
gopkg.in/jcmturner/gokrb5.v7 v7.2.3
23+
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
1824
)

go.sum

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
1313
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
1414
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
1515
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
16+
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
17+
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
18+
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
19+
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
1620
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
1721
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
1822
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
@@ -37,3 +41,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
3741
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
3842
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
3943
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
44+
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
45+
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
46+
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
47+
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
48+
gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
49+
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
50+
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
51+
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=

gssapi_kerberos.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
package sarama
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/hex"
6+
"fmt"
7+
"github.com/jcmturner/gofork/encoding/asn1"
8+
"gopkg.in/jcmturner/gokrb5.v7/asn1tools"
9+
krbclient "gopkg.in/jcmturner/gokrb5.v7/client"
10+
krbconfig "gopkg.in/jcmturner/gokrb5.v7/config"
11+
"gopkg.in/jcmturner/gokrb5.v7/gssapi"
12+
"gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
13+
"gopkg.in/jcmturner/gokrb5.v7/iana/keyusage"
14+
"gopkg.in/jcmturner/gokrb5.v7/keytab"
15+
"gopkg.in/jcmturner/gokrb5.v7/messages"
16+
"gopkg.in/jcmturner/gokrb5.v7/types"
17+
"io"
18+
"strings"
19+
)
20+
21+
const (
22+
TOK_ID_KRB_AP_REQ = "0100"
23+
GSS_API_GENERIC_TAG = 0x60
24+
)
25+
26+
type GSSAPIConfig struct {
27+
KeyTabPath string
28+
KerberosConfigPath string
29+
ServiceName string
30+
Username string
31+
Realm string
32+
}
33+
34+
type GSSAPIKerberosAuth struct {
35+
config *GSSAPIConfig
36+
client *krbclient.Client
37+
ticket messages.Ticket
38+
encryptionKey types.EncryptionKey
39+
}
40+
41+
/*
42+
*
43+
* Appends length in big endian before payload, and send it to kafka
44+
*
45+
*/
46+
47+
func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) error {
48+
length := len(payload)
49+
finalPackage := make([]byte, length+4) //4 byte length header + payload
50+
copy(finalPackage[4:], payload)
51+
binary.BigEndian.PutUint32(finalPackage, uint32(length))
52+
bytes, err := broker.conn.Write(finalPackage)
53+
broker.updateOutgoingCommunicationMetrics(bytes)
54+
if err != nil {
55+
return err
56+
}
57+
return nil
58+
}
59+
60+
/*
61+
*
62+
* Read length (4 bytes) and then read the payload
63+
*
64+
*/
65+
66+
func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, error) {
67+
lengthInBytes := make([]byte, 4)
68+
_, err := io.ReadFull(broker.conn, lengthInBytes)
69+
if err != nil {
70+
return nil, err
71+
}
72+
payloadLength := binary.BigEndian.Uint32(lengthInBytes)
73+
payloadBytes := make([]byte, payloadLength) // buffer for read..
74+
_, err = io.ReadFull(broker.conn, payloadBytes) // read bytes
75+
if err != nil {
76+
return payloadBytes, err
77+
}
78+
return payloadBytes, nil
79+
}
80+
81+
func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum(flags []int) []byte {
82+
a := make([]byte, 24)
83+
binary.LittleEndian.PutUint32(a[:4], 16)
84+
for _, i := range flags {
85+
if i == gssapi.ContextFlagDeleg {
86+
x := make([]byte, 28-len(a))
87+
a = append(a, x...)
88+
}
89+
f := binary.LittleEndian.Uint32(a[20:24])
90+
f |= uint32(i)
91+
binary.LittleEndian.PutUint32(a[20:24], f)
92+
}
93+
return a
94+
}
95+
96+
/*
97+
*
98+
* Construct Kerberos AP_REQ package, conforming to RFC-4120
99+
* https://tools.ietf.org/html/rfc4120#page-84
100+
*
101+
*/
102+
func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(client *krbclient.Client, ticket messages.Ticket, sessionKey types.EncryptionKey) []byte {
103+
var GSSAPIFlags = []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
104+
auth, _ := types.NewAuthenticator(client.Credentials.Domain(), client.Credentials.CName())
105+
auth.Cksum = types.Checksum{
106+
CksumType: chksumtype.GSSAPI,
107+
Checksum: krbAuth.newAuthenticatorChecksum(GSSAPIFlags),
108+
}
109+
APReq, _ := messages.NewAPReq(
110+
ticket,
111+
sessionKey,
112+
auth,
113+
)
114+
aprBytes, _ := hex.DecodeString(TOK_ID_KRB_AP_REQ)
115+
tb, _ := APReq.Marshal()
116+
aprBytes = append(aprBytes, tb...)
117+
return aprBytes
118+
}
119+
120+
/*
121+
*
122+
* Append the GSS-API header to the payload, conforming to RFC-2743
123+
* Section 3.1, Mechanism-Independent Token Format
124+
*
125+
* https://tools.ietf.org/html/rfc2743#page-81
126+
*
127+
* GSSAPIHeader + <specific mechanism payload>
128+
*
129+
*/
130+
func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) []byte {
131+
oidBytes, _ := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
132+
tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
133+
GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
134+
GSSHeader = append(GSSHeader, oidBytes...)
135+
GSSPackage := append(GSSHeader, payload...)
136+
return GSSPackage
137+
}
138+
139+
/*
140+
*
141+
* Create kerberos client used to obtain TGT and TGS tokens
142+
* used gokrb5 library, which is a pure go kerberos client with
143+
* some GSS-API capabilities, and SPNEGO support. Kafka does not use SPNEGO
144+
* it uses pure Kerberos 5 solution (RFC-4121 and RFC-4120).
145+
*
146+
*/
147+
func (krbAuth *GSSAPIKerberosAuth) createKerberosClient() error {
148+
kt, err := keytab.Load(krbAuth.config.KeyTabPath)
149+
if err != nil {
150+
return err
151+
}
152+
cfg, err := krbconfig.Load(krbAuth.config.KerberosConfigPath)
153+
if err != nil {
154+
return err
155+
}
156+
krbAuth.client = krbclient.NewClientWithKeytab(krbAuth.config.Username, krbAuth.config.Realm, kt, cfg)
157+
return nil
158+
}
159+
160+
/* This does the handshake for authorization */
161+
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
162+
163+
err := krbAuth.createKerberosClient()
164+
if err != nil {
165+
Logger.Printf("Kerberos client error: %s", err)
166+
}
167+
// Construct SPN using serviceName and host
168+
// SPN format: <SERVICE>/<FQDN>
169+
170+
host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part
171+
spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host)
172+
173+
ticket, encryptionKey, err := krbAuth.client.GetServiceTicket(spn)
174+
if err != nil {
175+
Logger.Printf("Error getting Kerberos service ticket : %s", err)
176+
return err
177+
}
178+
179+
krbAuth.ticket = ticket
180+
krbAuth.encryptionKey = encryptionKey
181+
182+
aprBytes := krbAuth.createKrb5Token(krbAuth.client, krbAuth.ticket, krbAuth.encryptionKey)
183+
GSSPackage := krbAuth.appendGSSAPIHeader(aprBytes)
184+
err = krbAuth.writePackage(broker, GSSPackage)
185+
if err != nil {
186+
Logger.Printf("Error while performing GSS-API Kerberos Authentication: %s\n", err)
187+
return err
188+
}
189+
190+
wrapBytes, err := krbAuth.readPackage(broker)
191+
if err != nil {
192+
Logger.Printf("Error while performing GSS-API Kerberos Authentication: %s\n", err)
193+
return err
194+
}
195+
196+
wrapTokenReq := gssapi.WrapToken{}
197+
err = wrapTokenReq.Unmarshal(wrapBytes, true)
198+
if err != nil {
199+
Logger.Printf("Error while performing GSS-API Kerberos Authentication: %s\n", err)
200+
return err
201+
}
202+
203+
// Validate response.
204+
isValid, err := wrapTokenReq.Verify(krbAuth.encryptionKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
205+
if !isValid {
206+
Logger.Printf("Error while performing GSS-API-Kerberos Authentication: %s", err)
207+
return err
208+
}
209+
210+
// Reply to server
211+
wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encryptionKey)
212+
if err != nil {
213+
Logger.Printf("Error while performing GSS-API-Kerberos Authentication: %s", err)
214+
return err
215+
}
216+
wrapResponseBytes, err := wrapTokenResponse.Marshal()
217+
if err != nil {
218+
Logger.Printf("Error while performing GSS-API-Kerberos Authentication: %s", err)
219+
return err
220+
}
221+
222+
err = krbAuth.writePackage(broker, wrapResponseBytes)
223+
if err != nil {
224+
Logger.Printf("Error while performing GSS-API Kerberos Authentication: %s\n", err)
225+
return err
226+
}
227+
// If we reach this, we were already authenticated to kafka broker using kerberos.
228+
return nil
229+
}
230+
231+
func NewGSSAPIKerberosAuthenticator(config *GSSAPIConfig) *GSSAPIKerberosAuth {
232+
return &GSSAPIKerberosAuth{
233+
config: config,
234+
}
235+
}

0 commit comments

Comments
 (0)