Source code for bluesky_httpserver.authorization.api_access

import asyncio
import copy
import logging
import random
import time as ttime
from collections.abc import Iterable

import httpx
import jsonschema
import yaml

from ..config_schemas.loading import ConfigError
from ._defaults import _DEFAULT_ROLES, _DEFAULT_USER_INFO

logger = logging.getLogger(__name__)


# JSON schema (written as YAML) used to validate the arguments of 'BasicAPIAccessControl'.
# The regular expressions are double-escaped: once for the Python string literal and
# once for the YAML double-quoted scalar.
_schema_BasicAPIAccessControl = """
$schema: http://json-schema.org/draft-07/schema#
type: object
additionalProperties: false
properties:
  roles:
    oneOf:
      - type: object
        additionalProperties: false
        patternProperties:
          "^[a-zA-Z_][0-9a-zA-Z_.@\\\\-\\\\+]*$":
            oneOf:
              - type: object
                additionalProperties: false
                properties:
                  scopes_set:
                    $ref: '#/components/schemas/scope_list'
                  scopes_add:
                    $ref: '#/components/schemas/scope_list'
                  scopes_remove:
                    $ref: '#/components/schemas/scope_list'
              - type: "null"
      - type: "null"
components:
  schemas:
    scope_list:
      oneOf:
        - type: array
          items:
            type: string
            pattern: "^[0-9a-zA-Z:_]+$"
        - type: string
          pattern: "^[0-9a-zA-Z:_]+$"
        - type: "null"
"""


class BasicAPIAccessControl:
    """
    Basic API access policy. The policy is used by HTTP server by default.
    The basic policy supports two default users: ``UNAUTHENTICATED_SINGLE_USER``
    (user authorized with single-user API key) and ``UNAUTHENTICATED_PUBLIC``
    (unauthorized user, sending no token or API key with the request).
    The default users are assigned to ``unauthenticated_single_user`` and
    ``unauthenticated_public`` roles respectively. In addition, five roles are
    defined by the policy and available to subclassed policies: ``admin``,
    ``expert``, ``advanced``, ``user`` and ``observer``. Each of the seven roles
    is assigned a reasonable set of default scopes, which may be customized for
    practical deployments.

    The parameter ``roles`` allows to replace or modify sets of scopes assigned
    to default roles or add new custom roles recognized by policies. Note, that
    the basic API access policy supports only unauthenticated access and uses
    only two roles. The other default roles and custom roles are intended for
    use in subclasses that define more sophisticated policies.

    The optional parameter ``roles`` accepts a dictionary, which maps role names
    to operations on the respective sets of scopes. The operations include
    ``scopes_set`` (replaces the existing scopes by the new scopes or sets scopes
    for a new custom role), ``scopes_add`` (adds scopes to the existing scopes)
    and ``scopes_remove`` (removes scopes from the set). Multiple operations for
    a given role are executed in the following order: ``scopes_set``,
    ``scopes_add`` followed by ``scopes_remove``.

    The following examples illustrate how to modify API access for the default
    ``user`` role. Access to API may be disabled by mapping the role name to
    ``None`` or setting scopes to the empty list::

        # 'user' can not access any API.
        {"user": None}
        {"user": {"scopes_set": []}}
        {"user": {"scopes_set": None}}

    In the following examples, the scopes are not changed, since no operations
    are specified or the operations do not modify the scopes::

        # Scopes are not changed
        {"user": {}}
        {"user": {"scopes_add": []}}
        {"user": {"scopes_add": None}}
        {"user": {"scopes_remove": []}}
        {"user": {"scopes_remove": None}}
        {"user": {"scopes_add": [], "scopes_remove": []}}
        {"user": {"scopes_add": None, "scopes_remove": None}}

    The scopes are replaced by specifying a list of scopes with ``scopes_set``::

        # Replace the scopes with the new set: 'user' can now only read status and the queue.
        {"user": {"scopes_set": ["read:status", "read:queue"]}}
        # Replace the scopes: 'user' can now only read status.
        {"user": {"scopes_set": ["read:status"]}}
        # A single scope may be specified as a string (more convenient in YML config file).
        {"user": {"scopes_set": "read:status"}}

    Additional scopes can be added to the default scopes::

        # In addition to default scopes, 'user' can now upload and execute scripts.
        {"user": {"scopes_add": ["write:scripts"]}}
        {"user": {"scopes_add": "write:scripts"}}

    Scopes can be removed from the default scopes::

        # 'user' is assigned the default scopes except API for editing the queue.
        {"user": {"scopes_remove": ["write:queue:edit"]}}
        {"user": {"scopes_remove": "write:queue:edit"}}

    Multiple operations may be specified::

        # Now the 'user' can execute scripts, but not edit the queue.
        {"user": {"scopes_add": ["write:scripts"], "scopes_remove": ["write:queue:edit"]}}
        {"user": {"scopes_add": "write:scripts", "scopes_remove": "write:queue:edit"}}

    In practical deployments, the policy arguments are defined in config YML files.
    The following is an example of configuration that modifies the scopes for
    the ``user`` role and creates a new ``test_role``::

        api_access:
          policy: bluesky_httpserver.authorization:BasicAPIAccessControl
          args:
            roles:
              user:
                scopes_add: write:scripts
                scopes_remove:
                  - write:queue:edit
                  - read:queue:edit
              test_role:
                scopes_add:
                  - read:status
                  - read:queue
                  - read:history
                  - read:resources
                  - read:config
                  - read:monitor
                  - read:console
                  - read:lock
                  - read:testing

    Parameters
    ----------
    roles: dict, None
        The dictionary that maps role names to operations that modify assigned
        role scopes. If ``None``, then the default roles with default unmodified
        scopes are used.

    Raises
    ------
    ConfigError
        The parameter ``roles`` does not satisfy the schema.
    """

    def __init__(self, *, roles=None):
        try:
            config = {"roles": roles}
            schema = yaml.safe_load(_schema_BasicAPIAccessControl)
            jsonschema.validate(instance=config, schema=schema)
        except jsonschema.ValidationError as err:
            msg = err.args[0]
            raise ConfigError(f"ValidationError while validating parameters BasicAPIAccessControl: {msg}") from err

        roles = roles or {}

        # Work on a private copy so that the module-level defaults are never modified.
        self._roles = copy.deepcopy(_DEFAULT_ROLES)

        for role, params in roles.items():
            # Unknown role names create new (initially empty) custom roles.
            role_scopes = self._roles.setdefault(role, set())

            # If 'params' is None, then the role has no access (scopes is an empty set)
            if params is None:
                params = {"scopes_set": []}

            # The order of operations is fixed: set, then add, then remove.
            if "scopes_set" in params:
                role_scopes.clear()
                role_scopes.update(self._create_scope_list(params["scopes_set"]))
            if "scopes_add" in params:
                role_scopes.update(self._create_scope_list(params["scopes_add"]))
            if "scopes_remove" in params:
                role_scopes.difference_update(self._create_scope_list(params["scopes_remove"]))

        self._user_info = copy.deepcopy(_DEFAULT_USER_INFO)

    def _create_scope_list(self, scopes):
        """
        Convert a scope specification (a string, an iterable of strings or ``None``)
        to a list of lowercase scope names.

        Raises ``TypeError`` if ``scopes`` has unsupported type.
        """
        if isinstance(scopes, str):
            return [scopes.lower()]
        elif isinstance(scopes, Iterable):
            return [_.lower() for _ in scopes]
        elif not scopes:
            # Falsy non-iterable value, e.g. None
            return []
        else:
            raise TypeError(f"Unsupported type of scope list: scopes = {scopes!r}")

    def _is_user_known(self, username):
        """
        Returns ``True`` if ``username`` is present in the user info dictionary.
        """
        return username in self._user_info

    def _collect_scopes(self, role):
        """
        Returns the set of scopes stored for the role (the stored object itself, not a copy).
        Returns an empty set if the role is not defined.
        """
        return self._roles.get(role, set())

    def _collect_user_info(self, username):
        """
        Returns the stored user information dictionary.
        Returns an empty dictionary if user data is not found.
        """
        return self._user_info.get(username, {})

    def _collect_role_scopes(self, roles):
        """
        'roles' is a role name (string) or a list of roles (list of strings). Returns a set of scopes.
        The returned set is always a new object, so it can be modified by the caller without
        changing the scopes stored in the policy.
        """
        if isinstance(roles, str):
            # Copy the set: returning the stored set would let callers modify the policy.
            scopes = set(self._collect_scopes(roles))
        else:
            scopes = set().union(*[self._collect_scopes(_) for _ in roles])
        return scopes

    def is_user_known(self, username):
        """
        Performs quick check whether the user is known. In many cases it does not make
        sense to perform any further authorization steps if the user is unknown.
        If the user is known, but not assigned to any groups or assigned to groups with
        empty scopes, then the user still can not access any API.

        Parameters
        ----------
        username: str
            User name

        Returns
        -------
        boolean
            Indicates if the user is known (``True``) or not (``False``).
        """
        return self._is_user_known(username)

    def get_user_roles(self, username):
        """
        Returns a set of roles assigned to the user.

        Parameters
        ----------
        username: str
            User name

        Returns
        -------
        set(str)
            A set of roles assigned to the user. The set of roles is empty if
            the user is not found.
        """
        principal_info = self._collect_user_info(username)
        roles = principal_info.get("roles", [])
        # A single role may be stored as a string
        if isinstance(roles, str):
            roles = [roles]
        return set(roles)

    def get_user_scopes(self, username):
        """
        Returns a set of scopes assigned to the user. The scopes are based on the user roles.

        Parameters
        ----------
        username: str
            User name

        Returns
        -------
        set(str)
            A set of scopes assigned to the user. The set of scopes is empty if
            the user is not found.
        """
        roles = self.get_user_roles(username)
        return self._collect_role_scopes(roles)

    def get_displayed_user_name(self, username):
        """
        Returns the displayed user name for the user. The displayed user name is
        assembled from ``username``, full 'displayed' user name and user's email.
        The formatting depends on the available data, i.e. if no additional data is
        available, then ``username`` is returned. If the user is not found, then
        ``username`` is returned.

        The following output is possible for the user *'jdoe'*::

            jdoe
            jdoe <jdoe@gmail.com>
            jdoe "John Doe"
            jdoe "John Doe <jdoe@gmail.com>"

        Parameters
        ----------
        username: str
            User name

        Returns
        -------
        str
            Formatted displayed user name.
        """
        user_info = self._collect_user_info(username)
        email = user_info.get("email", None)
        displayed_name = user_info.get("displayed_name", None)

        if not email and not displayed_name:
            return username
        elif not displayed_name:
            return f"{username} <{email}>"
        elif not email:
            return f'{username} "{displayed_name}"'
        else:
            return f'{username} "{displayed_name} <{email}>"'

    def get_user_info(self, username):
        """
        Returns complete user information, including a set of roles, set of scopes
        and displayed user name. The user roles are looked up only once and reused
        for computing the scopes.

        Parameters
        ----------
        username: str
            User name

        Returns
        -------
        dict
            The dictionary with full user information. The keys: ``roles``
            (see ``get_user_roles()``), ``scopes`` (see ``get_user_scopes()``)
            and ``displayed_name`` (see ``get_displayed_user_name()``).
        """
        roles = self.get_user_roles(username)
        scopes = self._collect_role_scopes(roles)
        displayed_name = self.get_displayed_user_name(username)
        return {"roles": roles, "scopes": scopes, "displayed_name": displayed_name}


# JSON schema (written as YAML) used to validate the arguments of 'DictionaryAPIAccessControl'.
# The regular expressions are double-escaped: once for the Python string literal and
# once for the YAML double-quoted scalar.
_schema_DictionaryAPIAccessControl = """
$schema: http://json-schema.org/draft-07/schema#
type: object
additionalProperties: false
properties:
  roles:
    # Detailed validation is performed elsewhere
    description: The value is passed to BasicAPIAccessControl object
  users:
    oneOf:
      - type: object
        additionalProperties: false
        patternProperties:
          "^[0-9a-zA-Z_.@\\\\-\\\\+]+$":
            oneOf:
              - type: object
                additionalProperties: false
                properties:
                  roles:
                    oneOf:
                      - type: array
                        items:
                          type: string
                          pattern: "^[a-zA-Z_][0-9a-zA-Z_]*$"
                      - type: string
                        pattern: "^[a-zA-Z_][0-9a-zA-Z_]*$"
                      - type: "null"
                  displayed_name:
                    oneOf:
                      - type: string
                        pattern: "^.+$"
                      - type: "null"
                  email:
                    oneOf:
                      - type: string
                        pattern: "^.+@.+$"
                      - type: "null"
              - type: "null"
      - type: "null"
"""


class DictionaryAPIAccessControl(BasicAPIAccessControl):
    """
    Dictionary-based API access policy. Simple extension of ``BasicAPIAccessControl``
    that provides an option to provide user information, including assigned roles,
    displayed name and email. The policy is primarily intended for use in demos and
    testing. Production deployments are expected to use more secure authorization
    policies.

    User information is passed using ``users`` parameter, which accepts a dictionary.
    If the parameter is ``None``, then no user information is passed to the policy
    and no users are allowed to access any API. The dictionary maps usernames to
    user information dictionaries, containing roles, displayed names (optional)
    and emails (optional). Role names are converted to lowercase.

    The policy arguments are specified as part of config YML files as illustrated
    in the following examples (note that a missing value is spelled ``null`` in YML)::

        # No users are allowed to access any API.
        api_access:
          policy: bluesky_httpserver.authorization:DictionaryAPIAccessControl
          args:
            users: null

        # User 'bob' is defined, but he is not allowed to use any API.
        api_access:
          policy: bluesky_httpserver.authorization:DictionaryAPIAccessControl
          args:
            users:
              bob: null

        # User 'bob' is assigned to 'admin' and 'expert' groups, 'jdoe' is assigned to the 'advanced' group.
        # Note: a single role may be represented as a list or a string.
        api_access:
          policy: bluesky_httpserver.authorization:DictionaryAPIAccessControl
          args:
            users:
              bob:
                roles:
                  - admin
                  - expert
                email: bob@gmail.com
              jdoe:
                roles: advanced
                displayed_name: Doe, John
                email: jdoe@gmail.com

    The policy arguments may also include ``roles`` parameter, which is handled by
    ``BasicAPIAccessControl``. See docstring for ``BasicAPIAccessControl`` for more
    detailed information.

    Parameters
    ----------
    roles: dict or None
        The dictionary configuration parameters that modifies the default or create
        new roles. The parameter is passed to ``BasicAPIAccessControl``.
    users: dict or None
        The dictionary that maps user name to user information.

    Raises
    ------
    ConfigError
        The parameters do not satisfy the schema.
    """

    def __init__(self, *, roles=None, users=None):
        super().__init__(roles=roles)

        try:
            config = {"roles": roles, "users": users}
            schema = yaml.safe_load(_schema_DictionaryAPIAccessControl)
            jsonschema.validate(instance=config, schema=schema)
        except jsonschema.ValidationError as err:
            msg = err.args[0]
            raise ConfigError(
                f"ValidationError while validating parameters DictionaryAPIAccessControl: {msg}"
            ) from err

        # Normalize a private copy of the user data; the caller's dictionary is never modified.
        user_info = {}
        for username, info in copy.deepcopy(users or {}).items():
            # 'None' means that the user is known, but has no roles or other data
            info = {} if info is None else dict(info)

            user_roles = info.get("roles")
            if user_roles is None:
                info["roles"] = []
            elif isinstance(user_roles, str):
                # A single role may be kept as a string
                info["roles"] = user_roles.lower()
            else:
                info["roles"] = [_.lower() for _ in user_roles]

            user_info[username] = info

        # Entries with the same names as the default users replace the default entries.
        self._user_info.update(user_info)


# JSON schema (written as YAML) used to validate the arguments of 'ServerBasedAPIAccessControl'.
_schema_ServerBasedAPIAccessControl = """
$schema: http://json-schema.org/draft-07/schema#
type: object
additionalProperties: false
properties:
  instrument:
    type: string
  roles:
    # Detailed validation is performed elsewhere
    description: The value is passed to BasicAPIAccessControl object
  server:
    type: string
  port:
    type: integer
  update_period:
    type: integer
  expiration_period:
    type: [integer, "null"]
  http_timeout:
    type: integer
"""


class ServerBasedAPIAccessControl(BasicAPIAccessControl):
    """
    Access policy based on external Access Control Server. The user access data is
    periodically requested from the server using REST API. The access control server
    is expected to expose ``/instrument/{instrument}/qserver/access`` API, where
    ``instrument`` is the lowercase name of the instrument passed to the class
    constructor. The API is expected to return a dictionary which maps roles
    ('admin', 'expert', 'advanced', 'user', 'observer') to dictionaries with
    information on users that are assigned the role, for example

    .. code-block::

        {
            "admin": {
                "bob": {"email": "bob@gmail.com"},
                "tom": {},
            },
            "expert": {
                "bob": {"email": "bob@gmail.com"}
            },
            "advanced": {
                "jdoe": {"email": "jdoe@gmail.com", "first_name": "John", "last_name": "Doe"}
            },
            "user": {},
            "observer": {},
        }

    User information consists of the username (dictionary key, which makes it
    mandatory) and optional ``'email'``, ``'first_name'`` and ``'last_name'``
    (the first and the last names are combined into the displayed name).
    Additional user information is ignored. Roles that are not defined
    in the policy are reported in the log and ignored.

    Access information is requested from the server at startup and periodically
    updated during operation with the period ``update_period +/-20%``. If the server
    is not accessible, the user access rights do not change until access information
    expires. The expiration period is set using the parameter ``expiration_period``.
    If the access information expires and an attempt to update it fails, all users
    lose access to the HTTP server.

    The scopes for the roles can be modified by passing the parameter dictionary
    with the parameter ``roles``. The dictionary is handled by the constructor of
    ``BasicAPIAccessControl``. See the class documentation for more details.

    Parameters
    ----------
    instrument: str
        Instrument ID, such as 'SRX' or 'TES'. This is the required parameter.
    roles: dict or None, optional
        The dictionary that defines new and/or modifies existing roles. The dictionary
        is passed to the ``BasicAPIAccessControl`` constructor. Default: ``None``.
    server: str, optional
        Access Control server address, such as ``'accesscontrol.server.com'`` or
        ``'110.43.6.45'``. The default address is ``localhost``.
    port: int, optional
        Access Control server port. The default port is `8000`.
    update_period: int, optional
        Average period in seconds between consecutive requests for updated access data.
        The actual period is randomized (uniform distribution in the range +/-20%
        of the update period). Default: 600.
    expiration_period: int or None, optional
        Expiration period for the current access data. If a request to the API server
        fails and the data is expired, then users lose access. Longer expiration period
        allows users to continue operation if the API server is temporarily unavailable.
        If the value is ``None`` (or 0), then the period is set to ``1.5 * update_period``.
        Default: ``None``.
    http_timeout: int, optional
        Timeout for requests to the API server. Default: 5.

    Raises
    ------
    ConfigError
        The parameter ``instrument`` is missing or the parameters do not satisfy the schema.
    """

    def __init__(
        self,
        *,
        instrument=None,
        roles=None,
        server="localhost",
        port=8000,
        update_period=600,
        expiration_period=None,
        http_timeout=5,
    ):
        super().__init__(roles=roles)

        if instrument is None:
            raise ConfigError("The required parameter 'instrument' is not specified")

        try:
            config = {
                "instrument": instrument,
                "roles": roles,
                "server": server,
                "port": port,
                "update_period": update_period,
                "expiration_period": expiration_period,
                "http_timeout": http_timeout,
            }
            schema = yaml.safe_load(_schema_ServerBasedAPIAccessControl)
            jsonschema.validate(instance=config, schema=schema)
        except jsonschema.ValidationError as err:
            msg = err.args[0]
            raise ConfigError(
                f"ValidationError while validating parameters ServerBasedAPIAccessControl: {msg}"
            ) from err

        self._instrument = instrument.lower()
        self._server = server
        self._port = port
        self._update_period = update_period
        self._http_timeout = http_timeout
        self._expiration_period = expiration_period or (update_period * 1.5)

        # The access data is considered expired until the first successful update.
        current_time = ttime.time()
        self._time_next_update = current_time
        self._time_expiration = current_time

        # Coroutine functions listed here are expected to be started by the code that owns the policy.
        self.background_tasks = [self._background_updates]

    @staticmethod
    def _format_displayed_name(first_name, last_name):
        """
        Combine first and last names into a displayed name. Values that are not
        strings are ignored. Returns an empty string if no usable name is available.
        """
        parts = [_.strip() for _ in (first_name, last_name) if isinstance(_, str)]
        return " ".join(_ for _ in parts if _)

    async def update_access_info(self):
        """
        Send a single request to the API server and update locally stored access control info.
        The locally stored data is replaced only after the response is successfully received
        and processed. Exceptions raised while sending the request or processing the response
        are propagated to the caller.
        """
        base_url = f"http://{self._server}:{self._port}"
        access_api = f"/instrument/{self._instrument.lower()}/qserver/access"

        async with httpx.AsyncClient(base_url=base_url, timeout=self._http_timeout) as client:
            response = await client.get(access_api)
            response.raise_for_status()
            groups = response.json()

        user_info = {}
        for g, gmembers in groups.items():
            if g not in self._roles:
                logger.error("Unsupported role %r. Supported roles: %s", g, list(self._roles.keys()))
                continue

            for u, ui in gmembers.items():
                # Membership is defined by the key. Treat missing details (e.g. 'null')
                # as empty instead of failing the whole update.
                if not isinstance(ui, dict):
                    ui = {}

                entry = user_info.setdefault(u, {})
                entry.setdefault("roles", []).append(g)

                # If the user is listed under multiple roles, the first name/email found is kept.
                displayed_name = self._format_displayed_name(ui.get("first_name"), ui.get("last_name"))
                if displayed_name:
                    entry.setdefault("displayed_name", displayed_name)

                email = ui.get("email")
                if email and isinstance(email, str):
                    entry.setdefault("email", email)

        self._clear_user_info()
        self._user_info.update(user_info)
        self._time_expiration = ttime.time() + self._expiration_period

    async def _background_updates(self):
        """
        Start this task during the server startup. The task periodically sends requests to
        API server and updates locally stored access control data. The task never returns.
        """
        while True:
            try:
                await self.update_access_info()
            except Exception as ex:
                # Deliberately broad: a failed update must never stop the background task.
                logger.error("Failed to update access control data: %s.", ex)
                if ttime.time() > self._time_expiration:
                    logger.error("Access control data expired.")
                    self._clear_user_info()

            # Wait for the next update. Randomize waiting time.
            t_next = self._update_period
            t_next_variation = t_next * 0.2
            t_next = t_next + random.uniform(-t_next_variation, t_next_variation)
            await asyncio.sleep(t_next)

    def _clear_user_info(self):
        """
        Clear non-default entries from user info dict
        """
        for k in list(self._user_info.keys()):
            if k not in _DEFAULT_USER_INFO:
                self._user_info.pop(k)