import ast
import collections
import copy
import enum
import glob
import importlib
import importlib.resources
import inspect
import logging
import numbers
import os
import random
import re
import shutil
import sys
import tempfile
import traceback
import typing
from collections.abc import Iterable, Mapping
import jsonschema
import pydantic
import yaml
from numpydoc.docscrape import NumpyDocString
from packaging import version
import bluesky_queueserver
from .logging_setup import PPrintForLogging as ppfl
# Module-level logger for all startup-loading diagnostics below.
logger = logging.getLogger(__name__)
# Version string of the 'bluesky_queueserver' package.
qserver_version = bluesky_queueserver.__version__
# Major version of the installed 'pydantic' package.
# NOTE(review): presumably used elsewhere in the file to branch on
#   incompatible pydantic v1/v2 APIs - not visible in this chunk, confirm.
pydantic_version_major = version.parse(pydantic.__version__).major
class ScriptLoadingError(Exception):
    """
    Raised when execution of a startup script, module or profile collection fails.

    Parameters
    ----------
    msg: str
        Error message (passed to ``Exception``).
    tb: str
        Fully prepared traceback (string), ready to be saved or passed
        to a consumer over the network.
    """

    def __init__(self, msg, tb):
        super().__init__(msg)
        self._tb = tb

    @property
    def tb(self):
        """
        Full traceback (string) passed as the second constructor parameter.
        Capturing the traceback in the function executing the script reduces
        references to Queue Server code, which is generic and unimportant
        for debugging startup scripts.
        """
        return self._tb
class _RegisteredNamespaceItems:
    def __init__(self):
        self._reg_devices = {}
        self._reg_plans = {}
    def clear(self):
        """
        Clear the dictionaries of registered plans and devices.
        """
        self._reg_devices.clear()
        self._reg_plans.clear()
    @property
    def reg_devices(self):
        """
        Property returns the reference to the dictionary of registered devices.
        """
        return self._reg_devices
    @property
    def reg_plans(self):
        """
        Property returns the reference to the dictionary of registered plans.
        """
        return self._reg_plans
    def _validate_name(self, name, *, item_type):
        """
        Validate object name (for a plan or device). Item type should be a string ('device' or 'plan'),
        used in error messages.
        """
        item_type = item_type.capitalize()
        if not isinstance(name, str):
            raise TypeError(f"{item_type} name must be a string: name={name} type(name)={type(name)}")
        if not name:
            raise ValueError(f"{item_type} name is an empty string")
        if not re.search(r"^[_a-zA-Z]([_a-zA-Z0-9])*$", name):
            raise ValueError(
                f"{item_type} name {name!r} contains invalid characters. The name must start with a letter "
                "or underscore and may contain letters, numbers and underscores.)"
            )
    def _validate_depth(self, depth):
        """
        Validate the value of 'depth' parameter.
        """
        if not isinstance(depth, int):
            raise TypeError(f"Depth must be an integer number: depth={depth} type(depth)={type(depth)}")
        if depth < 0:
            raise ValueError(f"Depth must be a positive integer: depth={depth}")
    def register_device(self, name, *, depth=1, exclude=False):
        """
        Register a device with Queue Server. The function allows to specify depth of the tree
        of subdevices included in the list or exclude the device from the list. Only top-level
        devices can be registered. Devices can be registered before they are defined in
        the startup code.
        This is experimental API. Functionality may change in the future.
        Parameters
        ----------
        name: str
            Name of the device. This should be a name of the variable in the global scope
            of the startup code. Only top level devices can be registered. Names of the device
            components are not accepted.
        depth: int
            Depth of the device tree included in the list of existing devices: 0 - include
            the tree of unlimited depth, 1 - include only top-level device, 2 - include
            the top-level device and it's components etc. Depth specified for registered
            devices overrides the depth used by default for unregistered devices.
        exclude: boolean
            Set this parameter ``False`` to exclude the device from the list of existing
            devices. The device can still be used in the code, but it can not be passed
            as a parameter value to a plan via RE Manager.
        Raises
        ------
        ValueError, TypeError
            Unsupported type or value of passed parameters.
        """
        self._validate_name(name, item_type="device")
        self._validate_depth(depth)
        exclude = bool(exclude)
        self._reg_devices[name] = dict(obj=None, depth=depth, exclude=exclude)
    def register_plan(self, name, *, exclude=False):
        """
        Register a plan with Queue Server. The function allows to exclude a plan from
        processing by the Queue Server.
        This is experimental API. Functionality may change in the future.
        Parameters
        ----------
        name: str
            Name of the plan. This should be a name of the variable in the global scope
            of the startup code.
        exclude: boolean
            Set this parameter ``False`` to exclude the plan from the list of existing
            plans. The plan can still be called from other plans, but it can not be
            started using RE Manager.
        Raises
        ------
        ValueError, TypeError
            Unsupported type or value of passed parameters.
        """
        self._validate_name(name, item_type="plan")
        exclude = bool(exclude)
        self._reg_plans[name] = dict(obj=None, exclude=exclude)
# Global registry instance: startup code registers plans/devices here, and the
#   loader functions below read/clear it ('load_worker_startup_code' calls
#   'clear_registered_items' before loading).
reg_ns_items = _RegisteredNamespaceItems()
clear_registered_items = reg_ns_items.clear
# The following are public functions intended to be used in startup code
register_device = reg_ns_items.register_device
register_plan = reg_ns_items.register_plan
def get_default_startup_dir():
    """
    Return the path to the default profile collection distributed with the package.

    The function does not guarantee that the directory exists. Used for demo with
    the Python-based worker.
    """
    package_root = importlib.resources.files("bluesky_queueserver")
    return os.path.join(package_root, "profile_collection_sim", "")
def get_default_startup_profile():
    """
    Return the name of the default startup profile. Used for demo with the
    IPython-based worker. The startup code is expected to be in the
    ``/tmp/qserver/ipython/profile_collection_sim/startup`` directory.
    """
    return "collection_sim"
def create_demo_ipython_profile(startup_dir, *, delete_existing=True):
    """
    Create demo IPython profile in temporary location. Copy the default startup
    directory to the new profile.

    Parameters
    ----------
    startup_dir: str
        Destination startup directory. Must be located inside the system
        temporary directory.
    delete_existing: boolean
        Delete the existing startup directory (not the whole profile) before
        copying the files.

    Raises
    ------
    IOError
        The destination directory is not inside the temporary directory.
    """
    tempdir = tempfile.gettempdir()
    # Refuse to operate outside the temporary directory: 'delete_existing'
    #   removes the destination tree, so this guards against data loss.
    if os.path.commonprefix([tempdir, startup_dir]) != tempdir:
        # BUG FIX: the message was passed with logging-style '%r' args, which
        #   'IOError' does not format; use an f-string instead.
        raise IOError(f"Attempting to create a demo profile in non-temporary startup directory: {startup_dir!r}")

    # Delete the existing startup directory (but not the whole profile).
    if delete_existing:
        shutil.rmtree(startup_dir, ignore_errors=True)

    # Copy the startup files
    default_startup_dir = get_default_startup_dir()
    shutil.copytree(default_startup_dir, startup_dir)
# Code executed in the worker namespace before the startup files when profiles
#   are patched (see 'load_profile_collection'). It installs a 'get_ipython'
#   replacement that returns a dummy object holding the user namespace, so
#   startup code written for IPython can run in a plain Python worker.
_startup_script_patch = """
import sys
import logging
from bluesky_queueserver.manager.profile_tools import global_user_namespace
logger_patch = logging.Logger(__name__)
global_user_namespace.set_user_namespace(user_ns=locals(), use_ipython=False)
class IPDummy:
    def __init__(self, user_ns):
        self.user_ns = user_ns
        # May be this should be some meaningful logger (used by 'configure_bluesky_logging')
        self.log = logging.Logger('ipython_patch')
def get_ipython_patch():
    ip_dummy = IPDummy(global_user_namespace.user_ns)
    return ip_dummy
get_ipython = get_ipython_patch
"""
def _patch_script_code(code_str):
    """
    Patch script code represented as a single text string:
    detect lines that import ``get_ipython`` from ``IPython`` and
    set ``get_ipython=get_ipython_patch`` after each import.
    """
    def is_get_ipython_imported_in_line(line):
        """Check if ``get_ipython()`` is imported in the line"""
        # It is assumed that commenting is done using #
        return bool(re.search(r"^[^#]*IPython[^#]+get_ipython", line))
    s_list = code_str.splitlines()
    for n in range(len(s_list)):
        line = s_list[n]
        if is_get_ipython_imported_in_line(line):
            # The line may have comments. The patch should be inserted before the comment.
            line_split = re.split("#", line)
            # Do not patch empty lines that have only comments
            if line_split[0]:
                line = line_split[0] + "; get_ipython = get_ipython_patch"
                if len(line_split) > 1:
                    line += "  #" + "#".join(line_split[1:])
                s_list[n] = line
    return "\n".join(s_list)
def load_profile_collection(path, *, patch_profiles=True, nspace=None):
    """
    Load profile collection located at the specified path. The collection consists of
    .py files started with 'DD-', where D is a digit (e.g. 05-file.py). The files
    are alphabetically sorted before execution.

    Parameters
    ----------
    path: str
        path to profile collection
    patch_profiles: boolean
        enable/disable patching profiles. At this point there is no reason not to
        patch profiles.
    nspace: dict or None
        Reference to the existing namespace, in which the code is executed. If ``nspace``
        is *None*, then the new namespace is created.

    Returns
    -------
    nspace: dict
        namespace in which the profile collection was executed. Reference to ``nspace``
        passed as a parameter is returned if ``nspace`` is not *None*.

    Raises
    ------
    IOError
        path does not exist or the profile collection contains no valid startup files
    ScriptLoadingError
        one of the startup files failed to execute
    """
    # Create the list of files to load
    path = os.path.expanduser(path)
    path = os.path.abspath(path)

    if not os.path.exists(path):
        raise IOError(f"Path '{path}' does not exist.")
    if not os.path.isdir(path):
        raise IOError(f"Failed to load the profile collection. Path '{path}' is not a directory.")

    file_pattern_py = os.path.join(path, "*.py")
    file_pattern_ipy = os.path.join(path, "*.ipy")
    file_list = glob.glob(file_pattern_py) + glob.glob(file_pattern_ipy)
    file_list.sort()  # Sort in alphabetical order

    # If the profile collection contains no startup files, it is very likely
    #   that the profile collection directory is specified incorrectly.
    if not len(file_list):
        # BUG FIX: the message used to refer to the mask '[0-9][0-9]*.py', while
        #   the files are actually collected using the masks '*.py' and '*.ipy'.
        raise IOError(f"The directory '{path}' contains no startup files (mask '*.py' or '*.ipy').")

    # Add original path to the profile collection to allow local imports
    #   from the patched temporary file.
    if path not in sys.path:
        # We don't want to add/remove the path if it is already in `sys.path` for some reason.
        sys.path.append(path)
        path_is_set = True
    else:
        path_is_set = False

    try:
        # Load the files into the namespace 'nspace'.
        nspace = {} if nspace is None else nspace
        if patch_profiles:
            exec(_startup_script_patch, nspace, nspace)
        for file in file_list:
            try:
                logger.info("Loading startup file '%s' ...", file)
                # Set '__file__' and '__name__' variables
                patch = f"__file__ = '{file}'; __name__ = '__main__'\n"
                exec(patch, nspace, nspace)
                if not os.path.isfile(file):
                    raise IOError(f"Startup file {file!r} was not found")
                # BUG FIX: close the file after reading (the handle used to be leaked).
                with open(file) as f:
                    code_str = _patch_script_code(f.read())
                code = compile(code_str, file, "exec")
                exec(code, nspace, nspace)
            except BaseException as ex:
                # Capture traceback and send it as a message
                msg = f"Error while executing script {file!r}: {ex}"
                ex_str = "".join(traceback.format_exception(*sys.exc_info())) + "\n" + msg
                raise ScriptLoadingError(msg, ex_str) from ex
            finally:
                patch = "del __file__\n"  # Do not delete '__name__'
                exec(patch, nspace, nspace)
    finally:
        try:
            if path_is_set:
                sys.path.remove(path)
        except Exception:
            pass

    return nspace
def load_startup_module(module_name, *, nspace=None):
    """
    Populate a namespace by importing a module.

    Parameters
    ----------
    module_name: str
        name of the module to import
    nspace: dict or None
        Reference to the existing namespace, in which the code is executed. If ``nspace``
        is *None*, then the new namespace is created.

    Returns
    -------
    nspace: dict
        namespace that contains objects loaded from the module. Reference to ``nspace``
        passed as a parameter is returned if ``nspace`` is not *None*.
    """
    importlib.invalidate_caches()

    try:
        module = importlib.import_module(module_name)
        nspace = {} if nspace is None else nspace
        nspace.update(module.__dict__)
    except BaseException as ex:
        # Capture traceback and send it as a message
        msg = f"Error while loading module {module_name!r}: {ex}"
        tb_str = "".join(traceback.format_exception(*sys.exc_info())) + "\n" + msg
        raise ScriptLoadingError(msg, tb_str) from ex

    return nspace
def load_startup_script(script_path, *, enable_local_imports=True, nspace=None):
    """
    Populate namespace by executing a startup script.

    Parameters
    ----------
    script_path : str
        full path to the startup script
    enable_local_imports : boolean
        If ``False``, local imports from the script will not work. Setting to ``True``
        enables local imports.
    nspace: dict or None
        Reference to the existing namespace, in which the code is executed. If ``nspace``
        is *None*, then the new namespace is created.

    Returns
    -------
    nspace: dict
        namespace that contains objects loaded from the module. Reference to ``nspace``
        passed as a parameter is returned if ``nspace`` is not *None*.

    Raises
    ------
    ImportError
        the script does not exist
    ScriptLoadingError
        the script failed to execute
    """
    importlib.invalidate_caches()

    if not os.path.isfile(script_path):
        raise ImportError(f"Failed to load the script '{script_path}': script was not found")

    nspace = {} if nspace is None else nspace

    if enable_local_imports:
        p = os.path.split(script_path)[0]
        sys.path.insert(0, p)  # Needed to make local imports work.
        # Save the list of available modules
        sm_keys = list(sys.modules.keys())

    try:
        # Re-check inside 'try' so that a disappearing file is reported via
        #   'ScriptLoadingError' like any other failure.
        if not os.path.isfile(script_path):
            raise IOError(f"Startup file {script_path!r} was not found")

        # Set '__file__' and '__name__' variables
        patch = f"__file__ = '{script_path}'; __name__ = '__main__'\n"
        exec(patch, nspace, nspace)

        # BUG FIX: close the file after reading (the handle used to be leaked).
        with open(script_path) as f:
            code = compile(f.read(), script_path, "exec")
        exec(code, nspace, nspace)

    except BaseException as ex:
        # Capture traceback and send it as a message
        msg = f"Error while executing script {script_path!r}: {ex}"
        ex_str = "".join(traceback.format_exception(*sys.exc_info())) + "\n" + msg
        raise ScriptLoadingError(msg, ex_str) from ex

    finally:
        patch = "del __file__\n"  # Do not delete '__name__'
        exec(patch, nspace, nspace)

        if enable_local_imports:
            # Delete data on all modules that were loaded by the script.
            # We don't need them anymore. Modules will be reloaded from disk if
            #   the script is executed again.
            for key in list(sys.modules.keys()):
                if key not in sm_keys:
                    del sys.modules[key]
            sys.path.remove(p)

    return nspace
def load_worker_startup_code(
    *,
    startup_dir=None,
    startup_module_name=None,
    startup_script_path=None,
    nspace=None,
):
    """
    Load worker startup code from one of the supported sources: startup directory
    (IPython-style profile collection), startup module or startup script.
    Exactly one source must be specified.

    Parameters
    ----------
    startup_dir : str
        Name of the directory that contains startup files.
    startup_module_name : str
        Name of the startup module.
    startup_script_path : str
        Path to startup script.
    nspace: dict or None
        Reference to the existing namespace, in which the code is executed. If ``nspace``
        is *None*, then the new namespace is created.

    Returns
    -------
    nspace : dict
       Dictionary with loaded namespace data. Reference to ``nspace`` passed as a parameter
       is returned if ``nspace`` is not *None*.

    Raises
    ------
    ValueError
        Zero sources or multiple sources of startup code are specified.
    """
    sources = (startup_dir, startup_module_name, startup_script_path)
    # Exactly one source must be non-None, i.e. exactly two must be None.
    if sum(_ is None for _ in sources) != 2:
        raise ValueError("Source of the startup code was not specified or multiple sources were specified.")

    clear_registered_items()  # Operation on the global object

    if startup_dir is not None:
        logger.info("Loading RE Worker startup code from directory '%s' ...", startup_dir)
        startup_dir = os.path.abspath(os.path.expanduser(startup_dir))
        logger.info("Startup directory: '%s'", startup_dir)
        nspace = load_profile_collection(startup_dir, patch_profiles=True, nspace=nspace)
    elif startup_module_name is not None:
        logger.info("Loading RE Worker startup code from module '%s' ...", startup_module_name)
        nspace = load_startup_module(startup_module_name, nspace=nspace)
    elif startup_script_path is not None:
        logger.info("Loading RE Worker startup code from script '%s' ...", startup_script_path)
        startup_script_path = os.path.abspath(os.path.expanduser(startup_script_path))
        nspace = load_startup_script(startup_script_path, nspace=nspace)
    else:
        # Defensive branch: unreachable given the check above, kept as a safeguard.
        logger.warning(
            "Source of startup information is not specified. RE Worker will be started with empty environment."
        )
        nspace = {}

    return nspace
def extract_script_root_path(*, startup_dir=None, startup_module_name=None, startup_script_path=None):
    """
    Return the path to the root directory for local imports based on the source of
    startup files. Returns ``None`` if a root directory does not exist (e.g. when
    startup scripts are imported from a module). The returned value is a parameter
    of the ``load_script_into_existing_nspace`` function. The startup code locations
    are checked in the same order as in ``load_worker_startup_code``.

    Parameters
    ----------
    startup_dir : str
        Name of the directory that contains startup files.
    startup_module_name : str
        Name of the startup module.
    startup_script_path : str
        Path to startup script.

    Returns
    -------
    str or None
        Path to the root directory of startup files.
    """
    if startup_dir is not None:
        return startup_dir
    if startup_module_name is not None:
        # A module has no startup directory on disk.
        return None
    if startup_script_path is not None:
        return os.path.split(startup_script_path)[0]
    return None
def load_script_into_existing_nspace(
    *, script, nspace, enable_local_imports=True, script_root_path=None, update_re=False
):
    """
    Execute the script into the existing namespace ``nspace``. The script is expected to be executable
    in the current namespace. An exception is raised if execution of the script fails. The code
    calling this function is responsible for capturing exceptions and displaying or storing the error
    messages in the convenient form.

    Parameters
    ----------
    script: str
        Valid Python script, which is executable in the existing namespace.
    nspace: dict
        The existing namespace.
    enable_local_imports: boolean
        Enable/disable local imports when loading the script (``True/False``).
    script_root_path: str or None
        Root path (hypothetic location of the script) to use for local imports. The root directory
        is returned by the ``extract_script_root_path`` function.
    update_re: boolean
        Update the ``RE`` and ``db`` in ``nspace`` if they are changed in the script if ``True``,
        discard those objects otherwise.

    Returns
    -------
    None

    Raises
    ------
    ScriptLoadingError
        ``script`` is not a string or its execution failed.
    """
    # There is nothing to do if the script is an empty string or None
    if not script:
        return
    elif not isinstance(script, str):
        # BUG FIX: the exception used to be created but never raised.
        raise ScriptLoadingError(f"Script must be a string: (type(script)={type(script)!r})", "")

    importlib.invalidate_caches()

    root_path_exists = script_root_path and os.path.isdir(script_root_path)
    if enable_local_imports and not root_path_exists:
        logger.error("The path '%s' does not exist. Local imports will not work.", script_root_path)
    use_local_imports = enable_local_imports and root_path_exists

    if use_local_imports:
        sys.path.insert(0, script_root_path)  # Needed to make local imports work.
        # Save the list of available modules
        sm_keys = list(sys.modules.keys())

    object_backup = {}
    if not update_re:
        # Save references to RE and db in case they are changed by the script
        if "RE" in nspace:
            object_backup["RE"] = nspace["RE"]
        if "db" in nspace:
            object_backup["db"] = nspace["db"]

    try:
        # Set '__name__' variable. NOTE: '__file__' points to a hypothetic location,
        #   since the script has no real file on disk.
        script_path = os.path.join(script_root_path, "script") if script_root_path else "script"
        patch = f"__name__ = '__main__'\n__file__ = '{script_path}'\n"
        exec(patch, nspace, nspace)

        # A script may be partially loaded into the environment in case it fails.
        #   This is 'realistic' behavior, similar to what happens in IPython.
        exec(script, nspace, nspace)
    except BaseException as ex:
        # Capture traceback and send it as a message.
        # BUG FIX: corrected the typo 'stript' in the error message.
        msg = f"Failed to execute script: {ex}"
        ex_str = "".join(traceback.format_exception(*sys.exc_info())) + "\n" + msg
        raise ScriptLoadingError(msg, ex_str) from ex
    finally:
        if not update_re:
            # Remove references for 'RE' and 'db' from the namespace
            nspace.pop("RE", None)
            nspace.pop("db", None)
            # Restore the original reference to 'RE' and 'db' (if they existed)
            nspace.update(object_backup)

        if use_local_imports:
            # Delete data on all modules that were loaded by the script.
            # We don't need them anymore. Modules will be reloaded from disk if
            #   the script is executed again.
            for key in list(sys.modules.keys()):
                if key not in sm_keys:
                    # BUG FIX: replaced a leftover debug 'print' with a debug log message.
                    logger.debug("Deleting the module '%s'", key)
                    del sys.modules[key]
            sys.path.remove(script_root_path)
def is_plan(obj):
    """
    Returns ``True`` if the object is a plan.
    """
    try:
        # Prefer the check from Bluesky itself (when installed), which also
        #   recognizes plans marked with the newer decorator.
        from bluesky.utils import is_plan as _bs_is_plan
        return _bs_is_plan(obj)
    except ImportError:
        # Bluesky is not available: fall back to a generator-function check.
        return inspect.isgeneratorfunction(obj)
def is_device(obj):
    """
    Returns ``True`` if the object is a device.
    """
    from bluesky.protocols import Flyable, Readable

    if inspect.isclass(obj):
        # Only instances are considered devices, never classes.
        return False
    # ophyd-async devices are neither readable nor flyable, so they are
    #   detected by the existence of the 'children' attribute.
    return isinstance(obj, (Readable, Flyable)) or hasattr(obj, "children")
def _process_registered_objects(*, nspace, reg_objs, validator, obj_type):
    """
    Attempt to get a reference for each registered object (plan or device).
    After processing, the parameter ``obj`` of each object will contain a valid reference
    to the object, unless the object is registered as excluded, the object does not exist in
    the namespace or not recognized as a plan or a device.
    Parameters
    ----------
    nspace: dict
        Worker namespace
    reg_objs: dict
        Reference to ``reg_ns_items.reg_plans`` or ``reg_ns_items.reg_devices``.
    validator: dict
        Function, which takes reference to the object and returns *True* if the object has
        valid type (reference to ``is_plan`` or ``is_device``).
    obj_type: str
        The name of the object type used in error messages. Expected to be ``plan`` or ``device``
    Returns
    -------
    None
    """
    for name, p in reg_objs.items():
        try:
            p["obj"] = None
            if p["exclude"]:
                continue
            obj = eval(name, nspace, nspace)
            if validator(obj):
                p["obj"] = obj
            else:
                logger.warning(f"Registered object {name!r} is not recognized as a {obj_type}.")
        except Exception as ex:
            logger.warning(f"Registered {obj_type} {name!r} is not found in the namespace: {ex}.")
def plans_from_nspace(nspace):
    """
    Extract plans from the namespace. Currently the function returns the dict of callable objects.

    Parameters
    ----------
    nspace: dict
        Namespace that may contain plans.

    Returns
    -------
    dict(str: callable)
        Dictionary of Bluesky plans
    """
    plans = {name: obj for name, obj in nspace.items() if is_plan(obj)}
    # Resolve references for plans explicitly registered in startup code.
    _process_registered_objects(nspace=nspace, reg_objs=reg_ns_items.reg_plans, validator=is_plan, obj_type="plan")
    return plans
def devices_from_nspace(nspace):
    """
    Extract signals and devices from the namespace. Currently the function returns the dict of
    namespace items of types inherited from ophyd.ophydobj.OphydObject objects.

    Parameters
    ----------
    nspace: dict
        Namespace that may contain plans and devices.

    Returns
    -------
    dict(str: callable)
        Dictionary that maps device names to device objects.
    """
    devices = {name: obj for name, obj in nspace.items() if is_device(obj)}
    # Resolve references for devices explicitly registered in startup code.
    _process_registered_objects(
        nspace=nspace, reg_objs=reg_ns_items.reg_devices, validator=is_device, obj_type="device"
    )
    return devices
def _get_nspace_object(object_name, *, objects_in_nspace, nspace=None):
    """
    Search for an object in the namespace by name (e.g. ``count`` or ``det1.val``).
    Subdevices are looked up as attributes of the top-level device (plans have no
    subcomponents). Returns a reference to the object if it is found, otherwise
    returns the object name (string).

    Parameters
    ----------
    object_name: str
        Object (device or plan) name, such as ``count``, ``det1`` or ``det1.val``.
    objects_in_nspace: dict
        Dictionary of objects (devices and/or plans) in namespace.
    nspace: dict
        Reference to plan execution namespace. If ``nspace`` is not *None*, the up-to-date references
        to plans and devices are taken directly from the namespace. The plans and devices still must be
        present in ``plans_in_nspace`` and ``devices_in_nspace``, but those dictionaries may contain
        stale references (e.g. if a plan is interactively replaced by a user).

    Returns
    -------
    device: callable, ophyd.Object or str
        Reference to the object or object name (str) if the object is not found.
    """
    components, uses_re, _ = _split_name_pattern(object_name)
    if uses_re:
        raise ValueError(f"Object (device or plan) name {object_name!r} can not contain regular expressions")
    components = [_[0] for _ in components]  # We use only the 1st element
    if not components:
        raise ValueError(f"Device name {object_name!r} contains no components")

    top_name = components[0]
    if top_name not in objects_in_nspace:
        return object_name

    if nspace:
        # This is the actual reference to the object from the execution namespace.
        #   It may be different from the reference saved during the last update of the lists.
        device = nspace[top_name]
    else:
        device = objects_in_nspace[top_name]

    for c in components[1:]:
        device = getattr(device, c, None)
        if device is None:
            logger.error("Device '%s' does not have attribute (subdevice) '%s'", object_name, c)
        if not device:
            # Missing or 'falsy' subdevice: report the object as not found.
            return object_name

    return device
def prepare_plan(plan, *, plans_in_nspace, devices_in_nspace, allowed_plans, allowed_devices, nspace=None):
    """
    Prepare the plan for execution: replace the device names (str) in the plan specification
    by references to ophyd objects; replace plan name by the reference to the plan.

    Parameters
    ----------
    plan: dict
        Plan specification. Keys: ``name`` (str) - plan name, ``args`` - plan args,
        ``kwargs`` - plan kwargs. The plan parameters must contain ``user_group``.
    plans_in_nspace: dict(str, callable)
        Dictionary of existing plans from the RE workspace.
    devices_in_nspace: dict(str, ophyd.Device)
        Dictionary of existing devices from RE workspace.
    allowed_plans: dict(str, dict)
        Dictionary of allowed plans that maps group name to dictionary of plans allowed
        to the members of the group (group name can be found in ``plan["user_group"]``).
    allowed_devices: dict(str, dict)
        Dictionary of allowed devices that maps group name to dictionary of devices allowed
        to the members of the group.
    nspace: dict
        Reference to plan execution namespace. If ``nspace`` is not *None*, the up-to-date references
        to plans and devices are taken directly from the namespace. The plans and devices still must be
        present in ``plans_in_nspace`` and ``devices_in_nspace``, but those dictionaries may contain
        stale references (e.g. if a plan is interactively replaced by a user).

    Returns
    -------
    dict
        Parsed plan specification that contains references to plans and Ophyd devices.
        Keys: ``callable``, ``args``, ``kwargs`` and ``meta``.

    Raises
    ------
    RuntimeError
        Raised in case parsing was not successful.
    """
    success = True
    err_msg = ""

    plan_name = plan["name"]
    plan_args = plan["args"] if "args" in plan else []
    plan_kwargs = plan["kwargs"] if "kwargs" in plan else {}
    plan_meta = plan["meta"] if "meta" in plan else {}

    if "user_group" not in plan:
        raise RuntimeError(f"No user group is specified in parameters for the plan '{plan_name}'")
    user_group = plan["user_group"]  # User group is REQUIRED parameter of the plan
    if (user_group not in allowed_plans) or (user_group not in allowed_devices):
        raise RuntimeError(f"Lists of allowed plans and devices is not defined for the user group '{user_group}'")
    group_plans = allowed_plans[user_group]
    group_devices = allowed_devices[user_group]

    # Run full validation of parameters (same as during plan submission)
    success, errmsg = validate_plan(plan, allowed_plans=group_plans, allowed_devices=group_devices)
    if not success:
        raise RuntimeError(f"Validation of plan parameters failed: {errmsg}")

    def ref_from_name(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions):
        # Replace a string with the respective object from the namespace if the string is
        #   an explicitly selected object name or a name found in the tree of allowed objects.
        # NOTE(review): 'eval' is applied only to strings matching a restricted dotted-name
        #   pattern and only if explicitly enabled for the parameter, but it still executes
        #   code in the plan execution namespace.
        if isinstance(v, str):
            if (v in sel_object_names) or _is_object_name_in_list(v, allowed_objects=selected_object_tree):
                v = _get_nspace_object(v, objects_in_nspace=objects_in_nspace, nspace=nspace)
            elif eval_expressions and re.search(r"^[_A-Za-z][_A-Za-z0-9]*(\.[_A-Za-z][_A-Za-z0-9]*)*$", v):
                try:
                    v = eval(v, nspace, nspace)
                except Exception:
                    # Best effort: the string is left unchanged if evaluation fails
                    pass
        return v

    def process_argument(v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions):
        # Recursively process lists (iterables) and dictionaries
        if isinstance(v, str):
            v = ref_from_name(
                v, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions
            )
        elif isinstance(v, dict):
            for key, value in v.copy().items():
                v[key] = process_argument(
                    value, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_expressions
                )
        elif isinstance(v, Iterable):
            # Any other iterable (e.g. tuple or generator) is converted to a list
            v_original = v
            v = list()
            for item in v_original:
                v.append(
                    process_argument(
                        item,
                        objects_in_nspace,
                        sel_object_names,
                        selected_object_tree,
                        nspace,
                        eval_expressions,
                    )
                )
        return v

    def process_parameter_value(value, pp, objects_in_nspace, group_plans, group_devices, nspace):
        """
        Process a parameter value ``value`` based on parameter description ``pp``
        (``pp = allowed_plans[<plan_name>][<param_name>]``).
        """
        # Include/exclude all devices/plans if required
        convert_all_plan_names = pp.get("convert_plan_names", None)  # None - not defined
        convert_all_device_names = pp.get("convert_device_names", None)
        eval_all_expressions = pp.get("eval_expressions", False)

        # A set of the selected device names (device names are listed explicitly, not as a tree)
        sel_object_names, pname_list, dname_list = set(), set(), set()

        if "annotation" not in pp:
            # No annotation: convert all plan and device names unless explicitly disabled
            convert_all_plan_names = True if (convert_all_plan_names is None) else convert_all_plan_names
            convert_all_device_names = True if (convert_all_device_names is None) else convert_all_device_names
        elif ("plans" in pp["annotation"]) or ("devices" in pp["annotation"]):
            # 'Custom' annotation: attempt to convert strings only to the devices
            #    and plans that are part of the description.
            pname_list, dname_list = [], []
            for plist in pp["annotation"].get("plans", {}).values():
                plist = [_ for _ in plist if _is_object_name_in_list(_, allowed_objects=group_plans)]
                pname_list.extend(list(plist))
            for dlist in pp["annotation"].get("devices", {}).values():
                dlist = [_ for _ in dlist if _is_object_name_in_list(_, allowed_objects=group_devices)]
                dname_list.extend(list(dlist))
            pname_list, dname_list = set(pname_list), set(dname_list)

        # Create a tree of objects (dictionary of plans and devices)
        selected_object_tree = {}
        if convert_all_plan_names is True:
            selected_object_tree.update(group_plans)
        elif convert_all_plan_names is False:
            pname_list = set()
        if convert_all_device_names is True:
            selected_object_tree.update(group_devices)
        elif convert_all_device_names is False:
            dname_list = set()

        sel_object_names = pname_list.union(dname_list)
        if sel_object_names or selected_object_tree or eval_all_expressions:
            value = process_argument(
                value, objects_in_nspace, sel_object_names, selected_object_tree, nspace, eval_all_expressions
            )
        return value

    # Create the signature based on EXISTING plan from the workspace
    signature = inspect.signature(plans_in_nspace[plan_name])

    # Compare parameters in the signature and in the list of allowed plans. Make sure that the parameters
    #   in the list of allowed plans are a subset of the existing parameters (otherwise the plan can not
    #   be started). This is not full validation.
    existing_names = set([_.name for _ in signature.parameters.values()])
    allowed_names = set([_["name"] for _ in group_plans[plan_name]["parameters"]])
    extra_names = allowed_names - existing_names
    if extra_names:
        raise RuntimeError(f"Plan description in the list of allowed plans has extra parameters {extra_names}")

    # Bind arguments of the plan
    bound_args = signature.bind(*plan_args, **plan_kwargs)

    # Separate dictionary for the default values defined in the annotation decorator
    default_params = {}
    # Apply the default values defined in the annotation decorator. Default values defined in
    #   the decorator may still be converted to device references (e.g. str->ophyd.Device).
    for p in group_plans[plan_name]["parameters"]:
        if ("default" in p) and p.get("default_defined_in_decorator", False):
            if p["name"] not in bound_args.arguments:
                default_value = _process_default_value(p["default"])
                default_params.update({p["name"]: default_value})

    # Existing items include existing devices and existing plans
    items_in_nspace = devices_in_nspace.copy()
    items_in_nspace.update(plans_in_nspace)

    plan_func = process_argument(plan_name, plans_in_nspace, set(), group_plans, nspace, False)
    if isinstance(plan_func, str):
        success = False
        err_msg = f"Plan '{plan_name}' is not allowed or does not exist"

    # Map names to parameter descriptions
    param_descriptions = {_["name"]: _ for _ in group_plans[plan_name]["parameters"]}

    for p_name in bound_args.arguments:
        value = bound_args.arguments[p_name]
        # Fetch parameter description ({} if parameter is not found)
        pp = param_descriptions.get(p_name, {})
        bound_args.arguments[p_name] = process_parameter_value(
            value, pp, items_in_nspace, group_plans, group_devices, nspace
        )

    for p_name in default_params:
        value = default_params[p_name]
        # Fetch parameter description ({} if parameter is not found). Use 'param_descriptions'
        #   (same as in the loop above): looking up the parameter name directly in the plan
        #   description dictionary always returns {}, so decorator-defined defaults would be
        #   processed without the conversion rules specified in the parameter annotation.
        pp = param_descriptions.get(p_name, {})
        default_params[p_name] = process_parameter_value(
            value, pp, items_in_nspace, group_plans, group_devices, nspace
        )

    plan_args_parsed = list(bound_args.args)
    plan_kwargs_parsed = bound_args.kwargs
    plan_kwargs_parsed.update(default_params)

    # If metadata is a list of dictionaries, then merge the dictionaries into one
    #   with dictionaries with lower index having higher priority.
    success_meta = True
    if isinstance(plan_meta, (list, tuple)):
        p = {}
        for meta in reversed(plan_meta):
            if isinstance(meta, dict):
                p.update(meta)
            else:
                success_meta = False
        if success_meta:
            plan_meta = p
    elif not isinstance(plan_meta, dict):
        success_meta = False
    if not success_meta:
        success = False
        err_msg = f"Plan metadata must be a dictionary or a list of dictionaries: '{ppfl(plan_meta)}'"

    if not success:
        raise RuntimeError(f"Error while parsing the plan: {err_msg}")

    plan_prepared = {
        "callable": plan_func,
        "args": plan_args_parsed,
        "kwargs": plan_kwargs_parsed,
        "meta": plan_meta,
    }
    return plan_prepared
def prepare_function(*, func_info, nspace, user_group_permissions=None):
    """
    Prepare a function for execution in the worker namespace. The returned dictionary
    holds the reference to the callable object found in the namespace together with
    the list of args and the dictionary of kwargs. If ``user_group_permissions`` is
    not ``None``, it is verified that the user group is permitted to run the function.

    Parameters
    ----------
    func_info: dict
        Dictionary with keys ``name``, ``user_group``, ``args`` (optional)
        and ``kwargs`` (optional).
    nspace: dict
        Reference to the RE Worker namespace.
    user_group_permissions: dict or None
        Dictionary of user group permissions. Permissions are not checked if ``None``.

    Returns
    -------
    dict
        Function data: ``callable`` - reference to a callable object in RE Worker
        namespace, ``args`` - list of args, ``kwargs`` - dictionary of kwargs.

    Raises
    ------
    RuntimeError
        Error while preparing the function.
    """
    if "name" not in func_info:
        raise RuntimeError(f"No function name is specified: {func_info!r}")
    if "user_group" not in func_info:
        raise RuntimeError(f"No user group is specified in parameters for the function: {func_info!r}")

    func_name = func_info["name"]
    user_group = func_info["user_group"]

    if user_group_permissions is not None:
        allowed = check_if_function_allowed(
            func_name, group_name=user_group, user_group_permissions=user_group_permissions
        )
        if not allowed:
            raise RuntimeError(f"Function {func_name!r} is not allowed for users from {user_group!r} user group")

    func_args = list(func_info.get("args", []))
    func_kwargs = dict(func_info.get("kwargs", {}))

    if func_name not in nspace:
        raise RuntimeError(f"Function {func_name!r} is not found in the worker namespace")
    target = nspace[func_name]

    # Basic sanity checks that catch the most common mistakes. The list of checks can be expanded.
    if not callable(target):
        raise RuntimeError(f"Object {func_name!r} defined in worker namespace is not callable")
    # A generator function defined in the worker namespace is a plan: calling it directly
    #   only creates a generator object, so such calls are rejected.
    if inspect.isgeneratorfunction(target):
        raise RuntimeError(f"Object {func_name!r} defined in worker namespace is a generator function")

    return {"callable": target, "args": func_args, "kwargs": func_kwargs}
# ===============================================================================
#   Processing of plan annotations and plan descriptions
# Device-type keywords that may prefix a regular-expression name pattern
#   (e.g. '__DETECTOR__:^det'); '' means that no type restriction is applied.
#   See '_split_name_pattern' and '_get_device_type_condition' for usage.
_supported_device_types = ("", "__READABLE__", "__FLYABLE__", "__DETECTOR__", "__MOTOR__")
def _split_name_pattern(name_pattern):
    """
    Split name pattern into components. The pattern may represent names of devices,
    plans or functions. The patterns for device names may consist of multiple components,
    while the patterns for plan and function names always have one component.

    For devices, the list element may contain device or subdevice name (e.g. ``det1``,
    ``det1.val``, ``sim_stage.det.val``) or specify regular expressions used to find
    elements in the list of existing elements (e.g. ``:^det:^val$``, ``:-^det:^val$``,
    ``__DETECTORS__:-^sim_stage$:.+:^val$``). The ``-`` character immediately following
    ``:`` (``:-``) is not part of the regular expression. It indicates that all subdevices
    with matching regular expressions in this and previous pattern components are
    not included in the list but search continues to its subdevices. Optionally ``:``
    may be followed by ``+`` character, which indicates that the device must be included
    in the search result (this is the default behavior). The ``__DETECTORS__`` keyword
    indicates that only detectors that satisfy the conditions are included in the list.
    The following keywords are currently supported: ``__READABLE__``, ``__FLYABLE__``,
    ``__DETECTOR__`` and ``__MOTOR__``. The definition may also contain 'full name'
    patterns that are applied to the remaining part of the subdevice name. The
    'full name' pattern is defined by putting '?' after ':', e.g. the pattern
    ``:?^det.*val$`` selects all device and subdevice names that start with ``det``
    and end with ``val``, such as ``detector_val``, ``det.val``, ``det.somesubdevice.val``
    etc. The search depth may be limited by specifying ``depth`` parameter after
    the full name pattern, e.g. ``:?det.*val$:depth=5`` limits depth search to 5.
    Depth values start with 1, e.g. in the example above, ``:?det.*val$:depth=1``
    will find only ``detector_val``. If 'full name' pattern is preceded with a number
    of subdevice name patterns, then 'full_name' pattern is applied to the remainder of
    the name, e.g. ``:^sim_stage$:?^det.*val$:depth=3`` would select ``sim_stage``
    and all its subdevices such as ``sim_stage.det5.val`` searching to the total depth
    of 4 (``sim_stage`` name is at level 1 and full name search is performed at levels
    2, 3 and 4).

    Note, that the last pattern component can not be deselected. For example ``:^det:-val$``
    is identical to ``:^det:val$``.

    The function may be applied to list elements describing plans, since they consist of
    the same set of components. The element may be a name, such as ``count`` or
    regular expression used to pick plans from the list of existing names, such as ``:^count``.
    Since plan names and patterns are much simpler, additional validation of the split
    results should be performed. The separators ``:+``, ``:-`` and ``:?`` can still be used
    with plan or function names, but they have no meaning and are ignored during processing.

    Parameters
    ----------
    name_pattern: str
        Name pattern of a device, plan or function.

    Returns
    -------
    components: list(tuple)
        List of tuples, each tuple contains the following items: regular expression or device/subdevice name,
        boolean flag that indicates if the match found at the current depth level should be included
        in the list (for regular expressions), boolean flag that indicates if the regular expression should
        be applied to the full remaining part of the subdevice name (the full tree or subtree is going to
        be searched), depth (int, 1-based or ``None`` if not set) that limits the depth of the tree searched
        for the names matching full name regular expressions.
    uses_re: boolean
        Boolean value that indicates if the components are regular expressions.
    device_type: str
        Contains the keyword that specifies device type (for regular expressions). Empty string if
        the device type is not specified or if the components are device/subdevice names.

    Raises
    ------
    TypeError
        The element definition is not a string.
    ValueError
        The element definition is incorrectly formatted and can not be processed.
    """
    if not isinstance(name_pattern, str):
        raise TypeError(
            f"Name pattern {name_pattern!r} has incorrect type {type(name_pattern)!r}. Expected type: 'str'"
        )
    # Remove spaces
    name_pattern = name_pattern.replace(" ", "")
    if not len(name_pattern):
        raise ValueError(f"Name pattern {name_pattern!r} is an empty string")
    # Check if the element is defined using regular expressions (':' separates RE components)
    uses_re = ":" in name_pattern
    device_type = ""
    if uses_re:
        components = name_pattern.split(":")
        device_type = components[:1][0]  # The first element is a string (may be empty string)
        components = components[1:]
        # Find the components starting with '?' ('full name' regular expressions)
        n_full_re = None
        for n, c in enumerate(components):
            if c.startswith("?"):
                n_full_re = n
                break
        depth = None
        if n_full_re is not None:
            # 'Full name' RE can be the last or next to last component.
            if n_full_re < len(components) - 2:
                raise ValueError(
                    f"Full name regular expression {components[n_full_re]!r} must be the last: {name_pattern!r}"
                )
            elif n_full_re == len(components) - 2:
                # If 'full name' RE is next to last, it can be followed only by 'depth' specification
                if not components[-1].startswith("depth="):
                    raise ValueError(
                        f"Full name regular expression {components[n_full_re]!r} can be only followed by "
                        f"the depth specification: {name_pattern!r}"
                    )
                elif not re.search("^depth=[0-9]+$", components[-1]):
                    raise ValueError(
                        f"Depth specification {components[-1]!r} has incorrect format: {name_pattern!r}"
                    )
                else:
                    _, depth = components.pop().split("=")  # Remove depth specification
                    depth = int(depth)
        if (depth is not None) and (depth < 1):
            raise ValueError(f"Depth ({depth}) must be positive integer greater or equal to 1: {name_pattern!r}")
        if device_type not in _supported_device_types:
            raise ValueError(
                f"Device type {device_type!r} is not supported. Supported types: {_supported_device_types}"
            )
        # A component prefixed with '-' is matched but not included in the result list
        components_include = [False if _.startswith("-") else True for _ in components]
        components_include[-1] = True  # Always set the last component as included.
        components_full_re = [False] * len(components)
        components_depth = [None] * len(components)  # 'depth == None' means that depth is not specified
        if n_full_re is not None:
            components_full_re[n_full_re] = True
            components_depth[n_full_re] = depth
        # Remove the '+', '-' or '?' prefix from the beginning of each component
        components = [
            _[1:] if _.startswith("+") or _.startswith("-") or _.startswith("?") else _ for _ in components
        ]
        # Validate each component: non-empty and compilable as a regular expression
        for c in components:
            if not c:
                raise ValueError(f"Name pattern {name_pattern!r} contains empty components")
            try:
                re.compile(c)
            except re.error:
                raise ValueError(f"Name pattern {name_pattern!r} contains invalid regular expression {c!r}")
        components = list(zip(components, components_include, components_full_re, components_depth))
    else:
        # Plain (non-RE) name: must be a valid dotted identifier, e.g. 'sim_stage.det.val'
        if not re.search(r"^[_a-zA-Z]([_a-zA-Z0-9\.]*[_a-zA-Z0-9])?$", name_pattern):
            raise ValueError(
                f"Name pattern {name_pattern!r} contains invalid characters. "
                "The pattern could be a valid regular expression, but it is not labeled with ':' (e.g. ':^det$')"
            )
        components = name_pattern.split(".")
        for c in components:
            if not c:
                raise ValueError(
                    f"Plan, device or subdevice name in the description {name_pattern!r} is an empty string"
                )
        # Plain names are always included, never 'full name' REs and have no depth limit
        components_include = [True] * len(components)
        components_full_re = [False] * len(components)
        components_depth = [None] * len(components)
        components = list(zip(components, components_include, components_full_re, components_depth))
    return components, uses_re, device_type
def _is_object_name_in_list(object_name, *, allowed_objects):
    """
    Check whether an object (plan or device) name (e.g. ``count`` or ``det1.val``) is present
    in the tree of object names (list of allowed devices). If the search for a name in
    the tree is terminated prematurely or ends at a node which has the optional ``excluded``
    attribute set ``True``, then the name is considered not to be in the list.

    Parameters
    ----------
    object_name: str
        Object name, such as ``count``, ``det1`` or ``det1.val``.
    allowed_objects: dict
        Dictionary that contains descriptions of allowed objects (plans and/or devices)
        for a user group.

    Returns
    -------
    bool
        ``True`` if the object was found in the list, ``False`` if the object is not in
        the list or ``object_name`` is not a valid name for the object.
    """
    # Names that fail to parse or contain regular expressions are never 'in the list'
    try:
        split_components, uses_re, _ = _split_name_pattern(object_name)
    except (TypeError, ValueError):
        return False
    if uses_re:
        return False

    names = [_[0] for _ in split_components]  # Only the name part of each component is used
    if not names:
        return False

    if names[0] not in allowed_objects:
        return False
    node = allowed_objects[names[0]]
    in_list = not node.get("excluded", False)

    # Subdevice components are searched only if the root node description is non-empty
    if node:
        for subname in names[1:]:
            if ("components" not in node) or (subname not in node["components"]):
                return False
            node = node["components"][subname]
            # Only the 'excluded' flag of the final node determines the result;
            #   intermediate excluded nodes do not terminate the search.
            in_list = not node.get("excluded", False)
    return in_list
def _get_device_type_condition(device_type):
    """
    Generator function that returns reference to a function that verifies if device
    satisfy condition for the selecte device type.
    Parameters
    ----------
    device_type: str
        Deivce type
    Returns
    -------
    callable
    """
    # The condition is applied to the device to decide if it is included in the list
    if device_type == "":
        def condition(_btype):
            return True
    elif device_type == "__READABLE__":
        def condition(_btype):
            return bool(_btype["is_readable"])
    elif device_type == "__DETECTOR__":
        def condition(_btype):
            return _btype["is_readable"] and not _btype["is_movable"]
    elif device_type == "__MOTOR__":
        def condition(_btype):
            return _btype["is_readable"] and _btype["is_movable"]
    elif device_type == "__FLYABLE__":
        def condition(_btype):
            return bool(_btype["is_flyable"])
    else:
        raise ValueError(f"Unsupported device type: {device_type!r}. Supported types: {_supported_device_types}")
    return condition
def _build_device_name_list(*, components, uses_re, device_type, existing_devices):
    """
    Generate list of device names (including subdevices) based on one element of a device
    list from plan parameter annotation. The element is preprocessed with
    ``_split_name_pattern()`` and represented as ``components``, ``uses_re`` and
    ``device_type`` parameters. If the element is a device name, it is simply included
    in the list. If the element is a pattern, the matching devices are selected
    from ``existing_devices`` dictionary and added to the list. The ``components`` is
    a list of tuples for each depth level. The first element of the tuple represents
    a pattern device/subdevice name at the given level. If matching devices are found,
    they are added to the list of devices if the second tuple element is ``True`` and then
    the subdevices of each matching device are processed using the pattern defined for
    the next depth level (the second element may be ``True`` or ``False``).
    A pattern may also be a 'full name' regular expression applied to the remainder
    of the subdevice name (if the 3rd element of the tuple is True). Depth for 'full name'
    search can be restricted (4th component of the tuple). Depth may be integer (>=1) or
    ``None`` (depth is not defined, therefore considered infinite).

    The devices with set parameter ``'excluded': True`` are not included in the list.

    It is assumed that element definitions are first processed using
    ``_split_name_pattern``, which catches and processes all the errors,
    so this function does not perform any validation of data integrity.

    Parameters
    ----------
    components: list(tuple)
        List of tuples, each tuple consists of four elements: device/subdevice name or
        pattern for name search; a boolean flag that tells if the matching devices or
        subdevices at this level should be added to the device list; a boolean flag
        that tells if the pattern should be applied to the remainder of the device name
        (full name search); integer value that limits the depth of full name search
        (``None`` if the depth is not limited).
    uses_re: boolean
        Boolean parameter that tells if the components represent device name or pattern based
        on regular expressions.
    device_type: str
        If components define the pattern, then device type is used to limit search to the devices
        of certain type.
    existing_devices: dict
        The list of existing (or allowed) devices.

    Returns
    -------
    list(str)
        List of device names.
    """
    # Predicate that checks if a device matches the requested device type
    condition = _get_device_type_condition(device_type)
    if uses_re:
        # 'device_list' is shared (closed over) by both recursive helpers below
        max_depth, device_list = len(components), []
        # Recursively match 'pattern' against the dot-joined remainder of the device name
        #   ('name_suffix'), descending until 'depth_stop' is reached (no limit if None).
        def process_devices_full_name(devices, base_name, name_suffix, pattern, depth, depth_stop):
            if (depth_stop is None) or (depth < depth_stop):
                for dname, dparams in devices.items():
                    name_suffix_new = dname if not name_suffix else (name_suffix + "." + dname)
                    full_name = name_suffix_new if not base_name else (base_name + "." + name_suffix_new)
                    if (
                        re.search(pattern, name_suffix_new)
                        and not dparams.get("excluded", False)
                        and condition(dparams)
                    ):
                        device_list.append(full_name)
                    # Continue the search in subdevices regardless of whether this level matched
                    if "components" in dparams:
                        process_devices_full_name(
                            devices=dparams["components"],
                            base_name=base_name,
                            name_suffix=name_suffix_new,
                            pattern=pattern,
                            depth=depth + 1,
                            depth_stop=depth_stop,
                        )
        # Apply the pattern component for level 'depth' to 'devices'; matching subtrees are
        #   processed with the pattern for the next level (or a 'full name' search).
        def process_devices(devices, base_name, depth):
            if (depth < max_depth) and devices:
                pattern, include_devices, is_full_re, remaining_depth = components[depth]
                if is_full_re:
                    process_devices_full_name(
                        devices=devices,
                        base_name=base_name,
                        name_suffix="",
                        pattern=pattern,
                        depth=depth,
                        depth_stop=depth + remaining_depth if remaining_depth else None,
                    )
                else:
                    for dname, dparams in devices.items():
                        if re.search(pattern, dname):
                            full_name = dname if not base_name else (base_name + "." + dname)
                            # 'include_devices' is False for components prefixed with '-'
                            if include_devices and not dparams.get("excluded", False) and condition(dparams):
                                device_list.append(full_name)
                            if "components" in dparams:
                                process_devices(
                                    devices=dparams["components"], base_name=full_name, depth=depth + 1
                                )
        process_devices(devices=existing_devices, base_name="", depth=0)
    else:
        # Simplified approach: just copy the device name to list. No checks are performed.
        # Change the code in this block if device names are to be handled differently.
        device_name = ".".join([_[0] for _ in components])
        device_list = [device_name]
    return device_list
def _filter_device_tree(item_dict, allow_patterns, disallow_patterns):
    """
    Filter device tree based on sets of patterns for 'allowed' and 'disallowed'
    devices.

    Parameters
    ----------
    item_dict: dict
        Dictionary that represents a tree of existing devices. Some of the devices
        may be labeled as excluded: those devices will remain excluded and could be
        removed from the tree.
    allow_patterns: list(str)
        List of patterns from ``allowed_devices`` section of user group permissions.
        If the first list element is ``None``, then it is assumed that all the devices
        are allowed.
    disallow_patterns: list(str)
        List of patterns from ``forbidden_devices`` section of user group permissions.
        If the parameter value is ``[]`` or the first list element is ``None``,
        then it is assumed that no devices are forbidden.

    Returns
    -------
    allowed_devices: dict
        Dictionary that represents a tree of devices after filtering. Excluded
        devices could be labeled as excluded (parameter ``"excluded": True``).
        Branches that consist of excluded parameters are removed from the tree.

    Raises
    ------
        May raise exceptions if provided patterns are invalid and/or filtering
        can not be completed.
    """
    # NOTE(review): when 'allow_patterns' is an empty list, 'allow_all' below is falsy,
    #   so EVERY device is marked excluded and the returned tree is empty. Confirm this
    #   is the intended semantics for an empty 'allowed_devices' permission list.
    # Add temporary flag ('excl': False) to each device
    def add_excl_flag(devices_dict, exclude_all):
        """
        Add temporary ``excl`` parameter to each device and subdevice (recursively).

        Parameters
        ----------
        devices_dict: dict
            Dictionary that contains the device tree
        exclude_all: boolean
            The value for the ``excl`` parameter.
        """
        for _, dparam in devices_dict.items():
            dparam["excl"] = exclude_all
            if "components" in dparam:
                add_excl_flag(dparam["components"], exclude_all)
    def apply_pattern_filter(*, pattern, devices_dict, exclude):
        """
        Apply a pattern to the tree of devices. Pattern can be an explicitly stated name
        such as ``"det"`` or ``"det.val"`` or regular expression, such as ``":^det:val$"``.
        The ``excl`` flags of the matching devices are set to ``exclude`` in place.

        Parameters
        ----------
        pattern: str
            Device/subdevice name or regular expression to be applied to the device names.
        devices_dict: dict
            Dictionary that represents a tree of devices/subdevices.
        exclude: boolean
            True - exclude devices, False - include devices.

        Returns
        -------
        None
        """
        components, uses_re, device_type = _split_name_pattern(pattern)
        if uses_re:
            # 'condition' checks whether a device matches the (optional) device type
            condition = _get_device_type_condition(device_type)
            max_depth = len(components)
            def apply_filter_full_name(devices, name_suffix, pattern, depth, depth_stop, *, exclude):
                # Match the regex against the full dotted name accumulated in 'name_suffix'.
                # 'depth_stop' (when not None) limits how deep the recursion may descend.
                if (depth_stop is None) or (depth < depth_stop):
                    for dname, dparams in devices.items():
                        name_suffix_new = dname if not name_suffix else (name_suffix + "." + dname)
                        if re.search(pattern, name_suffix_new) and condition(dparams):
                            dparams["excl"] = exclude
                        if "components" in dparams:
                            apply_filter_full_name(
                                devices=dparams["components"],
                                name_suffix=name_suffix_new,
                                pattern=pattern,
                                depth=depth + 1,
                                depth_stop=depth_stop,
                                exclude=exclude,
                            )
            def apply_filter(devices, depth, *, exclude):
                # Match one pattern component per tree level; switch to full-name
                # matching when the component is a 'full' regular expression.
                if (depth < max_depth) and devices:
                    pattern, include_devices, is_full_re, remaining_depth = components[depth]
                    if is_full_re:
                        apply_filter_full_name(
                            devices=devices,
                            name_suffix="",
                            pattern=pattern,
                            depth=depth,
                            depth_stop=depth + remaining_depth if remaining_depth else None,
                            exclude=exclude,
                        )
                    else:
                        for dname, dparams in devices.items():
                            if re.search(pattern, dname):
                                if include_devices and condition(dparams):
                                    dparams["excl"] = bool(exclude)
                                if "components" in dparams:
                                    apply_filter(devices=dparams["components"], depth=depth + 1, exclude=exclude)
            apply_filter(devices=devices_dict, depth=0, exclude=exclude)
        else:
            # 'components' represent an explicitly stated name
            # NOTE(review): this branch walks 'allowed_devices' from the enclosing scope
            #   rather than the 'devices_dict' argument; both callers pass the same
            #   dictionary, so the behavior is identical — confirm before reusing.
            root = allowed_devices
            for n, c in enumerate([_[0] for _ in components]):
                if c in root:
                    if n == len(components) - 1:
                        root[c]["excl"] = exclude
                    if "components" in root[c]:
                        root = root[c]["components"]
                    else:
                        break
                else:
                    break
    def trim_device_dict(devices_dict):
        """
        Remove ``excl`` parameter from each device. Remove branches that contain only
        the devices that were excluded. Add ``"excluded": True`` to the devices that
        were excluded but can not be removed from the tree (``"excl": True``).

        Parameters
        ----------
        devices_dict: dict
            Dictionary representing the tree of devices.

        Returns
        -------
        boolean
            True if the (sub)tree still contains at least one non-excluded device.
        """
        contains_devices = False
        # Iterate over a copy of the keys: entries may be deleted during iteration
        for dname in list(devices_dict.keys()):
            dparam = devices_dict[dname]
            exclude_this_device = True
            if "components" in dparam:
                if trim_device_dict(dparam["components"]):
                    exclude_this_device = False
                else:
                    del dparam["components"]
            if dparam.get("excluded", False) or (dparam["excl"] is True):
                dparam["excluded"] = True
            else:
                exclude_this_device = False
            if exclude_this_device:
                # Device is excluded and contains no subdevices that were not excluded.
                del devices_dict[dname]
            else:
                del dparam["excl"]
                contains_devices = True
        return contains_devices
    allowed_devices = copy.deepcopy(item_dict)
    # Process the case when all devices/subdevices are allowed
    allow_all = len(allow_patterns) and (allow_patterns[0] is None)
    add_excl_flag(allowed_devices, exclude_all=not allow_all)
    # Process 'allow_patterns'
    if not allow_all:
        for pattern in allow_patterns:
            apply_pattern_filter(pattern=pattern, devices_dict=allowed_devices, exclude=False)
    # Process 'disallow_patterns'
    if len(disallow_patterns) and (disallow_patterns[0] is not None):
        for pattern in disallow_patterns:
            apply_pattern_filter(pattern=pattern, devices_dict=allowed_devices, exclude=True)
    trim_device_dict(allowed_devices)
    return allowed_devices
def _build_plan_name_list(*, components, uses_re, device_type, existing_plans):
    """
    Parameters
    ----------
    components: list(tuple)
        List of tuples returned by ``_split_list_element_definition()``. For the plans
        the list must contain exactly one component, elements 2 and 3 of each tuple
        must be ``False`` and element 4 must be ``None``. Exception will be raised if those
        requirements are not satisfied (this is part of normal operation, it means that
        the definition contains elements, such as ``+`` or ``?`` that are not supported
        for plans, so the user should be informed that the definition can not be processed)
    uses_re: boolean
        Boolean parameter that tells if the components represent device name or pattern based
        on regular expressions.
    device_type: str
        This parameter is returned by ``_split_list_element_definition()`` and must be
        an empty string ``""``. Exception will be raised otherwise (device type can not be
        used in the pattern defining a plan).
    existing_plans: set or dict
        A set of plan names or a list (dictionary) of existing (or allowed) plans.
    Returns
    -------
    list(str)
        List of device names.
    """
    if len(components) != 1:
        c = [_[0] for _ in components]
        raise ValueError(f"List element describing a plan may contain only one component. Components: {c}")
    # At this point we know that 'components' has precisely one element.
    pattern, include_devices, is_full_re, remaining_depth = components[0]
    if remaining_depth is not None:
        raise ValueError(f"Depth specification can not be part of the name pattern for a plan: {pattern!r}")
    if device_type:
        raise ValueError(
            f"Device type can not be included in the name pattern for a plan: {(device_type + ':')!r}"
        )
    plan_list = []
    if uses_re:
        for plan_name in existing_plans:
            if re.search(pattern, plan_name):
                plan_list.append(plan_name)
    else:
        # Simplified approach: just add the plan name to the list. No checks are performed.
        # Change the code in this block if device names are to be handled differently.
        plan_list = [pattern]
    return plan_list
def _expand_parameter_annotation(annotation, *, existing_devices, existing_plans):
    """
    Expand the lists if plans and device in annotation. The function does not filter
    the lists: all names of plans and devices that are explicitly defined in the lists
    are copied to the new lists. Plans and devices that are defined using regular
    expressions are searched in the lists of existing plans and devices respectively.
    Parameters
    ----------
    annotation: dict
        Parameter annotation.
    existing_devices: dict, None
        Dictionary with allowed or existing devices. If ``None``, then all the expanded lists
        will be empty.
    existing_plans: set, dict or None
        A set of existng plan names or a dictionary where the keys are the existing plan names.
        The function does not require complete plan descriptions, since they are typically not
        available on this stage. If ``None``, then all the expanded lists will be empty.
    Returns
    -------
    dict
        Expanded annotation. Always returns deep copy, so the original annotation remains unchanged
    """
    existing_devices = existing_devices or {}
    existing_plans = existing_plans or {}
    annotation = copy.deepcopy(annotation)
    for section in ("devices", "plans"):
        if section not in annotation:
            continue
        for k, v in annotation[section].items():
            if not isinstance(v, (list, tuple)):
                raise ValueError(f"Unsupported type of a device or plan list {k!r}: {v}")
            item_list = []
            for d in v:
                components, uses_re, device_type = _split_name_pattern(d)
                if section == "plans":
                    name_list = _build_plan_name_list(
                        components=components,
                        uses_re=uses_re,
                        device_type=device_type,
                        existing_plans=existing_plans,
                    )
                else:
                    name_list = _build_device_name_list(
                        components=components,
                        uses_re=uses_re,
                        device_type=device_type,
                        existing_devices=existing_devices,
                    )
                item_list.extend(name_list)
            # Remove repeated entries
            item_list = list(set(item_list))
            # Sort the list of names by lower case letters
            item_list = sorted(item_list, key=lambda _: (_.upper(), _[0].islower()))
            annotation[section][k] = item_list
    # Process enums: convert tuples to lists
    if "enums" in annotation:
        an_enums = annotation["enums"]
        for k in an_enums.keys():
            an_enums[k] = list(an_enums[k])
    return annotation
def _expand_plan_description(plan_description, *, existing_devices, existing_plans):
    """
    Expand lists of items for custom enum types for each parameter in the plan description.
    After expansion, plan description is ready for use in plan validation. The function
    does not change the plan description if there are no lists to expand, therefore it can
    be repeatedly applied to the same plan description without consequences (except extra
    execution time). The function always returnes a COPY of the plan description, so
    it is safe to apply to elements of the list of allowed/existing plans without changing
    the original descriptions.
    Parameters
    ----------
    plan_description: dict
        Plan description. The format of plan description is the same as in the list of
        allowed/existing plans. The function will not change the original dictionary,
        which is referenced by this parameter.
    existing_devices: dict, None
        Dictionary with allowed or existing devices. If ``None``, then all the expanded lists
        will be empty.
    existing_plans: set, dict or None
        A set of existng plan names or a dictionary where the keys are the existing plan names.
        The function does not require complete plan descriptions, since they are typically not
        available on this stage. If ``None``, then all the expanded lists will be empty.
    Returns
    -------
    dict
        Expanded plan description, which contains full lists of items. Always returns the
        copy of the plan description.
    Raises
    ------
        Exceptions are raised if plan description can not be properly processed.
    """
    plan_description = copy.deepcopy(plan_description)
    if "parameters" in plan_description:
        for p in plan_description["parameters"]:
            if "annotation" in p:
                p["annotation"] = _expand_parameter_annotation(
                    p["annotation"], existing_devices=existing_devices, existing_plans=existing_plans
                )
    return plan_description
# ===============================================================================
#   Validation of plan parameters
def _convert_str_to_number(value_in):
    """
    Interpret a string ``value_in`` as ``int`` or ``float`` (whichever is sufficient).
    Raise the exception if conversion fails.
    """
    try:
        # Try to convert string value to float
        if isinstance(value_in, str):
            value_out = float(value_in)
        else:
            raise ValueError(f"Value '{value_in}' is not a string (type: {type(value_in)})")
        if int(value_out) == value_out:
            value_out = int(value_out)
    except Exception as ex:
        raise ValueError(f"Failed to interpret the value {value_in!r} as integer or float number: {ex}")
    return value_out
def _compare_types(object_in, object_out):
    """
    Compares types of parameters. If both parameters are iterables (list or tuple),
    then the function is recursively called for each element. A set of rules is
    applied to types to determine if the types are matching.
    """
    def both_iterables(x, y):
        return isinstance(x, Iterable) and isinstance(y, Iterable) and not isinstance(x, str)
    def both_mappings(x, y):
        return isinstance(x, Mapping) and isinstance(y, Mapping)
    if both_iterables(object_in, object_out):
        match = True
        for o_in, o_out in zip(object_in, object_out):
            if not _compare_types(o_in, o_out):
                match = False
    elif both_mappings(object_in, object_out):
        match = True
        for k in object_in.keys():
            if k not in object_out:
                match = False
            if not _compare_types(object_in[k], object_out[k]):
                match = False
    else:
        match = False
        # This is the set of rules used to determine if types are matching.
        #   Types are required to be identical, except in the following cases:
        if isinstance(object_out, type(object_in)):
            match = True
        # 'int' is allowed to be converted to float
        if isinstance(object_out, float) and isinstance(object_in, int):
            match = True
        if pydantic_version_major == 1 and isinstance(object_in, (int, float)) and isinstance(object_out, str):
            match = True
        # Conversion to Enum is valid (parameters often contain values, that are converted to Enums).
        elif issubclass(type(object_out), enum.Enum) and object_in == object_out.value:
            match = True
    return match
def _compare_in_out(kwargs_in, kwargs_out):
    """
    Compares input and output kwargs for ``pydantic`` model. Accumulates and returns
    the error message that includes the data on all parameters with mismatching types.
    """
    match, msg = True, ""
    for k, v in kwargs_out.items():
        if k in kwargs_in:
            valid = _compare_types(kwargs_in[k], v)
            if not valid:
                msg = f"{msg}\n" if msg else msg
                msg += (
                    f"Incorrect parameter type: key='{k}', "
                    f"value='{kwargs_in[k]}' ({type(kwargs_in[k])}), "
                    f"interpreted as '{v}' ({type(v)})"
                )
                match = False
    return match, msg
def _check_ranges(kwargs_in, param_list):
    """
    Check if each parameter in ``kwargs_in`` is within the range limited by
    the minimum and maximum values if the range is specified in plan parameter
    descriptions.
    """
    def process_argument(v, v_min, v_max):
        queue = [v]
        while queue:
            v_cur = queue.pop(0)
            if isinstance(v_cur, numbers.Number):
                out_of_range = False
                if (v_min is not None) and (v_cur < v_min):
                    out_of_range = True
                if (v_max is not None) and (v_cur > v_max):
                    out_of_range = True
                if out_of_range:
                    s_v_min = f"{v_min}" if v_min is not None else "-inf"
                    s_v_max = f"{v_max}" if v_max is not None else "inf"
                    raise ValueError(f"Value {v_cur} is out of range [{s_v_min}, {s_v_max}]")
            elif isinstance(v_cur, dict):
                queue.extend(v_cur.values())
            elif isinstance(v_cur, Iterable) and not isinstance(v_cur, str):
                queue.extend(v_cur)
    match, msg = True, ""
    for p in param_list:
        k = p["name"]
        v_min = p.get("min", None)
        v_max = p.get("max", None)
        # Perform check only if the parameter value is passed to the plan and
        #   v_max or v_min is specified.
        if (k in kwargs_in) and (v_min is not None or v_max is not None):
            v = kwargs_in[k]
            if v_min is not None:
                v_min = _convert_str_to_number(v_min)
            if v_max is not None:
                v_max = _convert_str_to_number(v_max)
            try:
                process_argument(v, v_min, v_max)
            except Exception as ex:
                msg = f"{msg}\n" if msg else msg
                msg += f"Parameter value is out of range: key='{k}', " f"value='{kwargs_in[k]}': {ex}"
                match = False
    return match, msg
def _find_and_replace_built_in_types(annotation_type_str, *, plans=None, devices=None, enums=None):
    """
    Find and replace built-in types in parameter annotation type string. Built-in types are
    ``__PLAN__``, ``__DEVICE__``, ``__PLAN_OR_DEVICE__``, ``__READABLE__``, ``__MOVABLE__``,
    ``__FLYABLE__``, ``__CALLABLE__``.
    Since plan and device names are passed as strings and then converted to respective object,
    the built-in plan names are replaced by ``str`` type, which is then used in parameter validation.
    The function also determines whether plans, devices or both plans and devices need to be converted.
    If the name of the built-in type is redefined in ``plans``, ``devices`` or ``enums`` section,
    then the keyword is ignored.
    Parameters
    ----------
    annotation_type_str: str
        String that represents a type from parameter annotation
    plans: dict or None
        Reference to the ``plans`` section of the parameter annotation
    devices: dict or None
        Reference to the ``devices`` section of the parameter annotation
    enums: dict or None
        Reference to the ``enum`` section of the parameter annotation
    Returns
    -------
    annotation_type_str: str
        Modified parameter type
    convert_values: dict
        Dictionary that contains information on how the parameter values should be processed.
        Keys: ``convert_plan_names``, ``convert_device_names``, ``eval_expressions``.
    """
    # Built-in types that may be contained in 'annotation_type_str' and
    #   should be converted to another known type
    plans = plans or {}
    devices = devices or {}
    enums = enums or {}
    built_in_types = {
        "__PLAN__": ("str", True, False, False),
        "__DEVICE__": ("str", False, True, False),
        "__PLAN_OR_DEVICE__": ("str", True, True, False),
        "__READABLE__": ("str", False, True, False),
        "__MOVABLE__": ("str", False, True, False),
        "__FLYABLE__": ("str", False, True, False),
        "__CALLABLE__": ("str", False, False, True),
    }
    convert_plan_names = False
    convert_device_names = False
    eval_expressions = False
    for btype, (btype_replace, pl, dev, ev_expr) in built_in_types.items():
        # If 'plans', 'devices' or 'enums' contains the type name, then leave it as is
        if (btype in plans) or (btype in devices) or (btype in enums):
            continue
        if re.search(btype, annotation_type_str):
            annotation_type_str = re.sub(btype, btype_replace, annotation_type_str)
            convert_plan_names = convert_plan_names or pl
            convert_device_names = convert_device_names or dev
            eval_expressions = eval_expressions or ev_expr
    convert_values = dict(
        convert_plan_names=convert_plan_names,
        convert_device_names=convert_device_names,
        eval_expressions=eval_expressions,
    )
    return annotation_type_str, convert_values
def _process_annotation(encoded_annotation, *, ns=None):
    """
    Processed annotation is encoded the same way as in the descriptions of existing plans.
    Returns reference to the annotation (type object) and the list of temporary types
    that needs to be deleted once the processing (parameter validation) is complete.

    The built-in type names (__DEVICE__, __PLAN__, __PLAN_OR_DEVICE__, ``__READABLE__``,
    ``__MOVABLE__``, ``__FLYABLE__``, ``__CALLABLE__``) are replaced with ``str`` unless types with
    the same name are defined in ``plans``, ``devices`` or ``enums`` sections of the annotation.
    Explicitly defined types with the same names as built-in types are treated as regular types.

    The function validates annotation and raises exceptions in the following cases:
    the types defined in 'plans', 'devices' or 'enums' sections are not lists or tuples;
    the types defined in 'plans', 'devices' or 'enums' sections are not used as part
    of the parameter type (may indicate a bug in the user code); annotation contains
    unsupported keys.

    This function does not change the annotation.

    Parameters
    ----------
    encoded_annotation : dict
        Dictionary that contains encoded annotation. The dictionary may contain the following
        keys: ``type`` - type represented as a string, ``devices`` - a dictionary that maps
        device types and lists of device names, ``plans`` - similar dictionary for plan types
        and ``enums`` - similar dictionary for simple enums (strings).
    ns : dict
        Namespace that is used to evaluate the annotations. Custom types with unique names
        may be added to the namespace by the function.

    Returns
    -------
    annotation_type: type
        Type reconstructed from annotation.
    convert_values: dict
        Dictionary that contains information on how the parameter values should be processed.
        Keys: ``convert_plan_names``, ``convert_device_names``, ``eval_expressions``.
    ns: dict
        Namespace dictionary with created types.

    Raises
    ------
    TypeError
        Raised if the annotation can not be processed. The original exception is
        attached as ``__cause__``.
    """
    import bluesky

    # Namespace that contains the types created in the process of processing the annotation
    ns = ns or {}
    if "typing" not in ns:
        ns.update({"typing": typing})
    if "collections" not in ns:
        ns.update({"collections": collections})
    if "bluesky" not in ns:
        ns.update({"bluesky": bluesky})
    if "enum" not in ns:
        ns.update({"enum": enum})
    if "NoneType" not in ns:
        ns.update({"NoneType": type(None)})

    annotation_type_str = "<not-specified>"
    annotation_type = None

    try:
        if "type" not in encoded_annotation:
            raise ValueError("Type is not specified")

        # Verify that the annotation does not have any unsupported keys
        allowed_keys = ("type", "devices", "plans", "enums")
        invalid_keys = []
        for k in encoded_annotation:
            if k not in allowed_keys:
                invalid_keys.append(k)
        if invalid_keys:
            raise ValueError(
                f"Annotation contains unsupported keys: {invalid_keys}. Supported keys: {allowed_keys}"
            )

        annotation_type_str = encoded_annotation.get("type")  # Required!
        devices = encoded_annotation.get("devices", {})
        plans = encoded_annotation.get("plans", {})
        enums = encoded_annotation.get("enums", {})
        items = devices.copy()
        items.update(plans)
        items.update(enums)

        # Replace built-in type names ('__PLAN__', '__DEVICE__', etc.) with 'str'
        annotation_type_str, convert_values = _find_and_replace_built_in_types(
            annotation_type_str, plans=plans, devices=devices, enums=enums
        )

        # Create types
        for item_name in items:
            # The dictionary value for the devices/plans may be a list(tuple) of items.
            if not isinstance(items[item_name], (list, tuple)):
                raise TypeError(
                    f"The list of items ({item_name!r}: {items[item_name]!r}) must be a list or a tuple."
                )
            if item_name not in annotation_type_str:
                raise ValueError(
                    f"Type {item_name!r} is defined in the annotation, but not used in the parameter type"
                )

            # Create temporary unique type name (random suffix avoids collisions with
            #   names that already exist in the namespace)
            type_name = type_name_base = item_name
            while True:
                # Try the original name first
                if type_name not in ns:
                    break
                type_name = f"{type_name_base}_{random.randint(0, 100000)}_"

            # Replace all occurrences of the type name in the custom annotation.
            annotation_type_str = annotation_type_str.replace(item_name, type_name)

            # Create temporary type as a subclass of enum.Enum, e.g.
            # enum.Enum('Device', {'det1': 'det1', 'det2': 'det2', 'det3': 'det3'})
            type_code = f"enum.Enum('{type_name}', {{"
            for d in items[item_name]:
                type_code += f"'{d}': '{d}',"
            type_code += "})"
            ns[type_name] = eval(type_code, ns, ns)

        # Once all the types are created, execute the code for annotation.
        annotation_type = eval(annotation_type_str, ns, ns)

    except Exception as ex:
        # Chain the original exception; the error message previously contained
        #   a stray trailing quote, which is removed here.
        raise TypeError(f"Failed to process annotation '{annotation_type_str}': {ex}") from ex

    return annotation_type, convert_values, ns
def _process_default_value(encoded_default_value):
    """
    Evaluates the default value represented as a string.
    Parameters
    ----------
    encoded_default_value : str
        Default value represented as a string (e.g. "10" or "'some-str'"
    Returns
    -------
    p_default
        Default value.
    Raises
    ------
    ValueError
        Raised if evaluation of the string containing the default value fails
    """
    try:
        p_default = ast.literal_eval(encoded_default_value)
    except Exception as ex:
        raise ValueError(f"Failed to decode the default value '{encoded_default_value}': {ex}")
    return p_default
def _decode_parameter_types_and_defaults(param_list):
    """
    Decode parameter types and default values by using string representations of
    types and defaults stored in list of available devices.
    Parameters
    ----------
    parameters : list(dict)
        List of dictionaries that contains annotations and default values of parameters.
        Used keys of the dictionaries: ``annotation`` and ``default``. If ``annotation``
        key is missing, then the the type is set to ``typing.Any``. If ``default`` is
        missing, then the default value is set to ``inspect.Parameter.empty``.
    Returns
    -------
    dict
        Mapping of parameter names to the dictionaries containing type (``type`` key)
        and default value (``default`` key).
    """
    decoded_types_and_defaults = {}
    for p in param_list:
        if "name" not in p:
            raise KeyError(f"No 'name' key in the parameter description {p}")
        if "annotation" in p:
            p_type, _, _ = _process_annotation(p["annotation"])
        else:
            p_type = typing.Any
        if "default" in p:
            p_default = _process_default_value(p["default"])
        else:
            p_default = inspect.Parameter.empty
        decoded_types_and_defaults[p["name"]] = {"type": p_type, "default": p_default}
    return decoded_types_and_defaults
def construct_parameters(param_list, *, params_decoded=None):
    """
    Construct the list of ``inspect.Parameter`` parameters based on the parameter list.

    Parameters
    ----------
    param_list : list(dict)
        List of item (plan) parameters as it is stored in the list of existing plans.
        Used keys of each parameter dictionary: ``name``, ``kind``, ``default``,
        ``annotation``.
    params_decoded : dict or None
        Dictionary that maps parameter name to the decoded parameter type
        and default value. The dictionary is created by calling
        ``_decode_parameter_types_and_defaults()``. If the value is ``None``,
        then the decoded parameters are created automatically from ``param_list``.
        The parameter is intended for cases when the instantiated values are used
        repeatedly, to avoid unnecessary multiple calls to
        ``_decode_parameter_types_and_defaults()``.

    Returns
    -------
    parameters : inspect.Parameters
        Item (plan) parameters represented in the form accepted by ``inspect.Signature``:
        ``sig=inspect.Signature(parameters)``.

    Raises
    ------
    Exceptions with meaningful error messages are generated if parameters can not be
    instantiated.
    """
    if params_decoded is None:
        params_decoded = _decode_parameter_types_and_defaults(param_list)
    parameters = []
    try:
        for p in param_list:
            name = p.get("name", None)
            kind = p.get("kind", {}).get("value", None)
            if name is None:
                raise ValueError(f"Description for parameter contains no key 'name': {p}")
            if kind is None:
                raise ValueError(f"Description for parameter contains no key 'kind': {p}")
            # Decoded type and default ALWAYS exist for every listed parameter name
            decoded = params_decoded[name]
            parameters.append(
                inspect.Parameter(name=name, kind=kind, default=decoded["default"], annotation=decoded["type"])
            )
    except Exception as ex:
        raise ValueError(f"Failed to construct 'inspect.Parameters': {ex}")
    return parameters
def pydantic_construct_model_class(parameters):
    """
    Construct the 'pydantic' model based on parameters.

    Parameters
    ----------
    parameters : inspect.Parameters
        Parameters of the plan with annotations created by ``_create_parameters`` function.

    Returns
    -------
    class
        Dynamically created Pydantic model class
    """
    model_kwargs = {}
    for p in parameters:
        # Compare with the sentinel by identity: '!=' would invoke '__ne__' of the
        #   default value, which may raise or return a non-boolean result for some
        #   types (e.g. numpy arrays).
        if p.default is not inspect.Parameter.empty:
            default = p.default
        else:
            # No default in the signature: make *args/**kwargs optional (empty
            #   list/dict), all other parameters required ('...').
            if p.kind == p.VAR_POSITIONAL:
                default = []
            elif p.kind == p.VAR_KEYWORD:
                default = {}
            else:
                default = ...
        if hasattr(p, "annotation") and p.annotation is not inspect.Parameter.empty:
            # *args is validated as a list of the annotated type and **kwargs as
            #   a mapping of 'str' to the annotated type.
            if p.kind == p.VAR_POSITIONAL:
                annotation = typing.List[p.annotation]
            elif p.kind == p.VAR_KEYWORD:
                annotation = typing.Dict[str, p.annotation]
            else:
                annotation = p.annotation
        else:
            annotation = None
        if annotation is not None:
            model_kwargs[p.name] = (annotation, default)
        else:
            model_kwargs[p.name] = default
    return pydantic.create_model("Model", **model_kwargs)
def pydantic_validate_model(kwargs, pydantic_model_class):
    """
    Validate function parameters using a 'pydantic' model: the model class is
    instantiated with the bound parameters of the function call and validation
    is performed as part of the instantiation.

    Parameters
    ----------
    kwargs: dict
        Bound parameters of the function call.
    pydantic_model_class : class
        Pydantic model class (created using ``pydantic_construct_model_class()``).

    Returns
    -------
    class
        Constructed Pydantic model class
    """
    # Instantiating the model verifies the arguments
    model_instance = pydantic_model_class(**kwargs)
    return model_instance
def _validate_plan_parameters(param_list, call_args, call_kwargs):
    """
    Validate plan parameters based on parameter annotations.

    Parameters
    ----------
    param_list: list(dict)
        The list of parameters with annotations (including custom annotations if available).
    call_args: list
        'args' of the plan
    call_kwargs: dict
        'kwargs' of the plan

    Returns
    -------
    dict
        Bound arguments of the called plan.

    Raises
    ------
    Exception
        Exception is raised if the plan is invalid. The exception message contains
        information on the error.
    """
    # Reconstruct 'inspect.Parameter' parameters based on the parameter list
    parameters = construct_parameters(param_list)
    # Create signature based on the list of parameters
    sig = inspect.Signature(parameters)
    # Verify the list of parameters based on signature.
    bound_args = sig.bind(*call_args, **call_kwargs)
    pydantic_model_class = pydantic_construct_model_class(parameters)
    m = pydantic_validate_model(bound_args.arguments, pydantic_model_class)
    # The next step: match types of the output value of the pydantic model with input
    #   types. 'Pydantic' is converting types whenever possible, but we need precise
    #   type checking with a few exceptions
    # NOTE: in the next statement we are using 'm.__dict__' instead of officially
    #   recommended 'm.dict()', because 'm.dict()' was causing performance problems
    #   when validating large batches of plans. Based on testing, 'm.__dict__' seems
    #   to work fine in this application.
    # NOTE: Pydantic 2 performs strict type checking and some checks performed by
    #   '_compare_in_out' may not be needed.
    success, msg = _compare_in_out(bound_args.arguments, m.__dict__)
    if not success:
        raise ValueError(f"Error in argument types: {msg}")
    # Finally check the ranges of parameters that have min. and max. defined
    success, msg = _check_ranges(bound_args.arguments, param_list)
    if not success:
        raise ValueError(f"Argument values are out of range: {msg}")
    return bound_args.arguments
def filter_plan_description(plan_description, *, allowed_plans, allowed_devices):
    """
    Filter plan description from the list of existing plans (modify parameters
    ``plan["annotation"]["plans"]`` and ``plan["annotation"]["devices"]``)
    to include only plans and devices from the lists of allowed plans and allowed devices
    for the given group. Creates and returns a modified copy of the plan.

    Parameters
    ----------
    plan_description: dict
        The dictionary containing plan description from the list of existing plans
    allowed_plans: dict or None
        The dictionary with allowed plans: key - plan name. If None, then
        all plans are allowed (may change in the future). If ``{}`` then no
        plans are allowed.
    allowed_devices: dict or None
        The dictionary with allowed devices: key - device name. If None, then
        all devices are allowed (may change in the future). If ``{}`` then no
        devices are allowed.

    Returns
    -------
    dict
        The dictionary with modified plan description.
    """
    # Work on a copy: the original description must remain unchanged
    plan_description = copy.deepcopy(plan_description)
    for param in plan_description.get("parameters", []):
        if "annotation" not in param:
            continue
        annotation = param["annotation"]
        # Keep only the plan names that are found in 'allowed_plans'
        if (allowed_plans is not None) and ("plans" in annotation):
            plans_by_type = annotation["plans"]
            for group in plans_by_type:
                plans_by_type[group] = [
                    name
                    for name in plans_by_type[group]
                    if _is_object_name_in_list(name, allowed_objects=allowed_plans)
                ]
        # Keep only the device names that are found in 'allowed_devices'
        if (allowed_devices is not None) and ("devices" in annotation):
            devices_by_type = annotation["devices"]
            for group in devices_by_type:
                devices_by_type[group] = [
                    name
                    for name in devices_by_type[group]
                    if _is_object_name_in_list(name, allowed_objects=allowed_devices)
                ]
    return plan_description
def _filter_allowed_plans(*, allowed_plans, allowed_devices):
    """
    Apply ``filter_plan_description`` to each plan in the list of allowed plans so that
    only the allowed plans and devices are left in the enums for ``plans`` and ``devices``.
    Parameters
    ----------
    allowed_plans: dict or None
        The dictionary with allowed plans: key - plan name. If None, then
        all plans are allowed (may change in the future). If ``{}`` then no
        plans are allowed.
    allowed_devices: dict or None
        The dictionary with allowed devices: key - device name. If None, then
        all devices are allowed (may change in the future). If ``{}`` then no
        devices are allowed.
    Returns
    -------
    dict
       The modified copy of ``allowed_plans``.
    """
    return {
        k: filter_plan_description(v, allowed_plans=allowed_plans, allowed_devices=allowed_devices)
        for k, v in allowed_plans.items()
    }
# [docs]  (stray Sphinx "[docs]" link artifact from HTML extraction; not part of the code)
def validate_plan(plan, *, allowed_plans, allowed_devices):
    """
    Validate the dictionary of plan parameters. The function is called before the plan
    is added to the queue. The function can also be called by the client application
    to validate a plan before it is submitted to the queue. The dictionaries
    ``allowed_plans`` and ``allowed_devices`` must contain plans and devices
    that are allowed for the user and can be downloaded from the server.

    Parameters
    ----------
    plan: dict
        The dictionary of plan parameters
    allowed_plans: dict or None
        The dictionary with allowed plans: key - plan name. If None, then
        all plans are allowed (may change in the future). If ``{}`` then no
        plans are allowed.
    allowed_devices: dict or None
        The dictionary with allowed devices: key - device name. If None, then
        all devices are allowed (may change in the future). If ``{}`` then no
        devices are allowed.

    Returns
    -------
    boolean
        Indicates if validation was successful (``True``) or failed (``False``).
    str
        Error message that explains the reason for validation failure. Empty string
        if validation is successful.
    """
    try:
        success, msg = True, ""
        # If there is no list of allowed plans (it is None), then consider the plan valid.
        if allowed_plans is not None:
            # Verify that plan name is in the list of allowed plans
            plan_name = plan["name"]
            if plan_name not in allowed_plans:
                msg = f"Plan '{plan_name}' is not in the list of allowed plans."
                raise ValueError(msg)
            plan_description = allowed_plans[plan_name]
            # Plan description should contain only allowed plans and devices as parameters at
            #   this point. But run the filtering again just in case.
            plan_description = filter_plan_description(
                plan_description, allowed_plans=allowed_plans, allowed_devices=allowed_devices
            )
            param_list = plan_description["parameters"]
            # 'args' holds positional arguments (list), 'kwargs' keyword arguments (dict)
            call_args = plan.get("args", [])
            call_kwargs = plan.get("kwargs", {})
            _validate_plan_parameters(param_list=param_list, call_args=call_args, call_kwargs=call_kwargs)
        # Check if supplied plan metadata is a dictionary or a list of dictionaries.
        if "meta" in plan:
            meta_msg = "Plan parameter 'meta' must be a dictionary or a list of dictionaries"
            if isinstance(plan["meta"], (tuple, list)):
                for meta in plan["meta"]:
                    if not isinstance(meta, dict):
                        raise ValueError(meta_msg)
            elif not isinstance(plan["meta"], dict):
                raise ValueError(meta_msg)
    except Exception as ex:
        # Any validation failure is reported via the returned status, not an exception
        success = False
        msg = f"Plan validation failed: {str(ex)}\nPlan: {ppfl(plan)}"
    return success, msg
def bind_plan_arguments(*, plan_args, plan_kwargs, plan_parameters):
    """
    Bind plan arguments given as ``plan_args`` and ``plan_kwargs`` to plan parameters.
    The function contains shortened version of code executed in ``validate_plan`` function.
    The dictionary of bound arguments can be accessed as ``bound_args.arguments``.

    Parameters
    ----------
    plan_args : list
        A list containing plan args
    plan_kwargs : dict
        A dictionary containing plan kwargs
    plan_parameters : dict
        A dictionary containing description of plan signature. The dictionary is the entry
        of ``allowed_plans`` dictionary (e.g. ``allowed_plans['count']``)

    Returns
    -------
    bound_args : inspect.BoundArguments
        Bound arguments of the plan.

    Raises
    ------
    TypeError
        Arguments could not be bound to plan parameters (raised by ``inspect.Signature.bind``).
    """
    # Copy the parameter list: 'construct_parameters' must not modify the original description
    param_list = copy.deepcopy(plan_parameters["parameters"])
    # Recreate 'inspect.Parameter' objects and the plan signature from the description,
    #   then bind the arguments (raises 'TypeError' if the arguments do not match).
    sig = inspect.Signature(construct_parameters(param_list))
    return sig.bind(*plan_args, **plan_kwargs)
def _parse_docstring(docstring):
    """
    Parse docstring of a function using ``numpydoc``.
    Parameters
    ----------
    docstring: str or None
        Docstring to be parsed.
    Returns
    -------
    dict
        Dictionary that contains the extracted data. Returns ``{}`` if ``docstring`` is ``None``.
    """
    doc_annotation = {}
    if docstring:
        # Make sure that the first line of the docstring is properly indented, otherwise it is
        #   incorrectly parsed by 'numpydoc' (new line is optional).
        # Find the minimum
        ds_split = docstring.split("\n")
        ds_split = ds_split[1:]  # Ignore the first line
        n_indent = None
        for s in ds_split:
            if s.strip():  # Don't process empty strings (that contain only spaces)
                n = None
                for i, ch in enumerate(s):
                    if ch != " ":
                        n = i
                        break
                if n is not None:
                    n_indent = n if n_indent is None else min(n, n_indent)
        if n_indent is not None:
            docstring = "\n" + " " * n_indent + docstring.strip() + "\n"
        doc = NumpyDocString(docstring)
        summary = doc["Summary"]
        if summary:
            doc_annotation["description"] = "\n".join(summary)
        params = doc["Parameters"]
        doc_annotation["parameters"] = {}
        for p in params:
            names = p.name
            desc = p.desc
            if ":" in names:
                n = names.index(":")
                names = names[:n]
            names = names.split(",")
            # Remove '*' (should not be part of the name, but used in some plan annotations)
            names = [_.replace("*", "") for _ in names]
            for name in names:
                name = name.strip()
                if name:
                    doc_annotation["parameters"][name] = {}
                    doc_annotation["parameters"][name]["description"] = "\n".join(desc)
        def params_to_str(params):
            """
            Assembles return type info into one string. This joins the description
            of return parameters into one string.
            """
            p_list = []
            for p in params:
                s = ""
                indent = 0
                if p.name:
                    s += f"{p.name}"
                if p.name and p.type:
                    s += " : "
                if p.type:
                    s += f"{p.type}"
                if (p.name or p.type) and p.desc:
                    s += "\n"
                    indent = 4
                if p.desc:
                    lines = p.desc
                    lines = [" " * indent + _ if _.strip() else _ for _ in lines]
                    s += "\n".join(lines)
                p_list.append(s)
            return "\n".join(p_list)
        returns = doc["Returns"]
        yields = doc["Yields"]
        doc_annotation["returns"] = {}
        p_str = None
        if yields:
            p_str = params_to_str(yields)
        elif returns:
            p_str = params_to_str(returns)
        if p_str:
            doc_annotation["returns"]["description"] = p_str
    return doc_annotation
def _process_plan(plan, *, existing_devices, existing_plans):
    """
    Returns parameters of a plan. The function accepts callable object that implements the plan
    and returns the dictionary with descriptions of the plan and plan parameters. The function
    extracts the parameters and their annotations from the plan signature and plan and parameter
    descriptions from the docstring. If the plan is decorated with ``parameter_annotation_decorator``,
    then the specifications of plan and parameter descriptions and parameter annotations passed
    to the decorator override the descriptions from the docstring and Python parameter annotations
    from the function header.

    Limitation on the support types of the plan parameters and the default values that could be
    used in function signatures. Parameter types must be native Python types (such as ``int``,
    ``float``, ``str``, etc.) or generic types based on Python native types
    (``typing.List[typing.Union[int, str]]``). The operation of recreating the type from its
    string representation using ``eval`` and the namespace with imported ``typing`` module
    and ``NoneType`` type should be successful. The default values are reconstructed using
    ``ast.literal_eval`` and the operation ``ast.literal_eval(f"{v!r}")`` must run successfully.

    The description of the plan is represented as a Python dictionary in the following format
    (all elements are optional unless they are labeled as REQUIRED):

    .. code-block:: python

        {
            "name": <plan name>,  # REQUIRED
            "module": <module name>,  # whenever available
            "description": <plan description (multiline text)>,
            "properties": {  # REQUIRED, may be empty
                "is_generator": <boolean that indicates if this is a generator function>
            },
            "parameters": [  # REQUIRED, may be empty
                <param_name_1>: {
                     "name": <parameter name>,  # REQUIRED
                     "description": <parameter description>,
                     # For values of the 'kind' see documentation for 'inspect.Parameter'
                     "kind": {  # REQUIRED
                         "name": <string representation>,
                         "value": <integer representation>,
                     },
                     "annotation": {
                         # 'type' is REQUIRED if annotation is present
                         "type": <parameter type represented as a string>,
                         # Enums for devices, plans and simple enums are in the same format as
                         #   the 'parameter_annotation_decorator'.
                         "devices": <dict of device types (lists of device names)>,
                         "plans": <dict of plan types (lists of plan names)>,
                         "enums": <dict of enum types (lists of strings)>,
                     }
                     "default": <string representation of the default value>,
                     "default_defined_in_decorator": boolean,  # True if the default value is defined
                                                              # in decorator, otherwise False/not set
                     "min": <string representing int or float>,
                     "max": <string representing int or float>,
                     "step": <string representing int or float>,
                }
                <parameter_name_2>: ...
                <parameter_name_3>: ...
                ...
            ]
        }

    .. note::
        This function does not expand built-in lists of devices or add subdevices for devices with
        specified length if those are used in the plan annotation decorator. Call ``expand_plan_description()``
        for each plan description in order for the lists to be expanded based on the current list of
        available or existing devices.

    Parameters
    ----------
    plan: callable
        Reference to the function implementing the plan
    existing_devices : dict
        Prepared dictionary of existing devices (returned by the function ``_prepare_devices``).
        The dictionary is used to create lists of devices for built-in device lists ``AllDevicesList``,
        ``AllDetectorsList``, ``AllMotorsList``, ``AllFlyersList``. If it is known that built-in plans
        are not used, then empty dictionary can be passed instead of the list of existing devices.
    existing_plans : dict or set
        Collection of the names of existing plans. Used when expanding custom parameter
        annotations (built-in lists of plans). An empty dictionary may be passed if it is
        known that built-in plan lists are not used in annotations.

    Returns
    -------
    dict
        Dictionary with plan parameters.

    Raises
    ------
    ValueError
        Error occurred while creating plan description.
    """

    def _get_full_type_name(patterns, s):
        """
        Returns the full name of the type with parameters. The function tries to find the name
        of the type in the string ``s`` using the list of regular expressions ``patterns``.
        For example, if the list of patterns is ``["typing.Callable", "collections.abc.Callable"]``
        the function will find both isolated ``typing.Callable`` and ``collections.abc.Callable``
        and the types with parameters such as ``typing.Callable[[int, str], float]``, whichever
        comes first in the string ``s``.

        Parameters
        ----------
        patterns : list
            List of regular expressions to find the type name in the string ``s``.
        s : str
            String to search for the type name.

        Returns
        -------
        str or None
            Full name of the type with parameters or ``None`` if the type name was not found.
        """
        if not s:
            return None
        # Try all patterns. Use the earliest match in the string.
        matches = [re.search(p, s) for p in patterns]
        match = None
        for m in matches:
            if m:
                if not match or m.start() < match.start():
                    match = m
        # No patterns were found.
        if not match:
            return None
        name = match[0]
        start_ind, end_name_ind = match.span()
        # The type has no parameters
        if end_name_ind == len(s) or s[end_name_ind] != "[":
            return name
        # Find full name with parameters (find the matching closing bracket)
        end_ind, n_brackets = -1, 0
        for n in range(end_name_ind, len(s)):
            if s[n] == "[":
                n_brackets += 1
            elif s[n] == "]":
                n_brackets -= 1
                if n_brackets == 0:
                    end_ind = n + 1
                    break
        # Unbalanced brackets: fall back to the name without parameters
        if end_ind < 0:
            return name
        return s[start_ind:end_ind]

    def convert_annotation_to_string(annotation):
        """
        Convert the parameter annotation to its string representation.
        Returns ``None`` (the type is ignored) if the annotation can not be
        properly reconstructed using 'eval'.
        """
        import bluesky
        from bluesky.protocols import (
            Checkable,
            Configurable,
            Flyable,
            Locatable,
            Movable,
            Pausable,
            Readable,
            Stageable,
            Stoppable,
            Subscribable,
            Triggerable,
        )

        # TODO: remove this check once NameMovable becomes standard
        if hasattr(bluesky.protocols, "NamedMovable"):
            from bluesky.protocols import NamedMovable
        else:
            NamedMovable = Movable

        # Bluesky protocols are replaced with generic placeholder names
        protocols_mapping = {
            "__READABLE__": [Readable],
            "__MOVABLE__": [Movable, NamedMovable],
            "__FLYABLE__": [Flyable],
            "__DEVICE__": [
                Configurable,
                Triggerable,
                Locatable,
                Stageable,
                Pausable,
                Stoppable,
                Subscribable,
                Checkable,
            ],
        }
        protocols_inv = {_: k for k, v in protocols_mapping.items() for _ in v}
        protocols_patterns = {
            k: [f"bluesky.protocols.{_.__name__}" for _ in v] for k, v in protocols_mapping.items()
        }
        callables_patterns = {"__CALLABLE__": [r"typing.Callable", r"collections.abc.Callable"]}
        # Patterns for Callable MUST go first
        type_patterns = {**callables_patterns, **protocols_patterns}

        # Namespace used to verify that the type can be recreated with 'eval'
        ns = {
            "typing": typing,
            "collections": collections,
            "bluesky": bluesky,
            "NoneType": type(None),
        }
        substitutions_dict = {}

        # This will work for generic types like 'typing.List[int]'
        a_str = f"{annotation!r}"
        # The following takes care of Python base types, such as 'int', 'float'
        #   expressed as "<class 'int'>", but have valid '__name__' attribute
        #   containing type name, such as "int".
        if hasattr(annotation, "__name__") and re.search("^<.*>$", a_str):
            # Note, that starting with Python 3.10 parameter annotation always has
            #   '__name__', which is meaningless for types other than base types.
            _ = annotation.__name__
            mapping = {k.__name__: v for k, v in protocols_inv.items()}
            if _ in mapping:
                a_str = mapping[_]
                ns[a_str] = annotation  # 'a_str' is __DEVICE__, __READABLE__ or similar
            else:
                if re.search("<class '.+'>", a_str):
                    a_str = re.sub(r"<class '(.+)'>", r"\1", a_str)
        else:
            # Replace each expression with a unique string in the form of '__CALLABLE<n>__'
            n_patterns = 0  # Number of detected patterns (callables/protocols)
            # NOTE: the loop variable intentionally does NOT reuse the name 'type_patterns':
            #   rebinding the name of the dictionary that is being iterated is confusing
            #   and fragile.
            for type_name, patterns_for_type in type_patterns.items():
                while True:
                    pattern = _get_full_type_name(patterns_for_type, a_str)
                    if not pattern:
                        break
                    try:
                        p_type = eval(pattern, ns, ns)  # noqa: S307 (trusted source: plan signature)
                    except Exception:
                        p_type = None
                    p_str_prefix, p_str_suffix = re.findall(r".*[a-zA-Z0-9]|__$", type_name)
                    p_str = f"{p_str_prefix}{n_patterns + 1}{p_str_suffix}"
                    a_str = re.sub(re.escape(pattern), p_str, a_str)
                    if p_type:
                        ns[p_str] = p_type
                    # If the type could not be evaluated, it is not added to the namespace,
                    #   so evaluation of the full annotation will fail later and the
                    #   parameter will have no type annotation.
                    substitutions_dict[p_str] = type_name
                    n_patterns += 1

        # Verify if the type could be recreated by evaluating the string during validation.
        try:
            an = eval(a_str, ns, ns)  # noqa: S307 (trusted source: plan signature)
            if an != annotation:
                raise Exception()
            # Replace all callables ('__CALLABLE1__', '__CALLABLE2__', etc.) with '__CALLABLE__' string
            for k, v in substitutions_dict.items():
                a_str = re.sub(k, v, a_str)
            # Some types can not be validated by Pydantic, but they could be substituted by other types that can
            patches = {
                "collections.abc.Iterable": "collections.abc.Iterable[typing.Any]",
            }
            # Match whole type names only (not substrings or parameterized forms)
            template_prefix = r"(?<![._A-Za-z0-9])"
            template_suffix = r"(?![._\[A-Za-z0-9])"
            for k, v in patches.items():
                a_str = re.sub(f"{template_prefix}{k}{template_suffix}", v, a_str)
        except Exception:
            # Ignore the type if it can not be recreated.
            a_str = None

        return a_str

    def convert_expression_to_string(value, expression_role=None):
        """
        Convert a value (e.g. a default value) to its string representation.
        Raise 'ValueError' if the string representation of the value can not be evaluated.
        """
        role_str = f" of {expression_role}" if expression_role else ""
        s_value = f"{value!r}"
        try:
            ast.literal_eval(s_value)
        except Exception:
            # If default value can not be accepted, then the plan is considered invalid.
            #    Processing should be interrupted.
            raise ValueError(
                f"The expression ({s_value}) can not be evaluated with 'ast.literal_eval()': "
                f"unsupported type{role_str}."
            )
        return s_value

    def assemble_custom_annotation(parameter, *, existing_plans, existing_devices):
        """
        Assemble annotation from parameters of the decorator.
        Returns ``None`` if there is no annotation.
        """
        annotation = {}
        if "annotation" not in parameter:
            return None
        annotation["type"] = parameter["annotation"]
        keys = {"devices", "plans", "enums"}
        for k in keys:
            if k in parameter:
                annotation[k] = copy.copy(parameter[k])
        annotation = _expand_parameter_annotation(
            annotation, existing_devices=existing_devices, existing_plans=existing_plans
        )
        return annotation

    sig = inspect.signature(plan)
    docstring = getattr(plan, "__doc__", None)

    param_annotation = getattr(plan, "_custom_parameter_annotation_", None)
    doc_annotation = _parse_docstring(docstring)

    # Returned value: dictionary with fully processed plan description
    ret = {"name": plan.__name__, "properties": {}, "parameters": []}

    module_name = plan.__module__
    if module_name != "<run_path>":
        ret.update({"module": module_name})

    try:
        # Properties
        ret["properties"]["is_generator"] = inspect.isgeneratorfunction(plan) or inspect.isgenerator(plan)

        # Plan description (decorator takes precedence over the docstring)
        desc = None
        if param_annotation:
            desc = param_annotation.get("description", None)
        if not desc and doc_annotation:
            desc = doc_annotation.get("description", None)
        if desc:
            ret["description"] = desc

        # Function parameters
        use_docstring = "parameters" in doc_annotation
        use_custom = param_annotation and ("parameters" in param_annotation)

        for p in sig.parameters.values():
            working_dict = {"name": p.name}
            ret["parameters"].append(working_dict)
            working_dict["kind"] = {"name": p.kind.name, "value": p.kind.value}

            # Parameter description (attempt to get it from the decorator, then from docstring)
            desc, annotation, default = None, None, None
            vmin, vmax, step = None, None, None
            default_defined_in_decorator = False

            if use_custom and (p.name in param_annotation["parameters"]):
                desc = param_annotation["parameters"][p.name].get("description", None)
                # Copy 'convert_plan_names' and 'convert_device_names' if they exist
                for k in ("convert_plan_names", "convert_device_names"):
                    if k in param_annotation["parameters"][p.name]:
                        working_dict[k] = param_annotation["parameters"][p.name][k]
                annotation = assemble_custom_annotation(
                    param_annotation["parameters"][p.name],
                    existing_plans=existing_plans,
                    existing_devices=existing_devices,
                )
                try:
                    _ = param_annotation["parameters"][p.name].get("default", None)
                    if _ is None:
                        default = _
                    else:
                        default = convert_expression_to_string(_, expression_role="default value in decorator")
                    if default:
                        default_defined_in_decorator = True
                    _ = param_annotation["parameters"][p.name].get("min", None)
                    if _ is None:
                        vmin = _
                    else:
                        vmin = convert_expression_to_string(_, expression_role="min value in decorator")
                    _ = param_annotation["parameters"][p.name].get("max", None)
                    if _ is None:
                        vmax = _
                    else:
                        vmax = convert_expression_to_string(_, expression_role="max value in decorator")
                    _ = param_annotation["parameters"][p.name].get("step", None)
                    if _ is None:
                        step = _
                    else:
                        step = convert_expression_to_string(_, expression_role="step value in decorator")
                except Exception as ex:
                    raise ValueError(f"Parameter '{p.name}': {ex}")

            if not desc and use_docstring and (p.name in doc_annotation["parameters"]):
                desc = doc_annotation["parameters"][p.name].get("description", None)

            if not annotation and p.annotation is not inspect.Parameter.empty:
                annotation = convert_annotation_to_string(p.annotation)
                if annotation:
                    # The case when annotation does exist (otherwise it is None)
                    annotation = {"type": annotation}

            if default and p.default is inspect.Parameter.empty:
                # The default value in the decorator overrides the default value in the header,
                #   not replace it. Therefore the default value is required in the header if
                #   one is specified in the decorator.
                raise ValueError(
                    f"Missing default value for the parameter '{p.name}' in the plan signature: "
                    f"The default value {default} is specified in the annotation decorator, "
                    f"therefore a default value is required in the plan header."
                )

            if not default and (p.default is not inspect.Parameter.empty):
                try:
                    default = convert_expression_to_string(p.default, expression_role="default value")
                except Exception as ex:
                    raise ValueError(f"Parameter '{p.name}': {ex}")

            if desc:
                working_dict["description"] = desc

            if annotation:
                # Verify that the encoded type could be decoded (raises an exception if fails)
                _, convert_values, _ = _process_annotation(annotation)
                working_dict["annotation"] = annotation
                # Set the following parameters True only if they do not already exist (ignore if False)
                if convert_values["convert_plan_names"] and ("convert_plan_names" not in working_dict):
                    working_dict["convert_plan_names"] = True
                if convert_values["convert_device_names"] and ("convert_device_names" not in working_dict):
                    working_dict["convert_device_names"] = True
                if convert_values["eval_expressions"] and ("eval_expressions" not in working_dict):
                    working_dict["eval_expressions"] = True

            if default:
                # Verify that the encoded representation of the default can be decoded.
                _process_default_value(default)  # May raise exception
                working_dict["default"] = default
                if default_defined_in_decorator:
                    working_dict["default_defined_in_decorator"] = True

            # 'min', 'max' and 'step' (may exist only in decorator)
            if vmin is not None:
                try:
                    # Attempt to convert to number (use the same function as during validation)
                    vmin = _convert_str_to_number(vmin)
                    working_dict["min"] = f"{vmin}"  # Save as a string
                except Exception as ex:
                    raise ValueError(f"Failed to process min. value: {ex}")
            if vmax is not None:
                try:
                    vmax = _convert_str_to_number(vmax)
                    working_dict["max"] = f"{vmax}"
                except Exception as ex:
                    raise ValueError(f"Failed to process max. value: {ex}")
            if step is not None:
                try:
                    step = _convert_str_to_number(step)
                    working_dict["step"] = f"{step}"
                except Exception as ex:
                    raise ValueError(f"Failed to process step value: {ex}")

    except Exception as ex:
        raise ValueError(f"Failed to create description of plan '{plan.__name__}': {ex}") from ex

    return ret
def _prepare_plans(plans, *, existing_devices, ignore_invalid_plans=False):
    """
    Build the dictionary of plan descriptions to be saved to the YAML file.

    Plans registered in the registry of namespace items take precedence over
    plans with the same name found in the workspace.

    Parameters
    ----------
    plans: dict
        Dictionary of plans extracted from the workspace.
    existing_devices: dict
        Prepared dictionary of existing devices (returned by ``_prepare_devices``).
    ignore_invalid_plans: bool
        If ``False`` (default), an exception is raised when a plan has an
        unsupported signature; otherwise a warning is logged and the plan
        is skipped.

    Returns
    -------
    dict
        Maps plan names to plan descriptions.
    """
    registered = reg_ns_items.reg_plans
    selected = {}
    for plan_name in set(plans) | set(registered):
        # Registered plans override workspace plans with the same name.
        if plan_name in registered:
            candidate = registered[plan_name]["obj"]
        else:
            candidate = plans.get(plan_name)
        if candidate:
            selected[plan_name] = candidate
    all_plan_names = set(selected)
    descriptions = {}
    for plan_name, plan_obj in selected.items():
        try:
            descriptions[plan_name] = _process_plan(
                plan_obj, existing_devices=existing_devices, existing_plans=all_plan_names
            )
        except Exception as ex:
            if not ignore_invalid_plans:
                raise
            logger.warning("PLAN WAS IGNORED: %s", ex)
    return descriptions
def _prepare_devices(devices, *, max_depth=0, ignore_all_subdevices_if_one_fails=True, expand_areadetectors=False):
    """
    Prepare dictionary of existing devices for saving to YAML file.
    The result is a tree: each entry describes a device and (optionally) its
    components, down to the depth selected by ``max_depth`` (the default value 0
    means unlimited depth).
    Parameters
    ----------
    devices: dict
        Dictionary of devices from the namespace (key - device name, value - reference
        to the device object).
    max_depth: int
        Maximum depth for the device search: 0 - infinite depth, 1 - only top level
        devices, 2 - device and subdevices etc.
    ignore_all_subdevices_if_one_fails: bool
        Ignore all components of devices if at least one component (PVs) can not
        be accessed. It saves a lot of time to ignore all components, since
        stale code may contain devices with many components with non-existing PVs
        and respective timeout may amount to substantial waiting time.
    expand_areadetectors: bool
        Find subdevices of areadetectors. It may take significant time to expand an
        areadetector and it is unlikely that the areadetector subdevices should be
        accessed via plan parameters.
    Returns
    -------
    dict
        Tree of descriptions of existing devices and their subdevices.
    """
    max_depth = max(0, max_depth)  # must be >= 0
    # Imported locally so that 'bluesky'/'ophyd' are not required at module import time.
    from bluesky import protocols
    from ophyd.areadetector import ADBase
    def get_device_params(device):
        # Basic parameters for a single device: supported Bluesky protocols and class info.
        movable_protocols = (protocols.Movable,)
        # TODO: remove this check when NamedMovable is available in every Bluesky deployment
        # !!! Checking for NamedMovable involves checking for 'hints' attribute, which tends to
        # !!! instantiate objects (at least on Python 3.10 and 3.11, worked fine on Python 3.12),
        # !!! which is undesirable. NamedMovable object will be detected checked for attributes of
        # !!! Movable protocol.
        # if hasattr(protocols, "NamedMovable"):
        #     movable_protocols = (*movable_protocols, protocols.NamedMovable)
        return {
            "is_readable": isinstance(device, protocols.Readable),
            "is_movable": isinstance(device, movable_protocols),
            "is_flyable": isinstance(device, protocols.Flyable),
            "classname": type(device).__name__,
            "module": type(device).__module__,
        }
    def get_device_component_names(device):
        # Returns the list of component names. Supports ophyd-style devices
        # ('component_names' attribute) and devices exposing a 'children()' iterator.
        if hasattr(device, "component_names"):
            component_names = device.component_names
            if not isinstance(component_names, Iterable):
                component_names = []
        elif hasattr(device, "children"):
            component_names = [_[0] for _ in device.children()]
        else:
            component_names = []
        return component_names
    def create_device_description(device, device_name, *, depth=0, max_depth=0, is_registered=False):
        # Recursively builds the description of a device and its components.
        # 'depth' is the current recursion depth; 'max_depth' limits recursion (0 - unlimited).
        # Areadetector components are skipped unless the device is registered or
        # 'expand_areadetectors' is set.
        description = get_device_params(device)
        comps = get_device_component_names(device)
        components = {}
        is_areadetector = isinstance(device, ADBase)
        expand = is_registered or not is_areadetector or expand_areadetectors
        if expand and (not max_depth or (depth < max_depth - 1)):
            ignore_subdevices = False
            for comp_name in comps:
                try:
                    if hasattr(device, comp_name):
                        c = getattr(device, comp_name)
                        desc = create_device_description(
                            c,
                            device_name + "." + comp_name,
                            depth=depth + 1,
                            max_depth=max_depth,
                            is_registered=is_registered,
                        )
                        components[comp_name] = desc
                except Exception as ex:
                    # Accessing a component may time out (e.g. non-existing PVs).
                    ignore_subdevices = ignore_all_subdevices_if_one_fails
                    logger.warning(
                        "Device '%s': component '%s' can not be processed: %s", device_name, comp_name, ex
                    )
                if ignore_subdevices:
                    components = {}  # Ignore all components of the subdevice
                    break
            if components:
                description["components"] = components
        return description
    def process_devices(*, max_depth):
        """
        Process devices one by one based on information from the namespace and
        the dictionary of registered devices.
        """
        reg_devices = reg_ns_items.reg_devices
        devices_selected = {}
        for name in set(devices.keys()).union(set(reg_devices.keys())):
            obj, _max_depth, is_registered = None, max_depth, False
            if name in reg_devices:
                # Registered devices take precedence and carry their own max. depth.
                obj = reg_devices[name]["obj"]
                _max_depth = reg_devices[name]["depth"]
                if obj:
                    is_registered = True
            elif name in devices:
                obj = devices[name]
            if obj:
                devices_selected[name] = dict(obj=obj, max_depth=_max_depth, is_registered=is_registered)
        device_dict = {}
        for name, p in devices_selected.items():
            device_dict[name] = create_device_description(
                p["obj"], name, max_depth=p["max_depth"], is_registered=p["is_registered"]
            )
        return device_dict
    return process_devices(max_depth=max_depth)
def existing_plans_and_devices_from_nspace(*, nspace, max_depth=0, ignore_invalid_plans=False):
    """
    Generate the dictionaries of existing plans and devices based on the contents
    of a namespace (in the form returned by ``load_worker_startup_code()``).

    Parameters
    ----------
    nspace: dict
        Namespace that contains plans and devices.
    max_depth: int
        Default maximum depth for device search: 0 - unlimited depth, 1 - include only
        top level devices, 2 - include top level devices and subdevices, etc.
    ignore_invalid_plans: bool
        If ``False`` (default), an exception is raised for plans with unsupported
        signatures; otherwise a message is printed and the plan is skipped.

    Returns
    -------
    existing_plans : dict
        Dictionary of descriptions of existing plans.
    existing_devices : dict
        Dictionary of descriptions of existing devices.
    plans_in_nspace : dict
        Dictionary of plans in the namespace.
    devices_in_nspace : dict
        Dictionary of devices in the namespace.
    """
    logger.debug("Extracting existing plans and devices from the namespace ...")
    nspace_plans = plans_from_nspace(nspace)
    nspace_devices = devices_from_nspace(nspace)
    devices_descriptions = _prepare_devices(nspace_devices, max_depth=max_depth)
    plans_descriptions = _prepare_plans(
        nspace_plans, existing_devices=devices_descriptions, ignore_invalid_plans=ignore_invalid_plans
    )
    return plans_descriptions, devices_descriptions, nspace_plans, nspace_devices
def save_existing_plans_and_devices(
    *,
    existing_plans,
    existing_devices,
    file_dir=None,
    file_name=None,
    overwrite=False,
):
    """
    Save dictionaries of existing plans and devices to YAML file. The dictionaries of existing
    plans and devices must be in the form returned by ``existing_plans_and_devices_from_nspace``.

    Parameters
    ----------
    existing_plans : dict
        dictionary of existing plans (key - plan name, value - plan description)
    existing_devices : dict
        dictionary of existing devices (key - device name, value - device description)
    file_dir : str or None
        path to the directory where the file is to be created. None - create file in current directory.
    file_name : str or None
        name of the output YAML file, None - default file name ('existing_plans_and_devices.yaml')
        is used
    overwrite : boolean
        overwrite the file if it already exists

    Raises
    ------
    IOError
        The output file already exists and ``overwrite`` is ``False``.
    """
    existing_plans_and_devices = {
        "existing_plans": existing_plans,
        "existing_devices": existing_devices,
    }
    # BUG FIX: 'None' values used to be passed directly to 'os.path.join', raising
    # TypeError. Apply the defaults promised in the docstring instead.
    file_dir = file_dir or "."
    file_name = file_name or "existing_plans_and_devices.yaml"
    file_path = os.path.join(file_dir, file_name)
    if os.path.exists(file_path) and not overwrite:
        raise IOError(f"File '{file_path}' already exists. File overwriting is disabled.")
    with open(file_path, "w") as stream:
        stream.write("# This file is automatically generated. Edit at your own risk.\n")
        yaml.dump(existing_plans_and_devices, stream)
def load_existing_plans_and_devices(path_to_file=None):
    """
    Load the dictionaries of existing plans and devices from a YAML file. Empty
    dictionaries are returned if ``path_to_file`` is ``None`` or an empty string,
    or if the file is missing or can not be interpreted.

    Parameters
    ----------
    path_to_file: str or None
        Full path to .yaml file that contains the lists.

    Returns
    -------
    (dict, dict)
        Dictionary of existing plans and dictionary of existing devices.
    """
    msg = "List of plans and devices is not loaded."
    if not path_to_file:
        logger.warning("%s File path is not specified.", msg)
        return {}, {}
    if not os.path.isfile(path_to_file):
        logger.warning("%s File '%s' does not exist.", msg, path_to_file)
        return {}, {}
    with open(path_to_file, "r") as stream:
        try:
            loaded = yaml.load(stream, Loader=yaml.FullLoader)
        except Exception as ex:
            logger.error("%s File '%s' has invalid YAML: %s", msg, path_to_file, ex)
            loaded = {}
    if not isinstance(loaded, dict):
        logger.warning("%s The file '%s' has invalid format or empty.", msg, path_to_file)
        return {}, {}
    return loaded.get("existing_plans", {}), loaded.get("existing_devices", {})
def compare_existing_plans_and_devices(
    *,
    existing_plans,
    existing_devices,
    existing_plans_ref,
    existing_devices_ref,
):
    """
    Compare the dictionaries of existing plans and devices with the reference
    dictionaries. The function exists to provide consistent exception handling:
    if the comparison itself raises an exception, the lists are treated as
    different and ``False`` is returned.

    Parameters
    ----------
    existing_plans : dict
        The updated dictionary of descriptions of the existing plans.
    existing_devices : dict
        The updated dictionary of descriptions of the existing devices.
    existing_plans_ref : dict or None
        The reference dictionary of descriptions of the existing plans.
    existing_devices_ref : dict or None
        The reference dictionary of descriptions of the existing devices.

    Returns
    -------
    boolean
        ``True`` - the lists are equal, ``False`` otherwise.
    """
    try:
        # Short-circuit: the device comparison is skipped if the plans already differ.
        return (existing_plans == existing_plans_ref) and (existing_devices == existing_devices_ref)
    except Exception as ex:
        # Failure to compare is reported as "different", so the new lists get saved.
        # The issue should be investigated if it is repeated regularly.
        logger.warning("Failed to compare existing plans or existing devices: %s", str(ex))
        return False
def update_existing_plans_and_devices(
    *,
    path_to_file,
    existing_plans,
    existing_devices,
    existing_plans_ref=None,
    existing_devices_ref=None,
    always_save=False,
):
    """
    Update the lists of existing plans and devices stored in the file ``path_to_file`` on disk if
    the lists are different from the reference lists. The reference lists can be passed to the function
    or loaded from the file if ``existing_plans_ref`` or ``existing_devices_ref`` are ``None``.
    If ``always_save`` is False, then the descriptions are updated only if the lists have changed,
    otherwise the lists are always saved. The function will never modify files in the directory
    with startup scripts, which is part of the repository or distributed package.
    This function never raises exceptions by design, since successful updating of stored lists of plans
    and devices is not considered critical for executing plans. If the function regularly generates
    warnings or error messages, the issue must be investigated.
    .. note::
        Lists of plans and devices are represented as Python dictionaries.
    Parameters
    ----------
    path_to_file : str
        Path to file that contains the lists of existing plans and devices
    existing_plans : dict
        The updated list (dictionary) of descriptions of the existing plans.
    existing_devices : dict
        The updated list (dictionary) of descriptions of the existing devices.
    existing_plans_ref : dict or None
        The reference list (dictionary) of descriptions of the existing plans.
    existing_devices_ref : dict or None
        The reference list (dictionary) of descriptions of the existing devices.
    always_save : boolean
        If ``False`` then save lists of plans and devices only if it is different
        from reference lists. Always save the lists to disk if ``True``.
    Returns
    -------
    boolean
        ``True`` if the lists of existing plans and devices is different from the reference
        lists or if ``always_save`` is ``True``.
    """
    if not always_save:
        # Load reference lists from the file only if at least one of them was not
        # passed explicitly; explicitly passed references then override the loaded
        # values, so 'ep'/'ed' are always bound when used below.
        if (existing_plans_ref is None) or (existing_devices_ref is None):
            ep, ed = load_existing_plans_and_devices(path_to_file)
        if existing_plans_ref is not None:
            ep = existing_plans_ref
        if existing_devices_ref is not None:
            ed = existing_devices_ref
        changes_exist = not compare_existing_plans_and_devices(
            existing_plans=existing_plans,
            existing_devices=existing_devices,
            existing_plans_ref=ep,
            existing_devices_ref=ed,
        )
    else:
        changes_exist = True
    if changes_exist:
        logger.info("Updating the list of existing plans and devices in the file '%s'", path_to_file)
        def normalize_path(fpath):
            """
            Normalize path to make it easier to compare to other paths
            """
            return os.path.realpath(os.path.abspath(os.path.expanduser(fpath)))
        # Refuse to modify files inside the default startup directory: it is part
        # of the distributed package and must not be changed at run time.
        default_startup_dir = normalize_path(get_default_startup_dir())
        file_path = normalize_path(path_to_file)
        try:
            path_is_default = os.path.commonpath([default_startup_dir, file_path]) == default_startup_dir
        except Exception:
            # 'os.path.commonpath' raises e.g. for a mix of absolute/relative paths
            # or paths on different drives (Windows); treat that as "not default".
            path_is_default = False
        if path_is_default:
            logger.warning(
                "The file '%s' is located in the directory, which is part of the package and can not be updated",
                path_to_file,
            )
        else:
            # Saving the updated plans and devices is not critical for real-time operation of the queue server.
            #   Plans can still be executed, so the error can be ignored, but it should be investigated, because
            #   some functionality may not be working properly.
            try:
                save_existing_plans_and_devices(
                    existing_plans=existing_plans,
                    existing_devices=existing_devices,
                    file_dir=os.path.dirname(path_to_file),
                    file_name=os.path.basename(path_to_file),
                    overwrite=True,
                )
            except Exception as ex:
                logger.error(
                    "Failed to save lists of existing plans and device to file '%s': %s", path_to_file, ex
                )
    return changes_exist
_user_group_permission_schema = {
    "type": "object",
    "additionalProperties": False,
    "required": ["user_groups"],
    "properties": {
        "user_groups": {
            "type": "object",
            "additionalProperties": {
                "type": "object",
                "additionalProperties": False,
                "properties": {
                    "allowed_plans": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                    "forbidden_plans": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                    "allowed_devices": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                    "forbidden_devices": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                    "allowed_functions": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                    "forbidden_functions": {
                        "type": "array",
                        "items": {"type": ["string", "null"]},
                    },
                },
            },
        },
    },
}
def _validate_user_group_permissions_schema(user_group_permissions):
    """
    Validate user group permissions schema. Raises exception if validation fails.

    Parameters
    ----------
    user_group_permissions: dict
        A dictionary with user group permissions.

    Raises
    ------
    jsonschema.ValidationError
        The dictionary does not satisfy ``_user_group_permission_schema``.
    """
    jsonschema.validate(instance=user_group_permissions, schema=_user_group_permission_schema)
def validate_user_group_permissions(user_group_permissions):
    """
    Validate the dictionary with user group permissions. An exception is raised
    if errors are detected.

    Parameters
    ----------
    user_group_permissions: dict
        The dictionary with user group permissions.

    Raises
    ------
    TypeError
        The argument is not a dictionary.
    KeyError
        Permissions for the required user group 'root' are missing.
    """
    if not isinstance(user_group_permissions, dict):
        raise TypeError(f"User group permissions: Invalid type '{type(user_group_permissions)}', must be 'dict'")
    # Schema validation raises an exception if the structure is invalid.
    _validate_user_group_permissions_schema(user_group_permissions)
    user_groups = user_group_permissions["user_groups"]
    if "root" not in user_groups:
        raise KeyError("User group permissions: Missing required user group: 'root'")
def load_user_group_permissions(path_to_file=None):
    """
    Load the data on allowed plans and devices for user groups. User group 'root'
    is required. Exception is raised if user group 'root' is missing.

    Parameters
    ----------
    path_to_file: str
        Full path to YAML file that contains user data permissions.

    Returns
    -------
    dict
        Data structure with user permissions. Returns ``{}`` if path is an empty string
        or None.

    Raises
    ------
    IOError
        Error while reading or validating the contents of the YAML file.
    """
    if not path_to_file:
        return {}
    try:
        if not os.path.isfile(path_to_file):
            raise IOError(f"File '{path_to_file}' does not exist.")
        with open(path_to_file, "r") as stream:
            user_group_permissions = yaml.safe_load(stream)
        # Validation raises KeyError if the required group 'root' is missing
        # (see 'validate_user_group_permissions'), so no separate check is needed.
        validate_user_group_permissions(user_group_permissions)
    except Exception as ex:
        msg = f"Error while loading user group permissions from file '{path_to_file}': {str(ex)}"
        # Chain the original exception so the full cause is preserved in tracebacks.
        raise IOError(msg) from ex
    return user_group_permissions
def _check_if_plan_or_func_allowed(item_name, allow_patterns, disallow_patterns):
    """
    Check if an item with ``item_name`` is allowed based on ``allow_patterns``
    and ``disallow_patterns``. Patterns may be explicitly listed names and regular expressions.
    ``None`` may appear only as a first element of the pattern list.
    The function should be used only with function and plan names.
    Parameters
    ----------
    item_name: str
        Name of the item.
    allow_patterns: list(str)
        Selected item should match at least one of the re patterns. If the value is ``[None]``
        then all items are selected. If ``[]``, then no items are selected.
    disallow_patterns: list(str)
        Items are deselected based on re patterns in the list: if an item matches at least one
        of the patterns, it is deselected. If the value is ``[None]`` or ``[]`` then no items
        are deselected.
    Returns
    -------
    boolean
        Indicates if the item is permitted based on ``allow_patterns`` and ``disallow_patterns``.
    """
    item_is_allowed = False
    def prepare_pattern(pattern):
        if not isinstance(pattern, str):
            raise TypeError("Pattern is not a string: {pattern!r} ({allow_patterns})")
        # Prepare patterns
        components, uses_re, _ = _split_name_pattern(pattern)
        if len(components) != 1:
            raise ValueError("Name pattern for functions and plans must contain one component: {components}")
        component = components[0][0]
        if not uses_re:
            if not re.search("[_a-zA-Z][_0-9a-zA-Z]*", component):
                assert ValueError(
                    f"Plan or function name pattern {component!r} contains invalid characters. "
                    "The pattern could be a valid regular expression, "
                    "but it is not labeled with ':' (e.g. ':^count$')"
                )
        return component, uses_re
    if allow_patterns:
        if allow_patterns[0] is None:
            item_is_allowed = True
        else:
            for pattern in allow_patterns:
                component, uses_re = prepare_pattern(pattern)
                if uses_re:
                    if re.search(component, item_name):
                        item_is_allowed = True
                        break
                else:
                    if item_name == component:
                        item_is_allowed = True
                        break
    if item_is_allowed:
        if disallow_patterns and (disallow_patterns[0] is not None):
            for pattern in disallow_patterns:
                component, uses_re = prepare_pattern(pattern)
                if uses_re:
                    if re.search(component, item_name):
                        item_is_allowed = False
                        break
                else:
                    if item_name == component:
                        item_is_allowed = False
                        break
    return item_is_allowed
def check_if_function_allowed(function_name, *, group_name, user_group_permissions=None):
    """
    Check if the function with name ``function_name`` is allowed for users of ``group_name``
    group. The function name must pass the permissions of the 'root' group AND the
    permissions of the specified group.

    Parameters
    ----------
    function_name: str
        Function name to check
    group_name: str
        User group name. Permissions must be defined for this group.
    user_group_permissions: dict or None
        Reference to a dictionary, which contains the defined user group permissions.

    Returns
    -------
    boolean
        Indicates if the users that belong to ``group_name`` are allowed to run the function.

    Raises
    ------
    KeyError
        ``user_group_permissions`` do not contain ``user_groups`` key or permissions for
        the 'root' or ``group_name`` group are not defined.
    """
    if not user_group_permissions:
        return False
    if "user_groups" not in user_group_permissions:
        raise KeyError("User group permissions dictionary does not contain 'user_groups' key")
    user_groups = user_group_permissions["user_groups"]
    if "root" not in user_groups:
        raise KeyError("No permissions are defined for user group 'root'")
    if group_name not in user_groups:
        raise KeyError(f"No permissions are defined for user group {group_name!r}")
    # The name must be allowed by 'root' permissions first, then by the group permissions.
    for gn in ("root", group_name):
        group_permissions = user_groups[gn]
        allowed = group_permissions.get("allowed_functions", [])
        forbidden = group_permissions.get("forbidden_functions", [])
        if not _check_if_plan_or_func_allowed(function_name, allowed, forbidden):
            return False
    return True
def _select_allowed_plans(item_dict, allow_patterns, disallow_patterns):
    """
    Creates the dictionary of items selected from `item_dict` with names names that
    satisfy patterns in `allow_patterns` and not satisfy patterns in `disallow_patterns`.
    The function does not modify the dictionary, but creates a new dictionary with
    selected items, where item values are deep copies of the values of the original
    dictionary.
    Parameters
    ----------
    item_dict: dict
        Dictionary of items.
    allow_patterns: list(str)
        Selected item should match at least one of the re patterns. If the value is ``[None]``
        then all items are selected. If ``[]``, then no items are selected.
    disallow_patterns: list(str)
        Items are deselected based on re patterns in the list: if an item matches at least one
        of the patterns, it is deselected. If the value is ``[None]`` or ``[]`` then no items
        are deselected.
    Returns
    -------
    dict
        Dictionary of the selected items.
    """
    items_selected = {}
    for item_name in item_dict:
        if _check_if_plan_or_func_allowed(item_name, allow_patterns, disallow_patterns):
            items_selected[item_name] = copy.deepcopy(item_dict[item_name])
    return items_selected
def load_allowed_plans_and_devices(
    *,
    path_existing_plans_and_devices=None,
    path_user_group_permissions=None,
    existing_plans=None,
    existing_devices=None,
    user_group_permissions=None,
):
    """
    Generate dictionaries of allowed plans and devices for each user group. If there are no user
    groups defined (path is None), then output dictionaries will have one user group 'root'
    with all existing plans and devices allowed. The function is using the lists (dict) of
    existing plans and devices passed as parameters. If the lists are not passed (or ``None``),
    then they are loaded from the file ``path_existing_plans_and_devices``.
    If the lists of plans and devices do not exist (path_existing_plans_and_devices is ``None``
    or points to non-existing file), then empty dictionaries are returned.

    Parameters
    ----------
    path_existing_plans_and_devices: str or None
        Full path to YAML file, which contains dictionaries of existing plans and devices.
        Raises an exception if the file does not exist. If ``None`` or the file contains
        no plans and/or devices, then empty dictionaries of the allowed plans and devices
        are returned for all groups.
    path_user_group_permissions: str or None
        Full path to YAML file, which contains information on user group permissions.
        Exception is raised if the file does not exist. The path is ignored if
        ``user_group_permissions`` is set. If both ``path_user_group_permissions`` is ``None``
        and ``user_group_permissions`` is ``None`` or ``{}``, then dictionaries of allowed
        plans and devices will contain one group 'root' with all existing devices and plans
        set as allowed.
    existing_plans: dict or None
        List (dict) of the existing plans. If ``None``, then the list is loaded from
        the file ``path_existing_plans_and_devices``.
    existing_devices: dict or None
        List (dict) of the existing devices. If ``None``, then the list is loaded from
        the file ``path_existing_plans_and_devices``.
    user_group_permissions: dict or None
        Dictionary with user group permissions. If ``None``, then permissions are loaded
        from the file ``path_user_group_permissions``. If ``{}``, then dictionaries of allowed
        plans and devices will contain one group 'root' with all existing devices and plans
        set as allowed.

    Returns
    -------
    dict, dict
        Dictionaries of allowed plans and devices. Dictionary keys are names of the user groups.
        Dictionaries are ``{}`` if no data on existing plans and devices is provided.
        Dictionaries contain one group (``root``) if no user permission data is provided.
    """
    allowed_plans, allowed_devices = {}, {}
    try:
        if user_group_permissions is None:
            user_group_permissions = load_user_group_permissions(path_user_group_permissions)
        # Data from the file is loaded only if ``existing_plans`` or ``existing_devices`` is ``None``.
        if (existing_plans is None) or (existing_devices is None):
            ep, ed = load_existing_plans_and_devices(path_existing_plans_and_devices)
        if existing_plans is None:
            existing_plans = ep
        if existing_devices is None:
            existing_devices = ed
        # Detect some possible errors
        if not user_group_permissions:
            raise RuntimeError("No user permissions are specified")
        if not isinstance(user_group_permissions, dict):
            raise RuntimeError(
                f"'user_group_permissions' has type {type(user_group_permissions)} instead of 'dict'"
            )
        if "user_groups" not in user_group_permissions:
            raise RuntimeError("No user groups are defined: 'user_groups' keys is not in 'user_group_permissions'")
        user_groups = list(user_group_permissions["user_groups"].keys())
        # First filter the plans and devices based on the permissions for 'root' user group.
        #   The 'root' user group permissions should be used to exclude all 'junk', i.e.
        #   unused/outdated/not functioning plans and devices collected from namespace, from
        #   the lists available to ALL the users.
        plans_root, devices_root = existing_plans, existing_devices
        if "root" in user_groups:
            group_permissions_root = user_group_permissions["user_groups"]["root"]
            if existing_devices:
                devices_root = _filter_device_tree(
                    existing_devices,
                    group_permissions_root.get("allowed_devices", []),
                    group_permissions_root.get("forbidden_devices", []),
                )
            if existing_plans:
                plans_root = _select_allowed_plans(
                    existing_plans,
                    group_permissions_root.get("allowed_plans", []),
                    group_permissions_root.get("forbidden_plans", []),
                )
        # Now create lists of allowed devices and plans based on the lists for the 'root' user group
        for group in user_groups:
            selected_devices, selected_plans = {}, {}
            if group == "root":
                selected_devices = devices_root
                selected_plans = plans_root
            else:
                group_permissions = user_group_permissions["user_groups"][group]
                if existing_devices:
                    selected_devices = _filter_device_tree(
                        devices_root,
                        group_permissions.get("allowed_devices", []),
                        group_permissions.get("forbidden_devices", []),
                    )
                if existing_plans:
                    selected_plans = _select_allowed_plans(
                        plans_root,
                        group_permissions.get("allowed_plans", []),
                        group_permissions.get("forbidden_plans", []),
                    )
            # Remove plans that reference devices the group is not allowed to use.
            selected_plans = _filter_allowed_plans(allowed_plans=selected_plans, allowed_devices=selected_devices)
            allowed_devices[group] = selected_devices
            allowed_plans[group] = selected_plans
    except Exception as ex:
        logger.exception("Error occurred while generating the list of allowed plans and devices: %s", ex)
        # The 'default' lists in case error occurred while generating lists
        # NOTE(review): if the failure happened before the lists were loaded,
        # 'existing_plans'/'existing_devices' may still be None here — confirm
        # that consumers tolerate None values for the 'root' group.
        allowed_plans["root"] = existing_plans
        allowed_devices["root"] = existing_devices
    return allowed_plans, allowed_devices
def load_profile_collection_from_ipython(path=None):
    """
    Execute all startup files of a profile collection inside a running IPython session.

    Utility intended for manually verifying that a profile collection loads
    cleanly under IPython. Run it from an interactive IPython prompt:

    .. code-block:: python

        from bluesky_queueserver.manager.profile_ops import load_profile_collection_from_ipython
        load_profile_collection_from_ipython()

    Parameters
    ----------
    path : None
        Currently unused; kept for interface compatibility.
    """
    # 'get_ipython' exists only inside an active IPython session.
    ip = get_ipython()  # noqa F821
    # Collect startup files from the current directory and run them in sorted order.
    startup_files = sorted(glob.glob("*.py") + glob.glob("*.ipy"))
    for fln in startup_files:
        print(f"Executing '{fln}' in TravisCI")
        ip.parent._exec_file(fln)
    print("Profile collection was loaded successfully.")
# NOTE: a stray "[docs]" Sphinx cross-reference artifact was removed here
# (as code it would raise NameError at module import time).
def format_text_descriptions(item_parameters, *, use_html=False):
    """
    Format parameter descriptions for a plan from the list of allowed plans.
    Returns description of the plan and each parameter represented as formatted strings
    containing plan name/description and parameter name/type/default/min/max/step/description.
    The text is prepared for presentation in user interfaces. The descriptions are
    represented as a dictionary:

    .. code-block:: python

        {
            "description": "Multiline formatted text description of the plan",
            "parameters": {
                "param_name1": "Multiline formatted description",
                "param_name2": "Multiline formatted description",
                "param_name3": "Multiline formatted description",
            }
        }

    Parameters
    ----------
    item_parameters : dict
        A dictionary of item parameters, e.g. an element from the list of existing or allowed plans.
    use_html : boolean
        Select if the formatted text should be returned as plain text (default) or HTML.

    Returns
    -------
    dict
        The dictionary that contains formatted descriptions for the plan and its parameters.
        An empty dictionary is returned if ``item_parameters`` is empty or None.
    """
    if not item_parameters:
        return {}

    # Markup fragments: HTML tags or plain-text equivalents.
    start_bold = "<b>" if use_html else ""
    stop_bold = "</b>" if use_html else ""
    start_it = "<i>" if use_html else ""
    stop_it = "</i>" if use_html else ""
    new_line = "<br>" if use_html else "\n"

    not_available = "Description is not available"

    descriptions = {}

    item_name = item_parameters.get("name", "")
    # Use 'or ""' so that an explicitly stored None does not become the literal text "None".
    item_description = str(item_parameters.get("description") or "")
    item_description = item_description.replace("\n", new_line)
    s = f"{start_it}Name:{stop_it} {start_bold}{item_name}{stop_bold}"
    if item_description:
        s += f"{new_line}{item_description}"
    descriptions["description"] = s if s else not_available

    descriptions["parameters"] = {}
    for p in item_parameters.get("parameters", []):
        p_name = p.get("name", None) or "-"

        p_type = "-"
        p_custom_types = []
        annotation = p.get("annotation", None)
        if annotation:
            p_type = str(annotation.get("type", "")) or "-"
            # Collect custom type definitions (lists of device/plan names or enum values)
            # so they can be displayed along with the type.
            for t in ("devices", "plans", "enums"):
                if t in annotation:
                    for ct in annotation[t]:
                        p_custom_types.append((ct, tuple(annotation[t][ct])))

        p_default = p.get("default", None) or "-"
        p_min = p.get("min", None)
        p_max = p.get("max", None)
        p_step = p.get("step", None)

        # 'or ""' guards against a 'description' key explicitly set to None,
        # which would otherwise crash on '.replace' below.
        p_description = p.get("description") or ""
        p_description = p_description.replace("\n", new_line)

        desc = (
            f"{start_it}Name:{stop_it} {start_bold}{p_name}{stop_bold}{new_line}"
            f"{start_it}Type:{stop_it} {start_bold}{p_type}{stop_bold}{new_line}"
        )
        for ct, ct_items in p_custom_types:
            desc += f"{start_bold}{ct}:{stop_bold} {ct_items}{new_line}"
        desc += f"{start_it}Default:{stop_it} {start_bold}{p_default}{stop_bold}"

        # Min/Max/Step are shown on one line, separated by single spaces,
        # and only if at least one of them is defined.
        if (p_min is not None) or (p_max is not None) or (p_step is not None):
            desc += f"{new_line}"
            insert_space = False
            if p_min is not None:
                desc += f"{start_it}Min:{stop_it} {start_bold}{p_min}{stop_bold}"
                insert_space = True
            if p_max is not None:
                if insert_space:
                    desc += " "
                desc += f"{start_it}Max:{stop_it} {start_bold}{p_max}{stop_bold}"
                insert_space = True
            if p_step is not None:
                if insert_space:
                    desc += " "
                desc += f"{start_it}Step:{stop_it} {start_bold}{p_step}{stop_bold}"

        if p_description:
            desc += f"{new_line}{p_description}"

        descriptions["parameters"][p_name] = desc if desc else not_available

    return descriptions