Skip to content

Add kerberos support #1366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"encoding/binary"
"fmt"
metrics "github.com/rcrowley/go-metrics"
"io"
"net"
"sort"
Expand All @@ -12,8 +13,6 @@ import (
"sync"
"sync/atomic"
"time"

metrics "github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand Down Expand Up @@ -47,6 +46,8 @@ type Broker struct {
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram

kerberosAuthenticator GSSAPIKerberosAuth
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand All @@ -61,6 +62,7 @@ const (
SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
SASLTypeGSSAPI = "GSSAPI"
// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
// server negotiate SASL auth using opaque packets.
SASLHandshakeV0 = int16(0)
Expand Down Expand Up @@ -844,11 +846,21 @@ func (b *Broker) authenticateViaSASL() error {
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
return b.sendAndReceiveSASLSCRAMv1()
case SASLTypeGSSAPI:
return b.sendAndReceiveKerberos()
default:
return b.sendAndReceiveSASLPlainAuth()
}
}

func (b *Broker) sendAndReceiveKerberos() error {
b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
}
return b.kerberosAuthenticator.Authorize(b)
}

func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}

Expand Down
123 changes: 122 additions & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package sarama
import (
"errors"
"fmt"
"gopkg.in/jcmturner/gokrb5.v7/krberror"
"net"
"reflect"
"testing"
"time"

metrics "github.com/rcrowley/go-metrics"
"github.com/rcrowley/go-metrics"
)

func ExampleBroker() {
Expand Down Expand Up @@ -477,6 +478,126 @@ func TestSASLPlainAuth(t *testing.T) {
}
}

func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {

testTable := []struct {
name string
error error
mockKerberosClient bool
errorStage string
badResponse bool
badKeyChecksum bool
}{
{
name: "Kerberos authentication success",
error: nil,
mockKerberosClient: true,
},
{
name: "Kerberos login fails",
error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
"cation information was invalid - PREAUTH_FAILED"),
mockKerberosClient: true,
errorStage: "login",
},
{
name: "Kerberos service ticket fails",
error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
"cation information was invalid - PREAUTH_FAILED"),
mockKerberosClient: true,
errorStage: "service_ticket",
},
{
name: "Kerberos client creation fails",
error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
},
{
name: "Bad server response, unmarshall key error",
error: errors.New("bytes shorter than header length"),
badResponse: true,
mockKerberosClient: true,
},
{
name: "Bad token checksum",
error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
badResponse: false,
badKeyChecksum: true,
mockKerberosClient: true,
},
}
for i, test := range testTable {
mockBroker := NewMockBroker(t, 0)
// broker executes SASL requests against mockBroker

mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
return nil
})
broker := NewBroker(mockBroker.Addr())
broker.requestRate = metrics.NilMeter{}
broker.outgoingByteRate = metrics.NilMeter{}
broker.incomingByteRate = metrics.NilMeter{}
broker.requestSize = metrics.NilHistogram{}
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
conf := NewConfig()
conf.Net.SASL.Mechanism = SASLTypeGSSAPI
conf.Net.SASL.GSSAPI.ServiceName = "kafka"
conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
conf.Net.SASL.GSSAPI.Username = "kafka"
conf.Net.SASL.GSSAPI.Password = "kafka"
conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
broker.conf = conf
broker.conf.Version = V1_0_0_0
dialer := net.Dialer{
Timeout: conf.Net.DialTimeout,
KeepAlive: conf.Net.KeepAlive,
LocalAddr: conf.Net.LocalAddr,
}

conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())

if err != nil {
t.Fatal(err)
}

gssapiHandler := KafkaGSSAPIHandler{
client: &MockKerberosClient{},
badResponse: test.badResponse,
badKeyChecksum: test.badKeyChecksum,
}
mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
broker.conn = conn
if test.mockKerberosClient {
broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
return &MockKerberosClient{
mockError: test.error,
errorStage: test.errorStage,
}, nil
}
} else {
broker.kerberosAuthenticator.NewKerberosClientFunc = nil
}

err = broker.authenticateViaSASL()

if err != nil && test.error != nil {
if test.error.Error() != err.Error() {
t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
}
} else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
}

mockBroker.Close()
}

}

func TestBuildClientInitialResponse(t *testing.T) {

testTable := []struct {
Expand Down
33 changes: 31 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Config struct {
// AccessTokenProvider interface docs for proper implementation
// guidelines.
TokenProvider AccessTokenProvider

GSSAPI GSSAPIConfig
}

// KeepAlive specifies the keep-alive period for an active network connection.
Expand Down Expand Up @@ -527,9 +529,36 @@ func (c *Config) Validate() error {
if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
}
case SASLTypeGSSAPI:
if c.Net.SASL.GSSAPI.ServiceName == "" {
return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
}

if c.Net.SASL.GSSAPI.AuthType == KRB5_USER_AUTH {
if c.Net.SASL.GSSAPI.Password == "" {
return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
}
} else if c.Net.SASL.GSSAPI.AuthType == KRB5_KEYTAB_AUTH {
if c.Net.SASL.GSSAPI.KeyTabPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
}
} else {
return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH")
}
if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Username == "" {
return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Realm == "" {
return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
}
default:
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`",
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512)
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
return ConfigurationError(msg)
}
}
Expand Down
82 changes: 81 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestNetConfigValidates(t *testing.T) {
cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
},
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"},
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
{"SASL.Mechanism.OAUTHBEARER - Missing token provider",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
Expand All @@ -117,6 +117,86 @@ func TestNetConfigValidates(t *testing.T) {
cfg.Net.SASL.Password = "stong_password"
},
"A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
},
"Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"

},
"Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
}

for i, test := range tests {
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/stretchr/testify v1.3.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.2.3
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
Expand All @@ -37,3 +41,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
Loading