Skip to content

Commit c127e9b

Browse files
committed
Add kerberos support
Signed-off-by: Ruben Vargas <[email protected]>
1 parent ea9ab1c commit c127e9b

File tree

3 files changed

+158
-1
lines changed

3 files changed

+158
-1
lines changed

broker.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"sync"
1313
"sync/atomic"
1414
"time"
15-
1615
metrics "github.com/rcrowley/go-metrics"
1716
)
1817

@@ -61,6 +60,7 @@ const (
6160
SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
6261
// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
6362
SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
63+
SASLTypeGSSAPI = "GSSAPI"
6464
// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
6565
// server negotiate SASL auth using opaque packets.
6666
SASLHandshakeV0 = int16(0)
@@ -844,11 +844,17 @@ func (b *Broker) authenticateViaSASL() error {
844844
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
845845
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
846846
return b.sendAndReceiveSASLSCRAMv1()
847+
case SASLTypeGSSAPI:
848+
return b.sendAndReceiveKerberos()
847849
default:
848850
return b.sendAndReceiveSASLPlainAuth()
849851
}
850852
}
851853

854+
func (b *Broker) sendAndReceiveKerberos() error{
855+
return NewGSSAPIKerberosAuthenticator(&b.conf.Net.SASL.GSSAPI).Authorize(b)
856+
}
857+
852858
func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
853859
rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}
854860

config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type Config struct {
7575
// AccessTokenProvider interface docs for proper implementation
7676
// guidelines.
7777
TokenProvider AccessTokenProvider
78+
79+
GSSAPI GSSAPIConfig
7880
}
7981

8082
// KeepAlive specifies the keep-alive period for an active network connection.
@@ -520,6 +522,8 @@ func (c *Config) Validate() error {
520522
if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
521523
return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
522524
}
525+
case SASLTypeGSSAPI:
526+
break
523527
default:
524528
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`",
525529
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512)

gssapi_kerberos.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package sarama
2+
3+
import (
4+
"encoding/binary"
5+
"encoding/hex"
6+
"github.com/gokrb5/iana/keyusage"
7+
"github.com/jcmturner/gofork/encoding/asn1"
8+
"gopkg.in/jcmturner/gokrb5.v7/asn1tools"
9+
krbclient "gopkg.in/jcmturner/gokrb5.v7/client"
10+
krbconfig "gopkg.in/jcmturner/gokrb5.v7/config"
11+
"gopkg.in/jcmturner/gokrb5.v7/gssapi"
12+
"gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype"
13+
"gopkg.in/jcmturner/gokrb5.v7/keytab"
14+
"gopkg.in/jcmturner/gokrb5.v7/messages"
15+
"gopkg.in/jcmturner/gokrb5.v7/types"
16+
"io"
17+
)
18+
19+
const (
20+
TOK_ID_KRB_AP_REQ = "0100"
21+
GSS_API_GENERIC_TAG = 0x60
22+
)
23+
24+
type GSSAPIConfig struct {
25+
KeyTabPath string
26+
KerberosConfigPath string
27+
SPN string
28+
Username string
29+
Realm string
30+
}
31+
32+
type GSSAPIKerberosAuth struct {
33+
config *GSSAPIConfig
34+
client *krbclient.Client
35+
ticket messages.Ticket
36+
encryptionKey types.EncryptionKey
37+
}
38+
39+
func NewGSSAPIKerberosAuthenticator(config *GSSAPIConfig) *GSSAPIKerberosAuth {
40+
return &GSSAPIKerberosAuth{
41+
config: config,
42+
}
43+
}
44+
45+
func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) {
46+
length := len(payload)
47+
finalPackage := make([]byte, length+4) //4 byte length header + payload
48+
copy(finalPackage[4:], payload)
49+
binary.BigEndian.PutUint32(finalPackage, uint32(length))
50+
bytes, err := broker.conn.Write(finalPackage)
51+
broker.updateOutgoingCommunicationMetrics(bytes)
52+
if err != nil {
53+
Logger.Printf("Failed to send GSS-API Kerberos token: %s\n", err.Error())
54+
}
55+
}
56+
57+
func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) []byte {
58+
lengthInBytes := make([]byte, 4)
59+
io.ReadFull(broker.conn, lengthInBytes)
60+
payloadLength := binary.BigEndian.Uint32(lengthInBytes)
61+
payloadBytes := make([]byte, payloadLength) // buffer for read..
62+
io.ReadFull(broker.conn, payloadBytes) // read bytes
63+
return payloadBytes
64+
}
65+
66+
func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum(flags []int) []byte {
67+
a := make([]byte, 24)
68+
binary.LittleEndian.PutUint32(a[:4], 16)
69+
for _, i := range flags {
70+
if i == gssapi.ContextFlagDeleg {
71+
x := make([]byte, 28-len(a))
72+
a = append(a, x...)
73+
}
74+
f := binary.LittleEndian.Uint32(a[20:24])
75+
f |= uint32(i)
76+
binary.LittleEndian.PutUint32(a[20:24], f)
77+
}
78+
return a
79+
}
80+
81+
func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(client *krbclient.Client, ticket messages.Ticket, sessionKey types.EncryptionKey) []byte {
82+
var GSSAPIFlags = []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf}
83+
auth, _ := types.NewAuthenticator(client.Credentials.Domain(), client.Credentials.CName())
84+
auth.Cksum = types.Checksum{
85+
CksumType: chksumtype.GSSAPI,
86+
Checksum: krbAuth.newAuthenticatorChecksum(GSSAPIFlags),
87+
}
88+
APReq, _ := messages.NewAPReq(
89+
ticket,
90+
sessionKey,
91+
auth,
92+
)
93+
aprBytes, _ := hex.DecodeString(TOK_ID_KRB_AP_REQ)
94+
tb, _ := APReq.Marshal()
95+
aprBytes = append(aprBytes, tb...)
96+
return aprBytes
97+
}
98+
99+
func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) []byte {
100+
oidBytes, _ := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5))
101+
tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload))
102+
GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...)
103+
GSSHeader = append(GSSHeader, oidBytes...)
104+
GSSPackage := append(GSSHeader, payload...)
105+
return GSSPackage
106+
}
107+
108+
func (krbAuth *GSSAPIKerberosAuth) createKerberosClient() error {
109+
kt, _ := keytab.Load(krbAuth.config.KeyTabPath)
110+
cfg, _ := krbconfig.Load(krbAuth.config.KerberosConfigPath)
111+
krbAuth.client = krbclient.NewClientWithKeytab(krbAuth.config.Username, krbAuth.config.Realm, kt, cfg)
112+
return nil
113+
}
114+
115+
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error {
116+
117+
krbAuth.createKerberosClient()
118+
ticket, encryptionKey, err := krbAuth.client.GetServiceTicket(krbAuth.config.SPN)
119+
if err != nil {
120+
print(err)
121+
}
122+
krbAuth.ticket = ticket
123+
krbAuth.encryptionKey = encryptionKey
124+
aprBytes := krbAuth.createKrb5Token(krbAuth.client, krbAuth.ticket, krbAuth.encryptionKey)
125+
GSSPackage := krbAuth.appendGSSAPIHeader(aprBytes)
126+
krbAuth.writePackage(broker, GSSPackage)
127+
wrapBytes := krbAuth.readPackage(broker)
128+
wrapTokenReq := gssapi.WrapToken{}
129+
err = wrapTokenReq.Unmarshal(wrapBytes, true)
130+
if err != nil {
131+
Logger.Printf("Error while performing Kerberos Authentication: %s\n", err)
132+
return err
133+
}
134+
135+
// Validate response.
136+
isValid, _ := wrapTokenReq.Verify(krbAuth.encryptionKey, keyusage.GSSAPI_ACCEPTOR_SEAL)
137+
if !isValid {
138+
Logger.Printf("Invalid key")
139+
140+
}
141+
142+
// Is valid response, reply..
143+
wrapTokenResponse, _ := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encryptionKey)
144+
wrapResponseBytes, _ := wrapTokenResponse.Marshal()
145+
krbAuth.writePackage(broker, wrapResponseBytes)
146+
return nil
147+
}

0 commit comments

Comments
 (0)