Description
Hi there!
Kafka supports the Kerberos authentication protocol through the SASL GSSAPI mechanism. Currently, FastStream has no security class for GSSAPI.
For now, to make the broker work with Kafka servers that use Kerberos authentication, you have to update a protected attribute directly:
    broker._connection_kwargs.update(
        {
            "security_protocol": "SASL_SSL",
            "sasl_mechanism": "GSSAPI",
            "ssl_context": create_ssl_context(),
        },
    )
I suggest creating a SASLGSSAPI class in faststream/security.py:
    class SASLGSSAPI(BaseSecurity):
        """Security configuration for SASL/GSSAPI authentication.

        This class defines security configuration for SASL/GSSAPI authentication.
        """

        def __init__(
            self,
            ssl_context: Optional["SSLContext"] = None,
            use_ssl: Optional[bool] = None,
        ) -> None:
            super().__init__(
                ssl_context=ssl_context,
                use_ssl=use_ssl,
            )

        def get_requirement(self) -> List["AnyDict"]:
            """Get the security requirements for SASL/GSSAPI authentication."""
            return [{"gssapi": []}]

        def get_schema(self) -> Dict[str, Dict[str, str]]:
            """Get the security schema for SASL/GSSAPI authentication."""
            return {"gssapi": {}}
and adding a parser for it in kafka/security.py (with the same parser in confluent/security.py):
    def _parse_sasl_gssapi(security: SASLGSSAPI) -> "AnyDict":
        return {
            "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
            "ssl_context": security.ssl_context,
            "sasl_mechanism": "GSSAPI",
        }
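To make the intended behaviour of the parser concrete, here is a minimal, self-contained sketch that can be run without FastStream installed. `FakeSASLGSSAPI` is a hypothetical stand-in for the proposed class (it is not part of FastStream), and `parse_sasl_gssapi` only mirrors the function suggested above; the real implementation may differ.

```python
import ssl
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FakeSASLGSSAPI:
    """Hypothetical stand-in for the proposed SASLGSSAPI class (assumption)."""

    ssl_context: Optional[ssl.SSLContext] = None
    use_ssl: bool = False


def parse_sasl_gssapi(security: FakeSASLGSSAPI) -> Dict[str, Any]:
    """Mirror of the proposed _parse_sasl_gssapi, for illustration only."""
    return {
        # TLS-wrapped SASL when use_ssl is set, plain SASL otherwise.
        "security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
        "ssl_context": security.ssl_context,
        "sasl_mechanism": "GSSAPI",
    }


# Without SSL the connection options fall back to SASL_PLAINTEXT.
plain = parse_sasl_gssapi(FakeSASLGSSAPI())

# With an SSL context and use_ssl=True they switch to SASL_SSL.
secure = parse_sasl_gssapi(
    FakeSASLGSSAPI(ssl_context=ssl.create_default_context(), use_ssl=True)
)
```

In both cases the mechanism stays `GSSAPI`; only the transport part of `security_protocol` changes with `use_ssl`.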
This would let you specify the authentication method as follows, without touching protected attributes:
    broker = KafkaBroker(
        bootstrap_servers=[
            "example.ru:9092",
        ],
        sasl_kerberos_service_name="example_service_name",
        security=SASLGSSAPI(ssl_context=create_ssl_context(), use_ssl=True),
    )
I will also need to fix some shortcomings in how the SASL mechanism is specified, for example these.
I'm ready to implement this feature.