-
Notifications
You must be signed in to change notification settings - Fork 4
First commit of the IdP Metadata Attribute Store #8
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
module: idp_metadata_attribute_store.IdpMetadataAttributeStore | ||
name: IdpMetadataAttributeStore | ||
config: | ||
default: | ||
display_name: | ||
# SATOSA internal attribute name to use | ||
internal_attribute_name: idpdisplayname | ||
# Language preference. 'en' or English is the default | ||
# if not specified. | ||
lang: en | ||
entity_id: | ||
internal_attribute_name: idpentityid | ||
organization_name: | ||
internal_attribute_name: idporgname | ||
organization_display_name: | ||
internal_attribute_name: idporgdisplayname | ||
|
||
# Configuration may also be done per-IdP with any | ||
# missing parameters taken from the default if any. | ||
# The configuration key is the entityID of the IdP. | ||
# | ||
# For example: | ||
https://idp.myorg.edu/idp/shibboleth: | ||
display_name: | ||
internal_attribute_name: othername | ||
lang: jp | ||
# The microservice may be configured to ignore a particular IdP. | ||
https://login.other.org.edu/idp/shibboleth: | ||
ignore: true | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,239 @@ | ||
""" | ||
SATOSA microservice that includes in the assertion | ||
attributes taken from SAML metadata about the SAML | ||
IdP used for authentication. | ||
|
||
The attributes that may be asserted from the SAML | ||
metadata for the IdP include | ||
|
||
<mdui:DisplayName> | ||
<OrganiationName> | ||
<OrganizationDisplayName> | ||
|
||
A typical configuration would be | ||
|
||
module: idp_metadata_attribute_store.IdpMetadataAttributeStore | ||
name: IdpMetadataAttributeStore | ||
config: | ||
default: | ||
display_name: | ||
# SATOSA internal attribute name to use | ||
internal_attribute_name: idpdisplayname | ||
# Language preference with 'en' or English as default | ||
lang: en | ||
entity_id: | ||
internal_attribute_name: idpentityid | ||
organization_name: | ||
internal_attribute_name: idporgname | ||
lang: en | ||
organization_display_name: | ||
internal_attribute_name: idporgdisplayname | ||
lang: en | ||
|
||
# Configuration may also be done per-IdP with any | ||
# missing parameters taken from the default if any. | ||
# The configuration key is the entityID of the IdP. | ||
# | ||
# For example: | ||
https://login.myorg.edu/idp/shibboleth: | ||
display_name: | ||
internal_attribute_name: othername | ||
lang: jp | ||
# The microservice may be configured to ignore a particular IdP. | ||
https://login.other.org.edu/idp/shibboleth: | ||
ignore: true | ||
""" | ||
|
||
import satosa.micro_services.base | ||
from satosa.logging_util import satosa_logging | ||
from satosa.exception import SATOSAError | ||
from satosa.context import Context | ||
|
||
import copy | ||
import logging | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
class IdpMetadataAttributeStoreError(SATOSAError): | ||
""" | ||
LDAP attribute store error | ||
""" | ||
pass | ||
|
||
class IdpMetadataAttributeStore(satosa.micro_services.base.ResponseMicroService): | ||
""" | ||
Use the metadata store attached to the proxy SP in the context | ||
to lookup metadata about the IdP entity making the assertion | ||
and include metadata details as attributes in the assertion sent | ||
to the SP that made the request. | ||
""" | ||
|
||
config_defaults = { 'ignore' : False } | ||
|
||
def __init__(self, config, *args, **kwargs): | ||
super().__init__(*args, **kwargs) | ||
|
||
if 'default' in config and "" in config: | ||
msg = """Use either 'default' or "" in config but not both""" | ||
satosa_logging(logger, logging.ERROR, msg, None) | ||
raise IdpMetadataAttributeStoreError(msg) | ||
|
||
if "" in config: | ||
config['default'] = config.pop("") | ||
|
||
if 'default' not in config: | ||
msg = "No default configuration is present" | ||
satosa_logging(logger, logging.ERROR, msg, None) | ||
raise IdpMetadataAttributeStoreError(msg) | ||
|
||
self.config = {} | ||
|
||
# Process the default configuration first then any per-IdP overrides. | ||
idp_list = ['default'] | ||
idp_list.extend([ key for key in config.keys() if key != 'default' ]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does 'default' have to be first? The following seems to be what you need. idp_list = config.keys()
id_default = idp_list.pop('default', {})
self.config_defaults.update(idp_default) |
||
|
||
for idp in idp_list: | ||
if not isinstance(config[idp], dict): | ||
msg = "Configuration value for {} must be a dictionary" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This format string is missing a parameter. I suppose you want: msg = "Configuration for {} must be a dictionary".format(idp) |
||
satosa_logging(logger, logging.ERROR, msg, None) | ||
raise IdpMetadataAttributeStoreError(msg) | ||
|
||
# Initialize configuration using module defaults then update | ||
# with configuration defaults and then per-IdP overrides. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can precompute this, as |
||
idp_config = copy.deepcopy(IdpMetadataAttributeStore.config_defaults) | ||
if 'default' in self.config: | ||
idp_config.update(self.config['default']) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This if 'default' in self.config: should always by But, if you follow my comments, you can replace these lines with: idp_config = copy.deepcopy(self.config_defaults)
idp_config.update(config[idp])
|
||
idp_config.update(config[idp]) | ||
|
||
self.config[idp] = idp_config | ||
|
||
satosa_logging(logger, logging.INFO, "IdP Metadata Attribute Store microservice initialized", None) | ||
|
||
def _first_lang_element_text(self, elements, lang='en'): | ||
""" | ||
Loop over the list representing XML elements that contain text and find | ||
the first text value for the input lang where 'en' or English is the | ||
default lang. | ||
|
||
Each item in the list is a dictionary with keys | ||
|
||
__class__ | ||
lang | ||
text | ||
|
||
as expected from the metadata returned for an entity by the MetadataStore | ||
class from pysaml2. | ||
|
||
If no element has the input lang then return the text from the first | ||
element. | ||
|
||
If no element has text then return an empty string. | ||
""" | ||
for e in elements: | ||
if lang in e: | ||
if 'text' in e: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is no reason for this to be nested, I would write it as if lang in e and 'text' in e: or (maybe) if all(k in e.keys() for k in [lang, 'text']): |
||
return e['text'] | ||
|
||
for e in elements: | ||
if 'text' in e: | ||
return e['text'] | ||
|
||
return '' | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK, now I see what're trying to do here, according to the comment. I guess that's why the previous loop was nested. first_text_lang = None
first_text = None
fallback = ''
for el in elements:
if 'text' in el.keys():
if first_text is None:
first_text = el['text']
if first_text_lang is None and lang in el.keys(): # should this be: lang is el.get('lang', None) ?
first_text_lang = el['text']
break
return first_text_lang or first_text or fallback PS: Note my comment there. |
||
|
||
def process(self, context, data): | ||
""" | ||
Default interface for microservices. Process the input data for | ||
the input context. | ||
""" | ||
self.context = context | ||
|
||
# Find the entityID for the IdP that issued the assertion. | ||
try: | ||
idp_entity_id = data.to_dict()['auth_info']['issuer'] | ||
except KeyError as err: | ||
satosa_logging(logger, logging.ERROR, "Unable to determine the entityID for the IdP issuer", context.state) | ||
return super().process(context, data) | ||
|
||
# Get the configuration for the IdP. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the comments above (allowing not having a default configuration), I would do the following: config = self.config.get(idp_entity_id) or self.config.get('default', {})
satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state) |
||
if idp_entity_id in self.config.keys(): | ||
config = self.config[idp_entity_id] | ||
else: | ||
config = self.config['default'] | ||
|
||
satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state) | ||
|
||
# Log the entityID of the authenticating IdP. | ||
satosa_logging(logger, logging.INFO, "entityID for authenticating IdP is {}".format(idp_entity_id), context.state) | ||
|
||
# Ignore this IdP if so configured. | ||
if config['ignore']: | ||
satosa_logging(logger, logging.INFO, "Ignoring IdP {}".format(idp_entity_id), context.state) | ||
return super().process(context, data) | ||
|
||
# Set the entityID attribute if so configured. | ||
if 'entity_id' in config: | ||
data.attributes[config['entity_id']['internal_attribute_name']] = idp_entity_id | ||
|
||
# Get the metadata store the SP for the proxy is using. This | ||
# will be an instance of the class MetadataStore from mdstore.py | ||
# in pysaml2. | ||
metadata_store = context.get_decoration(Context.KEY_BACKEND_METADATA_STORE) | ||
|
||
# Get the metadata for the IdP. | ||
try: | ||
metadata = metadata_store[idp_entity_id] | ||
except Exception as err: | ||
satosa_logging(logger, logging.ERROR, "Unable to retrieve metadata for IdP {}".format(idp_entity_id), context.state) | ||
return super().process(context, data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a recurring pattern: try:
something()
except:
return super().process(context, data) This can be handled better by changing class IdpMetadataAttributeStoreWarning(SATOSAError):
pass
def process(..):
try:
self._process(..)
except IdpMetadataAttributeStoreWarning as ex:
satosa_logging(logger, logging.WARN, str(ex), context.state)
return super().process(context, data)
def _process(..):
# all the code previously on process() but with modified parts as such:
try:
something()
except SomeError as err:
msg_warn = "something went wrong, but it's ok"
raise IdpMetadataAttributeStoreWarning(msg_warn) from err Errors of type |
||
|
||
satosa_logging(logger, logging.DEBUG, "Metadata for IdP {} is {}".format(idp_entity_id, metadata), context.state) | ||
|
||
# Find the mdui:DisplayName for the IdP if so configured. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would split the processing of the attributes into different classes. This may be a bit of over-engineering but it allows to extend the mechanism by only creating a new class with just the appropriate find handler and mapping it to the factory. It also makes code reusable. from abc import ABC
# abstract class acts like an interface
class GenericAttribute(ABC): # not sure about the name
@abstractmethod
def find(self, metadata, lang, *argv):
raise NotImplementedError
# organization_display_name and organization_name use the same mechanism for find()
class GenericName(GenericAttribute, ABC):
def find(self, metadata, lang, *argv):
org_display_name_elements = metadata['organization'][self.attr_name]
value = self._first_lang_element_text(org_display_name_elements, lang)
return value
class OrganizationDisplayName(GenericName):
attr_name = 'organization_display_name'
class OrganizationName(GenericName):
attr_name = 'organization_name'
class DisplayName(GenericAttribute):
attr_name = 'display_name'
def find(self, metadata, lang, *argv):
extensions = metadata['idpsso_descriptor'][0]['extensions']['extension_elements']
for e in extensions:
if e['__class__'] == 'urn:oasis:names:tc:SAML:metadata:ui&UIInfo':
display_name_elements = e[self.attr_name]
value = self._first_lang_element_text(display_name_elements, lang)
break
return value
class AttributeFactory:
factory = {
'organization_display_name': OrganizationDisplayName,
'organization_name': OrganizationName,
'display_name': DisplayName,
}
def create(attr_type):
try:
attr_class = factory[attr_type]
except KeyError as err:
msg_err = "No handler for attribute: {}".format(attr_type)
raise IdpMetadataAttributeStoreError(msg_err) from err
else:
return attr_class()
def process(..):
...
for attr in config.keys():
attribute_handler = AttributeFactory.create(attr)
value = attribute_handler.find(metadata, lang)
if value is not None:
data.attributes[config[attr]['internal_attribute_name']] = value
msg_dbg = "attribute {} is {}".format(attr, value)
satosa_logging(logger, logging.DEBUG, msg_dbg, context.state) |
||
if 'display_name' in config: | ||
lang = config['display_name'].get('lang', 'en') | ||
try: | ||
# We assume there is only one IDPSSODescriptor in the IdP metadata. | ||
extensions = metadata['idpsso_descriptor'][0]['extensions']['extension_elements'] | ||
for e in extensions: | ||
if e['__class__'] == 'urn:oasis:names:tc:SAML:metadata:ui&UIInfo': | ||
display_name_elements = e['display_name'] | ||
display_name = self._first_lang_element_text(display_name_elements, lang) | ||
break | ||
|
||
if display_name: | ||
satosa_logging(logger, logging.DEBUG, "display_name is {}".format(display_name), context.state) | ||
data.attributes[config['display_name']['internal_attribute_name']] = display_name | ||
|
||
except Exception as err: | ||
satosa_logging(logger, logging.WARN, "Unable to determine display name for {}".format(idp_entity_id), context.state) | ||
|
||
# Find the OrganizationDisplayName for the IdP if so configured. | ||
if 'organization_display_name' in config: | ||
lang = config['organization_display_name'].get('lang', 'en') | ||
try: | ||
org_display_name_elements = metadata['organization']['organization_display_name'] | ||
organization_display_name = self._first_lang_element_text(org_display_name_elements, lang) | ||
|
||
if organization_display_name: | ||
satosa_logging(logger, logging.DEBUG, "organization_display_name is {}".format(organization_display_name), context.state) | ||
data.attributes[config['organization_display_name']['internal_attribute_name']] = organization_display_name | ||
|
||
except Exception as err: | ||
satosa_logging(logger, logging.WARN, "Unable to determine organization display name for {}".format(idp_entity_id), context.state) | ||
|
||
# Find the OrganizationName for the IdP if so configured. | ||
if 'organization_name' in config: | ||
lang = config['organization_name'].get('lang', 'en') | ||
try: | ||
org_name_elements = metadata['organization']['organization_name'] | ||
organization_name = self._first_lang_element_text(org_name_elements, lang) | ||
|
||
if organization_name: | ||
satosa_logging(logger, logging.DEBUG, "organization_name is {}".format(organization_name), context.state) | ||
data.attributes[config['organization_name']['internal_attribute_name']] = organization_name | ||
|
||
except Exception as err: | ||
satosa_logging(logger, logging.WARN, "Unable to determine organization display name for {}".format(idp_entity_id), context.state) | ||
|
||
satosa_logging(logger, logging.DEBUG, "Returning data.attributes {}".format(str(data.attributes)), context.state) | ||
return super().process(context, data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure if this should be an error. I would allow a missing default, which would make this microservice only act upon the specified entities and by default ignore all non-matching ones. This would behave like having
"ignore": True
under"default"
, but without having to add"ignore": False
to the specified idp configs (you need this, as"ignore": True
would propagate from the default profile)