Skip to content
This repository was archived by the owner on Jan 10, 2019. It is now read-only.

First commit of the IdP Metadata Attribute Store #8

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
module: idp_metadata_attribute_store.IdpMetadataAttributeStore
name: IdpMetadataAttributeStore
config:
default:
display_name:
# SATOSA internal attribute name to use
internal_attribute_name: idpdisplayname
# Language preference. 'en' or English is the default
# if not specified.
lang: en
entity_id:
internal_attribute_name: idpentityid
organization_name:
internal_attribute_name: idporgname
organization_display_name:
internal_attribute_name: idporgdisplayname

# Configuration may also be done per-IdP with any
# missing parameters taken from the default if any.
# The configuration key is the entityID of the IdP.
#
# For example:
https://idp.myorg.edu/idp/shibboleth:
display_name:
internal_attribute_name: othername
lang: jp
# The microservice may be configured to ignore a particular IdP.
https://login.other.org.edu/idp/shibboleth:
ignore: true


239 changes: 239 additions & 0 deletions src/satosa/micro_services/idp_metadata_attribute_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
"""
SATOSA microservice that includes in the assertion
attributes taken from SAML metadata about the SAML
IdP used for authentication.

The attributes that may be asserted from the SAML
metadata for the IdP include

<mdui:DisplayName>
<OrganiationName>
<OrganizationDisplayName>

A typical configuration would be

module: idp_metadata_attribute_store.IdpMetadataAttributeStore
name: IdpMetadataAttributeStore
config:
default:
display_name:
# SATOSA internal attribute name to use
internal_attribute_name: idpdisplayname
# Language preference with 'en' or English as default
lang: en
entity_id:
internal_attribute_name: idpentityid
organization_name:
internal_attribute_name: idporgname
lang: en
organization_display_name:
internal_attribute_name: idporgdisplayname
lang: en

# Configuration may also be done per-IdP with any
# missing parameters taken from the default if any.
# The configuration key is the entityID of the IdP.
#
# For example:
https://login.myorg.edu/idp/shibboleth:
display_name:
internal_attribute_name: othername
lang: jp
# The microservice may be configured to ignore a particular IdP.
https://login.other.org.edu/idp/shibboleth:
ignore: true
"""

import satosa.micro_services.base
from satosa.logging_util import satosa_logging
from satosa.exception import SATOSAError
from satosa.context import Context

import copy
import logging

logger = logging.getLogger(__name__)

class IdpMetadataAttributeStoreError(SATOSAError):
"""
LDAP attribute store error
"""
pass

class IdpMetadataAttributeStore(satosa.micro_services.base.ResponseMicroService):
"""
Use the metadata store attached to the proxy SP in the context
to lookup metadata about the IdP entity making the assertion
and include metadata details as attributes in the assertion sent
to the SP that made the request.
"""

config_defaults = { 'ignore' : False }

def __init__(self, config, *args, **kwargs):
super().__init__(*args, **kwargs)

if 'default' in config and "" in config:
msg = """Use either 'default' or "" in config but not both"""
satosa_logging(logger, logging.ERROR, msg, None)
raise IdpMetadataAttributeStoreError(msg)

if "" in config:
config['default'] = config.pop("")

if 'default' not in config:
msg = "No default configuration is present"
satosa_logging(logger, logging.ERROR, msg, None)
raise IdpMetadataAttributeStoreError(msg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this should be an error. I would allow a missing default, which would make this microservice only act upon the specified entities and by default ignore all non-matching ones. This would behave like having "ignore": True under "default", but without having to add "ignore": False to the specified idp configs (you need this, as "ignore": True would propagate from the default profile)


self.config = {}

# Process the default configuration first then any per-IdP overrides.
idp_list = ['default']
idp_list.extend([ key for key in config.keys() if key != 'default' ])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does 'default' have to be first? The following seems to be what you need.

idp_list = config.keys()
id_default = idp_list.pop('default', {})
self.config_defaults.update(idp_default)


for idp in idp_list:
if not isinstance(config[idp], dict):
msg = "Configuration value for {} must be a dictionary"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This format string is missing a parameter. I suppose you want:

msg = "Configuration for {} must be a dictionary".format(idp)

satosa_logging(logger, logging.ERROR, msg, None)
raise IdpMetadataAttributeStoreError(msg)

# Initialize configuration using module defaults then update
# with configuration defaults and then per-IdP overrides.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can precompute this, as config_defaults and config['default'] will not change - that's what I've done above with idp_list.

idp_config = copy.deepcopy(IdpMetadataAttributeStore.config_defaults)
if 'default' in self.config:
idp_config.update(self.config['default'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This

if 'default' in self.config:

should always by True (if you take into account the No default configuration is present error).

But, if you follow my comments, you can replace these lines with:

idp_config = copy.deepcopy(self.config_defaults)
idp_config.update(config[idp])

config_defaults should be prepared with config['default'] and you can use that without checks.

idp_config.update(config[idp])

self.config[idp] = idp_config

satosa_logging(logger, logging.INFO, "IdP Metadata Attribute Store microservice initialized", None)

def _first_lang_element_text(self, elements, lang='en'):
"""
Loop over the list representing XML elements that contain text and find
the first text value for the input lang where 'en' or English is the
default lang.

Each item in the list is a dictionary with keys

__class__
lang
text

as expected from the metadata returned for an entity by the MetadataStore
class from pysaml2.

If no element has the input lang then return the text from the first
element.

If no element has text then return an empty string.
"""
for e in elements:
if lang in e:
if 'text' in e:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no reason for this to be nested, I would write it as

if lang in e and 'text' in e:

or (maybe)

if all(k in e.keys() for k in [lang, 'text']):

return e['text']

for e in elements:
if 'text' in e:
return e['text']

return ''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, now I see what're trying to do here, according to the comment. I guess that's why the previous loop was nested.

first_text_lang = None
first_text = None
fallback = ''

for el in elements:
  if 'text' in el.keys():
    if first_text is None:
      first_text = el['text']
    if first_text_lang is None and lang in el.keys(): # should this be: lang is el.get('lang', None) ?
      first_text_lang = el['text']
      break

return first_text_lang or first_text or fallback

PS: Note my comment there. lang represents the value, not the key-name. I guess we should be checking for that: lang is el.get('lang', None)


def process(self, context, data):
"""
Default interface for microservices. Process the input data for
the input context.
"""
self.context = context

# Find the entityID for the IdP that issued the assertion.
try:
idp_entity_id = data.to_dict()['auth_info']['issuer']
except KeyError as err:
satosa_logging(logger, logging.ERROR, "Unable to determine the entityID for the IdP issuer", context.state)
return super().process(context, data)

# Get the configuration for the IdP.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the comments above (allowing not having a default configuration), I would do the following:

config = self.config.get(idp_entity_id) or self.config.get('default', {})
satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state)

if idp_entity_id in self.config.keys():
config = self.config[idp_entity_id]
else:
config = self.config['default']

satosa_logging(logger, logging.DEBUG, "Using config {}".format(config), context.state)

# Log the entityID of the authenticating IdP.
satosa_logging(logger, logging.INFO, "entityID for authenticating IdP is {}".format(idp_entity_id), context.state)

# Ignore this IdP if so configured.
if config['ignore']:
satosa_logging(logger, logging.INFO, "Ignoring IdP {}".format(idp_entity_id), context.state)
return super().process(context, data)

# Set the entityID attribute if so configured.
if 'entity_id' in config:
data.attributes[config['entity_id']['internal_attribute_name']] = idp_entity_id

# Get the metadata store the SP for the proxy is using. This
# will be an instance of the class MetadataStore from mdstore.py
# in pysaml2.
metadata_store = context.get_decoration(Context.KEY_BACKEND_METADATA_STORE)

# Get the metadata for the IdP.
try:
metadata = metadata_store[idp_entity_id]
except Exception as err:
satosa_logging(logger, logging.ERROR, "Unable to retrieve metadata for IdP {}".format(idp_entity_id), context.state)
return super().process(context, data)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a recurring pattern:

try:
  something()
except:
  return super().process(context, data)

This can be handled better by changing process() to call _process() as such:

class IdpMetadataAttributeStoreWarning(SATOSAError):
  pass

def process(..):
  try:
    self._process(..)
  except IdpMetadataAttributeStoreWarning as ex:
    satosa_logging(logger, logging.WARN, str(ex), context.state)
    return super().process(context, data)

def _process(..):
  # all the code previously on process() but with modified parts as such:
  try:
    something()
  except SomeError as err:
    msg_warn = "something went wrong, but it's ok"
    raise IdpMetadataAttributeStoreWarning(msg_warn) from err

Errors of type IdpMetadataAttributeStoreWarning will be logged and let the process continue, but other errors will be raised and stop the execution. This also makes errors that we handle explicit.


satosa_logging(logger, logging.DEBUG, "Metadata for IdP {} is {}".format(idp_entity_id, metadata), context.state)

# Find the mdui:DisplayName for the IdP if so configured.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would split the processing of the attributes into different classes. This may be a bit of over-engineering but it allows to extend the mechanism by only creating a new class with just the appropriate find handler and mapping it to the factory. It also makes code reusable.

from abc import ABC

# abstract class acts like an interface
class GenericAttribute(ABC): # not sure about the name
  @abstractmethod
  def find(self, metadata, lang, *argv):
    raise NotImplementedError

# organization_display_name and organization_name use the same mechanism for find()
class GenericName(GenericAttribute, ABC):
  def find(self, metadata, lang, *argv):
    org_display_name_elements = metadata['organization'][self.attr_name]
    value = self._first_lang_element_text(org_display_name_elements, lang)
    return value

class OrganizationDisplayName(GenericName):
  attr_name = 'organization_display_name'

class OrganizationName(GenericName):
  attr_name = 'organization_name'

class DisplayName(GenericAttribute):
  attr_name = 'display_name'

  def find(self, metadata, lang, *argv):
    extensions = metadata['idpsso_descriptor'][0]['extensions']['extension_elements']
    for e in extensions:
      if e['__class__'] == 'urn:oasis:names:tc:SAML:metadata:ui&UIInfo':
        display_name_elements = e[self.attr_name]
        value = self._first_lang_element_text(display_name_elements, lang)
        break
    return value


class AttributeFactory:
  factory = {
    'organization_display_name': OrganizationDisplayName,
    'organization_name': OrganizationName,
    'display_name': DisplayName,
  }

  def create(attr_type):
    try:
      attr_class = factory[attr_type]
    except KeyError as err:
      msg_err = "No handler for attribute: {}".format(attr_type)
      raise IdpMetadataAttributeStoreError(msg_err) from err
    else:
      return attr_class()

def process(..):
  ...
  for attr in config.keys():
    attribute_handler = AttributeFactory.create(attr)
    value = attribute_handler.find(metadata, lang)
    if value is not None:
      data.attributes[config[attr]['internal_attribute_name']] = value
      msg_dbg = "attribute {} is {}".format(attr, value)
      satosa_logging(logger, logging.DEBUG, msg_dbg, context.state)

if 'display_name' in config:
lang = config['display_name'].get('lang', 'en')
try:
# We assume there is only one IDPSSODescriptor in the IdP metadata.
extensions = metadata['idpsso_descriptor'][0]['extensions']['extension_elements']
for e in extensions:
if e['__class__'] == 'urn:oasis:names:tc:SAML:metadata:ui&UIInfo':
display_name_elements = e['display_name']
display_name = self._first_lang_element_text(display_name_elements, lang)
break

if display_name:
satosa_logging(logger, logging.DEBUG, "display_name is {}".format(display_name), context.state)
data.attributes[config['display_name']['internal_attribute_name']] = display_name

except Exception as err:
satosa_logging(logger, logging.WARN, "Unable to determine display name for {}".format(idp_entity_id), context.state)

# Find the OrganizationDisplayName for the IdP if so configured.
if 'organization_display_name' in config:
lang = config['organization_display_name'].get('lang', 'en')
try:
org_display_name_elements = metadata['organization']['organization_display_name']
organization_display_name = self._first_lang_element_text(org_display_name_elements, lang)

if organization_display_name:
satosa_logging(logger, logging.DEBUG, "organization_display_name is {}".format(organization_display_name), context.state)
data.attributes[config['organization_display_name']['internal_attribute_name']] = organization_display_name

except Exception as err:
satosa_logging(logger, logging.WARN, "Unable to determine organization display name for {}".format(idp_entity_id), context.state)

# Find the OrganizationName for the IdP if so configured.
if 'organization_name' in config:
lang = config['organization_name'].get('lang', 'en')
try:
org_name_elements = metadata['organization']['organization_name']
organization_name = self._first_lang_element_text(org_name_elements, lang)

if organization_name:
satosa_logging(logger, logging.DEBUG, "organization_name is {}".format(organization_name), context.state)
data.attributes[config['organization_name']['internal_attribute_name']] = organization_name

except Exception as err:
satosa_logging(logger, logging.WARN, "Unable to determine organization display name for {}".format(idp_entity_id), context.state)

satosa_logging(logger, logging.DEBUG, "Returning data.attributes {}".format(str(data.attributes)), context.state)
return super().process(context, data)