# Source code for bluesky_adaptive.agents.base

import copy
import inspect
import sys
import threading
import time as ttime
import uuid
from abc import ABC, ABCMeta, abstractmethod
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from logging import getLogger
from typing import Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, TypedDict, Union

import msgpack
import numpy as np
import tiled
import tiled.client.container
from bluesky_kafka import Publisher, RemoteDispatcher
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.api_threads import API_Threads_Mixin
from event_model import compose_run
from numpy.typing import ArrayLike
from xkcdpass import xkcd_password as xp

from ..adjudicators.msg import DEFAULT_NAME as ADJUDICATOR_STREAM_NAME
from ..adjudicators.msg import AdjudicatorMsg, Suggestion
from ..server import register_variable, start_task
from ..typing import BlueskyRunLike

logger = getLogger("bluesky_adaptive.agents")
PASSWORD_LIST = xp.generate_wordlist(wordfile=xp.locate_wordfile(), min_length=3, max_length=6)


class AgentConsumer(RemoteDispatcher):
    def __init__(
        self,
        *,
        topics,
        bootstrap_servers,
        group_id,
        agent=None,
        consumer_config=None,
        polling_duration=0.05,
        deserializer=msgpack.loads,
    ):
        """Dispatch documents from Kafka to bluesky callbacks, modified for agent usage.
        This allows subscribing the dispatcher to an on-stop protocol for agents to be told about
        new Bluesky runs. It also provides an interface to trigger changes to the agent using the same
        Kafka topics.

        Parameters
        ----------
        topics : list
            List of topics as strings such as ["topic-1", "topic-2"]
        bootstrap_servers : str
            Comma-delimited list of Kafka server addresses as a string such as ``'127.0.0.1:9092'``
        group_id : str
            Required string identifier for Kafka Consumer group
        agent : Agent
            Instance of the agent to send directives to. Must be set to send directives.
        consumer_config : dict
            Override default configuration or specify additional configuration
            options to confluent_kafka.Consumer.
        polling_duration : float
            Time in seconds to wait for a message before running function work_while_waiting.
            Default is 0.05.
        deserializer : function, optional
            optional function to deserialize data. Default is msgpack.loads.
        """
        super().__init__(topics, bootstrap_servers, group_id, consumer_config, polling_duration, deserializer)
        self._agent = agent

    def _agent_action(self, topic, doc):
        """Exposes agent methods via the kafka topic.
        This allows bluesky plans, or adjudicators to interface with agent hyperparameters or settings.

        Parameters
        ----------
        topic : str
            the Kafka topic of the message containing name and doc
        doc : dict
            Agent document expecting the form::

                {
                    'action': 'method_name',
                    'args': [arg1, arg2, ...],
                    'kwargs': {kwarg1: val1, kwarg2: val2}
                }

        Returns
        -------
        continue_polling : bool
        """
        action = doc["action"]
        args = doc["args"]
        kwargs = doc["kwargs"]
        try:
            getattr(self._agent, action)(*args, **kwargs)
        except AttributeError as e:
            logger.error(
                f"Unavailable action sent to agent {self._agent.instance_name} on topic: {topic}\n" f"{e}"
            )
        except TypeError as e:
            logger.error(
                f"Type error for {action} sent to agent {self._agent.instance_name} on topic: {topic}\n"
                f"Are you sure your args and kwargs were appropriate?\n"
                f"Args received: {args}\n"
                f"Kwargs received: {kwargs}\n"
                f"Expected signature: {inspect.signature(getattr(self.agent, action))}\n"
                f"{e}"
            )
        return True

    def process_document(self, consumer, topic, name, doc):
        """
        Processes bluesky documents.
        Optionally
        Sends bluesky document to RemoteDispatcher.process(name, doc)
        If this method returns False the BlueskyConsumer will break out of the
        polling loop.

        Parameters
        ----------
        consumer :
            Underlying Kafka consumer that received the message
        topic : str
            the Kafka topic of the message containing name and doc
        name : str
            bluesky document name: `start`, `descriptor`, `event`, etc.
        doc : dict
            bluesky document

        Returns
        -------
        continue_polling : bool
            return False to break out of the polling loop, return True to continue polling
        """
        if name == self._agent.instance_name:
            return self._agent_action(topic, doc)
        else:
            return super().process_document(consumer, topic, name, doc)

    def set_agent(self, agent):
        self._agent = agent


class DataKeys(TypedDict):
    dtype: str
    dtype_str: str
    dtype_descr: list
    shape: list
    source: str


def infer_data_keys(doc: dict) -> Dict[str, DataKeys]:
    data_keys = dict()
    _bad_iterables = (str, bytes, dict)
    _type_map = {
        "number": (float, np.floating, complex),
        "array": (np.ndarray, list, tuple),
        "string": (str,),
        "integer": (int, np.integer),
    }
    for key, val in doc.items():
        if isinstance(val, Iterable) and not isinstance(val, _bad_iterables):
            dtype = "array"
        else:
            for json_type, py_types in _type_map.items():
                if isinstance(val, py_types):
                    dtype = json_type
                    break
            else:
                raise TypeError(f"Could not infer a data key dtype for {key!r} (value type: {type(val).__name__})")
        arr_val = np.asanyarray(val)
        arr_dtype = arr_val.dtype
        data_keys[key] = dict(
            dtype=dtype,
            dtype_str=arr_dtype.str,
            dtype_descr=arr_dtype.descr,
            shape=list(arr_val.shape),
            source="agent",
        )
    return data_keys
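

# Example (illustrative) of the inference above: scalars map to "number"/"integer"/"string",
# and non-string iterables map to "array", with shape and dtype taken from the numpy view.
#
#     >>> infer_data_keys({"temperature": 3.2, "spectrum": [1.0, 2.0, 3.0]})["spectrum"]
#     {'dtype': 'array', 'dtype_str': '<f8', 'dtype_descr': [('', '<f8')], 'shape': [3],
#      'source': 'agent'}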


class BackwardCompatMeta(ABCMeta):
    def __new__(cls, name, bases, dct):
        if "tell" in dct and "ingest" not in dct:
            dct["ingest"] = dct["tell"]
            logger.warning(
                "Agent subclass has a tell method. "
                "This is deprecated and will be removed prior to a major release. "
                "Please use ingest instead. Setting `ingest` to implemented `tell`."
            )
        if "ask" in dct and "suggest" not in dct:
            dct["suggest"] = dct["ask"]
            logger.warning(
                "Agent subclass has an ask method. "
                "This is deprecated and will be removed prior to a major release. "
                "Please use suggest instead. Setting suggest to implemeneted ask."
            )
        return super().__new__(cls, name, bases, dct)
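

# Illustration (hypothetical subclass): with this metaclass, a legacy agent that still defines
# ``tell``/``ask`` has those methods aliased to ``ingest``/``suggest`` at class-creation time,
# with a deprecation warning logged for each.
#
#     class LegacyAgent(Agent):
#         def tell(self, x, y):          # exposed as LegacyAgent.ingest
#             return {"x": x, "y": y}
#
#         def ask(self, batch_size):     # exposed as LegacyAgent.suggest
#             return [{}], [0.0]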


class Agent(ABC, metaclass=BackwardCompatMeta):
    """Abstract base class for a single plan agent. These agents should consume data, decide
    where to measure next, and execute a single type of plan (something akin to move and count).
    Alternatively, these agents can be used solely for reporting.

    Base agent sets up a kafka subscription to listen to new stop documents, a catalog to read
    for experiments, a catalog to write agent status to, a kafka publisher to write agent
    documents to, and a manager API for the queue-server. Each time a stop document is read, the
    respective BlueskyRun is unpacked by the ``unpack_run`` method into an independent and
    dependent variable, and piped to the agent through the ``ingest`` method.

    Children of Agent should implement the following, through direct inheritance or mixin classes:

    Experiment specific:

    - measurement_plan
    - unpack_run

    Agent specific:

    - ingest
    - suggest
    - report (optional)
    - name (optional)

    Parameters
    ----------
    kafka_consumer : AgentConsumer
        Consumer (subscriber) of Kafka Bluesky documents. It should be subscribed to the sources
        of Bluesky stop documents that will trigger ``ingest``.
        AgentConsumer is a child class of bluesky_kafka.RemoteDispatcher that enables kafka
        messages to trigger agent directives.
    kafka_producer : Optional[Publisher]
        Bluesky Kafka publisher to produce document stream of agent actions for optional
        Adjudicator.
    tiled_data_node : tiled.client.container.Container
        Tiled node to serve as source of data (BlueskyRuns) for the agent.
    tiled_agent_node : tiled.client.container.Container
        Tiled node to serve as storage for the agent documents.
    qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
        Object to manage communication with Queue Server
    agent_run_suffix : Optional[str], optional
        Agent name suffix for the instance, by default generated using 2 hyphen separated words
        from xkcdpass.
    metadata : Optional[dict], optional
        Optional extra metadata to add to agent start document, by default {}
    suggest_on_ingest : bool, optional
        Whether to suggest new points every time an agent ingests new data.
        To create an agent that only suggests new points periodically or on another trigger,
        ``suggest_on_ingest`` should be set to False. By default True.
        Can be adjusted using ``enable_continuous_suggesting`` and
        ``disable_continuous_suggesting``.
    direct_to_queue : Optional[bool], optional
        Whether the agent suggestions will be placed directly on the queue. If false, the
        suggestions will be sent to a Kafka topic for an Adjudicator to process. By default True.
        Can be adjusted using ``enable_direct_to_queue`` and ``disable_direct_to_queue``.
    report_on_ingest : bool, optional
        Whether to create a report every time an agent ingests new data. By default False.
        Can be adjusted using ``enable_continuous_reporting`` and
        ``disable_continuous_reporting``.
    default_report_kwargs : Optional[dict], optional
        Default kwargs for calling the ``report`` method, by default None
    queue_add_position : Optional[Union[int, Literal["front", "back"]]], optional
        Starting position to add to the queue if adding directly to the queue, by default "back".
    endstation_key : Optional[str]
        Optional string that is needed for Adjudicator functionality. This keys the qserver API
        instance to a particular endstation. This way child Agents can maintain multiple queues
        for different unit operations. For example, this could be a beamline three letter acronym
        or other distinct key.
    """

    def __init__(
        self,
        *,
        kafka_consumer: AgentConsumer,
        tiled_data_node: tiled.client.container.Container,
        tiled_agent_node: tiled.client.container.Container,
        qserver: API_Threads_Mixin,
        kafka_producer: Optional[Publisher],
        agent_run_suffix: Optional[str] = None,
        metadata: Optional[dict] = None,
        suggest_on_ingest: Optional[bool] = True,
        direct_to_queue: Optional[bool] = True,
        report_on_ingest: Optional[bool] = False,
        default_report_kwargs: Optional[dict] = None,
        queue_add_position: Optional[Union[int, Literal["front", "back"]]] = None,
        endstation_key: Optional[str] = "",
    ):
        logger.debug("Initializing agent.")
        self.kafka_consumer = kafka_consumer
        self.kafka_consumer.set_agent(self)
        self.kafka_consumer.subscribe(self._on_stop_router)
        self.kafka_producer = kafka_producer
        logger.debug("Kafka set up successfully.")
        self.exp_catalog = tiled_data_node
        logger.info(f"Reading data from catalog: {self.exp_catalog}")
        self.agent_catalog = tiled_agent_node
        logger.info(f"Writing data to catalog: {self.agent_catalog}")
        self.metadata = metadata or {}
        self.instance_name = (
            f"{self.name}-{agent_run_suffix}"
            if agent_run_suffix
            else f"{self.name}-{xp.generate_xkcdpassword(PASSWORD_LIST, numwords=2, delimiter='-')}"
        )
        self.metadata["agent_name"] = self.instance_name
        self._suggest_on_ingest = suggest_on_ingest
        self._report_on_ingest = report_on_ingest
        self.default_report_kwargs = {} if default_report_kwargs is None else default_report_kwargs
        self._compose_run_bundle = None
        self._compose_descriptor_bundles = dict()
        self.re_manager = qserver
        self.endstation_key = endstation_key
        self._queue_add_position = "back" if queue_add_position is None else queue_add_position
        self._direct_to_queue = direct_to_queue
        self.default_plan_md = dict(agent_name=self.instance_name, agent_class=str(type(self)))
        self.known_uid_cache = list()
        try:
            self.server_registrations()
        except RuntimeError as e:
            logger.warning(f"Agent server unable to make registrations. Continuing regardless of\n {e}")
        self._kafka_thread = None

    @abstractmethod
    def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]:
        """Fetch the string name of a registered plan, as well as the positional and keyword
        arguments to pass that plan. Args/Kwargs is a common place to transform relative into
        absolute motor coords, or other device specific parameters.

        Parameters
        ----------
        point : ArrayLike
            Next point to measure using a given plan

        Returns
        -------
        plan_name : str
        plan_args : List
            List of arguments to pass to plan from a point to measure.
        plan_kwargs : dict
            Dictionary of keyword arguments to pass the plan, from a point to measure.
        """
        ...

    @staticmethod
    @abstractmethod
    def unpack_run(run: BlueskyRunLike) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
        """
        Consume a Bluesky run from tiled and emit the relevant x and y for the agent.

        Parameters
        ----------
        run : BlueskyRun

        Returns
        -------
        independent_var :
            The independent variable of the measurement
        dependent_var :
            The measured data, processed for relevance
        """
        ...

    @abstractmethod
    def ingest(
        self, independent_variable: ArrayLike, dependent_variable: Optional[ArrayLike] = None
    ) -> Dict[str, ArrayLike]:
        """
        Agent ingest of new data. This method replaces the legacy ``tell`` in the ``ask-tell``
        pattern.

        Parameters
        ----------
        independent_variable : ArrayLike
            Independent variable for data observed
        dependent_variable : Optional[ArrayLike], optional
            Dependent variable for data observed, by default None.
            In passive or unsupervised settings, the data may be cast as solely an independent
            variable.

        Returns
        -------
        dict
            Dictionary to be unpacked or added to a document. This registers what the agent
            knows about and when.
        """
        ...

    def suggest(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]:
        """
        Suggest a new batch of points to measure.

        Parameters
        ----------
        batch_size : int
            Number of new points to measure

        Returns
        -------
        docs : Sequence[dict]
            Documents of key metadata from the suggestion approach for each point in next_points.
            Must be length of batch size.
        next_points : Sequence
            Sequence of independent variables of length batch size
        """
        raise NotImplementedError

    def report(self, **kwargs) -> Dict[str, ArrayLike]:
        """
        Create a report given the data observed by the agent. This could potentially be
        implemented in the base class to write a document stream. Additional functionality for
        converting the report dict into an image or formatted report is the duty of the child
        class.
        """
        raise NotImplementedError

    def ingest_many(
        self, independents: Sequence[ArrayLike], dependents: Optional[Sequence[ArrayLike]]
    ) -> Sequence[Dict[str, List]]:
        """
        Update the agent with some new data. It is likely that there is a more efficient approach
        to handling multiple observations for an agent. The default behavior is to iterate over
        all observations and call the ``ingest`` method.

        Parameters
        ----------
        independents : Sequence
            Array of independent variables for observations
        dependents : Sequence
            Array of dependent variables for observations

        Returns
        -------
        list_of_dict
        """
        ingest_emits = []
        for x, y in zip(independents, dependents):
            ingest_emits.append(self.ingest(x, y))
        return ingest_emits

    @property
    def queue_add_position(self) -> Union[int, Literal["front", "back"]]:
        return self._queue_add_position

    @queue_add_position.setter
    def queue_add_position(self, position: Union[int, Literal["front", "back"]]):
        self._queue_add_position = position

    def update_priority(self, position: Union[int, Literal["front", "back"]]):
        """Convenience method to update the priority of a direct to queue agent

        Parameters
        ----------
        position : Union[int, Literal["front", "back"]]
            Position in priority for queue.
        """
        self.queue_add_position = position

    @property
    def suggest_on_ingest(self) -> bool:
        return self._suggest_on_ingest

    @suggest_on_ingest.setter
    def suggest_on_ingest(self, flag: bool):
        self._suggest_on_ingest = flag

    @property
    def report_on_ingest(self) -> bool:
        return self._report_on_ingest

    @report_on_ingest.setter
    def report_on_ingest(self, flag: bool):
        self._report_on_ingest = flag

    def enable_continuous_reporting(self):
        """Enable agent reporting each time it receives data."""
        self.report_on_ingest = True

    def disable_continuous_reporting(self):
        """Disable agent reporting each time it receives data."""
        self.report_on_ingest = False

    def enable_continuous_suggesting(self):
        """Enable agent suggesting new points to the queue each time it receives data."""
        self.suggest_on_ingest = True

    def disable_continuous_suggesting(self):
        """Disable agent suggesting new points to the queue each time it receives data."""
        self.suggest_on_ingest = False

    def enable_direct_to_queue(self):
        self._direct_to_queue = True

    def disable_direct_to_queue(self):
        self._direct_to_queue = False

    @property
    def name(self) -> str:
        """Short string name"""
        return "agent"

    @classmethod
    def build_from_argparse(cls, parser: ArgumentParser, **kwargs):
        args = parser.parse_args()
        _kwargs = vars(args)
        _kwargs.update(kwargs)
        return cls(**_kwargs)

    @classmethod
    def constructor_argparser(cls) -> ArgumentParser:
        """Convenience method to put all arguments into a parser"""
        parser = ArgumentParser(description=cls.__doc__, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("--kafka-group-id", required=True)
        parser.add_argument("--kafka-bootstrap-servers", required=True)
        parser.add_argument("--kafka-consumer-config", required=True)
        parser.add_argument("--kafka-producer-config", required=True)
        parser.add_argument("--publisher-topic", required=True)
        parser.add_argument("--subscription-topics", required=True)
        parser.add_argument("--data-profile-name", required=True)
        parser.add_argument("--agent-profile-name", required=True)
        parser.add_argument("--qserver-host", required=True)
        parser.add_argument("--qserver-api-key", required=True)
        parser.add_argument("--metadata")
        return parser

    def _write_event(self, stream, doc, uid=None):
        """Add event to builder as event page, and publish to catalog"""
        if not doc:
            logger.info(f"No doc presented to write_event for stream {stream}")
            return
        if stream not in self._compose_descriptor_bundles:
            data_keys = infer_data_keys(doc)
            self._compose_descriptor_bundles[stream] = self._compose_run_bundle.compose_descriptor(
                name=stream, data_keys=data_keys
            )
            self.agent_catalog.v1.insert("descriptor", self._compose_descriptor_bundles[stream].descriptor_doc)
        t = ttime.time()
        event_doc = self._compose_descriptor_bundles[stream].compose_event(
            data=doc, timestamps={k: t for k in doc}, uid=uid
        )
        self.agent_catalog.v1.insert("event", event_doc)
        return event_doc["uid"]

    def _add_to_queue(
        self,
        next_points,
        uid,
        *,
        re_manager=None,
        position: Optional[Union[int, Literal["front", "back"]]] = None,
        plan_factory: Optional[Callable] = None,
    ):
        """
        Adds a single set of points to the queue as bluesky plans

        Parameters
        ----------
        next_points : Iterable
            New points to measure
        uid : str
        re_manager : Optional[bluesky_queueserver_api.api_threads.API_Threads_Mixin]
            Defaults to self.re_manager
        position : Optional[Union[int, Literal['front', 'back']]]
            Defaults to self.queue_add_position
        plan_factory : Optional[Callable]
            Function to generate plans from points. Defaults to self.measurement_plan.
            Callable should return a tuple of (plan_name, args, kwargs)
        """
        plan_factory = plan_factory or self.measurement_plan
        if re_manager is None:
            re_manager = self.re_manager
        for point in next_points:
            plan_name, args, kwargs = plan_factory(point)
            kwargs.setdefault("md", {})
            kwargs["md"].update(self.default_plan_md)
            kwargs["md"]["agent_suggestion_uid"] = uid
            plan = BPlan(plan_name, *args, **kwargs)
            r = re_manager.item_add(plan, pos=self.queue_add_position if position is None else position)
            logger.debug(f"Sent http-server request for point {point}.\nReceived response: {r}")
        return

    def _check_queue_and_start(self):
        """
        If the queue runs out of plans, it will stop. That is, adding a plan to an empty queue
        will not run the plan. This will not be an issue when there are many agents adding plans
        to a queue. Giving agents the autonomy to start the queue is a risk that will be
        mitigated by only allowing the beamline scientists to open and close the environment.
        A queue cannot be started in a closed environment.
        """
        status = self.re_manager.status(reload=True)
        if (
            status["items_in_queue"] == 1
            and status["worker_environment_exists"] is True
            and status["manager_state"] == "idle"
        ):
            self.re_manager.queue_start()
            logger.info("Agent is starting an idle queue with exactly 1 item.")

    def _suggest_and_write_events(
        self, batch_size: int, suggest_method: Optional[Callable] = None, stream_name: Optional[str] = "suggest"
    ):
        """Private suggest method for consistency across calls and changes to docs streams.

        Parameters
        ----------
        batch_size : int
            Size of batch passed to suggest method
        suggest_method : Optional[Callable]
            self.suggest, or self.subject_suggest, or some target suggest function.
            Defaults to self.suggest
        stream_name : Optional[str]
            Name for stream corresponding to `suggest_method`.
            'suggest', 'subject_suggest', or other. Defaults to 'suggest'.

        Returns
        -------
        next_points : list
            Next points to be sent to adjudicator or queue
        uid : str
        """
        if suggest_method is None:
            suggest_method = self.suggest
        docs, next_points = suggest_method(batch_size)
        uid = str(uuid.uuid4())
        for batch_idx, (doc, next_point) in enumerate(zip(docs, next_points)):
            doc["suggestion"] = next_point
            doc["batch_idx"] = batch_idx
            doc["batch_size"] = len(next_points)
            self._write_event(stream_name, doc, uid=f"{uid}/{batch_idx}")
        return next_points, uid

    def add_suggestions_to_queue(self, batch_size: int):
        """Calls suggest, adds suggestions to queue, and writes out events.
        This will create one event for each suggestion.
        """
        next_points, uid = self._suggest_and_write_events(batch_size)
        logger.info(f"Issued suggestion and adding to the queue. {uid}")
        self._add_to_queue(next_points, uid)
        self._check_queue_and_start()  # TODO: remove this and encourage updated qserver functionality

    def _create_suggestion_list(self, points: Sequence, uid: str, measurement_plan: Optional[Callable] = None):
        """Create suggestions for adjudicator"""
        suggestions = []
        for point in points:
            plan_name, args, kwargs = (
                self.measurement_plan(point) if measurement_plan is None else measurement_plan(point)
            )
            kwargs.setdefault("md", {})
            kwargs["md"].update(self.default_plan_md)
            kwargs["md"]["agent_suggestion_uid"] = uid
            suggestions.append(
                Suggestion(
                    suggestion_uid=uid,
                    plan_name=plan_name,
                    plan_args=args,
                    plan_kwargs=kwargs,
                )
            )
        return suggestions

    def generate_suggestions_for_adjudicator(self, batch_size: int):
        """Calls suggest, sends suggestions to adjudicator, and writes out events.
        This will create one event for each suggestion."""
        next_points, uid = self._suggest_and_write_events(batch_size)
        logger.info(f"Issued suggestion and sending to the adjudicator. {uid}")
        suggestions = self._create_suggestion_list(next_points, uid)
        msg = AdjudicatorMsg(
            agent_name=self.instance_name,
            uid=str(uuid.uuid4()),
            suggestions={self.endstation_key: suggestions},
        )
        self.kafka_producer(ADJUDICATOR_STREAM_NAME, msg.dict())

    def generate_report(self, **kwargs):
        doc = self.report(**kwargs)
        uid = self._write_event("report", doc)
        logger.info(f"Issued report request and writing event. {uid}")

    @staticmethod
    def trigger_condition(uid) -> bool:
        return True

    def _ingest_uid(self, uid):
        """Private ingest to encapsulate the processing of a uid. This allows the agent provided
        ingest to just consume an independent and dependent variable.

        Parameters
        ----------
        uid : str
            Unique key to grab from Tiled.
        """
        run = self.exp_catalog[uid]
        try:
            independent_variable, dependent_variable = self.unpack_run(run)
        except KeyError as e:
            logger.warning(f"Ignoring key error in unpack for data {uid}:\n {e}")
            return
        logger.debug("Agent ingesting some new data.")
        doc = self.ingest(independent_variable, dependent_variable)
        doc["exp_uid"] = uid
        self._write_event("ingest", doc)
        self.known_uid_cache.append(uid)

    def _on_stop_router(self, name, doc):
        """Document router that runs each time a stop document is seen."""
        if name != "stop":
            return
        uid = doc["run_start"]
        if not self.trigger_condition(uid):
            logger.debug(
                f"New data detected, but trigger condition not met. "
                f"The agent will ignore this start doc: {uid}"
            )
            return

        # Ingest
        logger.info(f"New data detected, agent ingesting this run uid: {uid}")
        self._ingest_uid(uid)

        # Report
        if self.report_on_ingest:
            self.generate_report(**self.default_report_kwargs)

        # Suggest
        if self.suggest_on_ingest:
            if self._direct_to_queue:
                self.add_suggestions_to_queue(1)
            else:
                self.generate_suggestions_for_adjudicator(1)

    def ingest_uids(self, uids: Iterable):
        """Give an agent an iterable of uids to learn from.
        This is an optional behavior for priming an agent without a complete restart."""
        logger.info("Agent ingesting list of uids")
        for uid in uids:
            logger.info(f"Agent ingesting this run uid: {uid}")
            self._ingest_uid(uid)

    def start(self, suggest_at_start=False):
        """Starts the Kafka listener in a background thread

        Parameters
        ----------
        suggest_at_start : bool, optional
            Whether to suggest new points immediately, by default False
        """
        logger.debug("Issuing Agent start document and starting to listen to Kafka")
        self._compose_run_bundle = compose_run(metadata=self.metadata)
        self.agent_catalog.v1.insert("start", self._compose_run_bundle.start_doc)
        logger.info(f"Agent name={self._compose_run_bundle.start_doc['agent_name']}")
        logger.info(f"Agent start document uuid={self._compose_run_bundle.start_doc['uid']}")
        if suggest_at_start:
            self.add_suggestions_to_queue(1)
        self._kafka_thread = threading.Thread(target=self.kafka_consumer.start, name="agent-loop", daemon=True)
        self._kafka_thread.start()

    def stop(self, exit_status="success", reason=""):
        logger.debug("Attempting agent stop.")
        stop_doc = self._compose_run_bundle.compose_stop(exit_status=exit_status, reason=reason)
        self.agent_catalog.v1.insert("stop", stop_doc)
        self.kafka_producer.flush()
        self.kafka_consumer.stop()
        logger.info(
            f"Stopped agent with exit status {exit_status.upper()}"
            f"{(' for reason: ' + reason) if reason else '.'}"
        )

    def close_and_restart(self, *, clear_uid_cache=False, reingest_all=False, reason=""):
        """Utility for closing and restarting an agent with the same name.
        This is primarily for methods that change the hyperparameters of an agent on the fly,
        but in doing so may change the shape/nature of the agent document stream. This will keep
        the documents consistent between hyperparameters as individual BlueskyRuns.

        Parameters
        ----------
        clear_uid_cache : bool, optional
            Clears the cache of uids the agent has ingested, by default False.
            This is useful for a clean slate.
        reingest_all : bool, optional
            Resets the cache and the agent ingests all previous data from scratch, by default
            False. This can be useful if the agent has not retained knowledge from previous
            ingestion.
        reason : str, optional
            Reason for closing and restarting the agent, to be recorded to logs, by default ""
        """
        self.stop(reason=f"Close and Restart: {reason}")
        if clear_uid_cache:
            self.known_uid_cache = list()
        elif reingest_all:
            uids = copy.copy(self.known_uid_cache)
            self.known_uid_cache = list()
            self.ingest_uids(uids)
        self.start()

    def signal_handler(self, signal, frame):
        self.stop(exit_status="abort", reason="forced exit ctrl+c")
        sys.exit(0)

    def _register_property(self, name: str, property_name: Optional[str] = None, **kwargs):
        """Wrapper to register a property with the bluesky-adaptive server instead of an
        attribute or variable.

        Parameters
        ----------
        name : str
            Name by which the variable is accessible through the REST API.
            The PV name is generated by converting the variable names to upper-case letters.
            The name does not need to match the actual name of the variable used in the code.
            The name should be selected so that it could be conveniently used in the API.
        property_name : Optional[str]
            The name of a class property, by default the same name used in the REST API.
        """
        [kwargs.pop(key, None) for key in ("getter", "setter")]  # Cannot pass getter/setter
        property_name = name if property_name is None else property_name
        register_variable(
            name,
            getter=lambda: getattr(self.__class__, property_name).fget(self),
            setter=lambda x: getattr(self.__class__, property_name).fset(self, x),
            **kwargs,
        )

    def _register_method(self, name, method_name=None, **kwargs):
        """Wrapper to register a generic method with the bluesky-adaptive server instead of an
        attribute or variable.
        To call the method, pass the setter a json of the form: {value: [[args,], {kwargs}]}
        This is a temporary solution that makes use of only the setter API and not a dedicated
        interface. This will be deprecated in the future.

        Parameters
        ----------
        name : str
            Name by which the variable is accessible through the REST API.
            The PV name is generated by converting the variable names to upper-case letters.
            The name does not need to match the actual name of the variable used in the code.
            The name should be selected so that it could be conveniently used in the API.
        method_name : Optional[str]
            The name of the method, by default the same name used in the REST API.
        """
        [kwargs.pop(key, None) for key in ("getter", "setter")]  # Cannot pass getter/setter
        method_name = name if method_name is None else method_name
        if not isinstance(getattr(self, method_name), Callable):
            raise TypeError(f"Method {method_name} must be a callable function.")
        register_variable(name, setter=lambda value: start_task(getattr(self, method_name)(*value[0], **value[1])))

    def server_registrations(self) -> None:
        """
        Method to generate all server registrations during agent initialization.
        This method can be used in subclasses, to override or extend the default registrations.
        """
        self._register_method("generate_report")
        self._register_method("add_suggestions_to_queue")
        self._register_method("ingest_uids")
        self._register_property("queue_add_position", pv_type="str")
        self._register_property("suggest_on_ingest", pv_type="bool")
        self._register_property("report_on_ingest", pv_type="bool")

    @staticmethod
    def qserver_from_host_and_key(host: str, key: str):
        """Convenience method to produce an RE Manager object to manage communication with the
        Queue Server. This is one of several paradigms for communication, albeit a common one.
        See the bluesky_queueserver_api documentation for more details.

        Parameters
        ----------
        host : str
            URI for host of HTTP Server
        key : str
            Authorization key for HTTP Server API

        Returns
        -------
        qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
        """
        from bluesky_queueserver_api.http import REManagerAPI

        qserver = REManagerAPI(http_server_uri=host)
        qserver.set_authorization_key(api_key=key)
        return qserver

    @classmethod
    def from_config_kwargs(
        cls,
        kafka_group_id: str,
        kafka_bootstrap_servers: str,
        kafka_consumer_config: dict,
        kafka_producer_config: dict,
        publisher_topic: str,
        subscription_topics: List[str],
        data_profile_name: str,
        agent_profile_name: str,
        qserver_host: str,
        qserver_api_key: str,
        **kwargs,
    ):
        """Convenience method for producing an Agent from keyword arguments describing the
        Kafka, Tiled, and Qserver setup. Assumes tiled is loaded from profile, and the
        REManagerAPI is based on the http api.

        Parameters
        ----------
        kafka_group_id : str
            Required string identifier for the consumer's Kafka Consumer group.
        kafka_bootstrap_servers : str
            Comma-delimited list of Kafka server addresses as a string such as
            ``'broker1:9092,broker2:9092,127.0.0.1:9092'``
        kafka_consumer_config : dict
            Override default configuration or specify additional configuration
            options to confluent_kafka.Consumer.
        kafka_producer_config : dict
            Override default configuration or specify additional configuration
            options to confluent_kafka.Producer.
        publisher_topic : str
            Existing topic to publish agent documents to.
        subscription_topics : List[str]
            List of existing topics as strings such as ["topic-1", "topic-2"]. These should be
            the sources of the Bluesky stop documents that trigger ``ingest`` and agent
            directives.
        data_profile_name : str
            Tiled profile name to serve as source of data (BlueskyRuns) for the agent.
        agent_profile_name : str
            Tiled profile name to serve as storage for the agent documents.
        qserver_host : str
            Host to POST requests to. Something akin to 'http://localhost:60610'
        qserver_api_key : str
            Key for API security.
        kwargs : dict
            Additional keyword arguments for init
        """
        from bluesky_queueserver_api.http import REManagerAPI
        from tiled.client import from_profile

        kafka_consumer = AgentConsumer(
            topics=subscription_topics,
            bootstrap_servers=kafka_bootstrap_servers,
            group_id=kafka_group_id,
            consumer_config=kafka_consumer_config,
        )
        kafka_producer = Publisher(
            topic=publisher_topic,
            bootstrap_servers=kafka_bootstrap_servers,
            key="",
            producer_config=kafka_producer_config,
        )
        tiled_data_node = from_profile(data_profile_name)
        tiled_agent_node = from_profile(agent_profile_name)
        re_manager = REManagerAPI(http_server_uri=qserver_host)
        re_manager.set_authorization_key(api_key=qserver_api_key)
        if "metadata" in kwargs:
            kwargs["metadata"].update(
                dict(tiled_data_profile=data_profile_name, tiled_agent_profile=agent_profile_name)
            )
        return cls(
            kafka_consumer=kafka_consumer,
            kafka_producer=kafka_producer,
            tiled_data_node=tiled_data_node,
            tiled_agent_node=tiled_agent_node,
            qserver=re_manager,
            **kwargs,
        )
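

# Minimal concrete sketch (hypothetical, not part of the library): the four required overrides
# for a working agent. The plan and device names ("agent_move_and_count", "det", "motor") and
# the run layout in unpack_run are assumptions for illustration only.
#
#     class SequentialAgent(Agent):
#         """Steps a motor along a line, one suggestion at a time."""
#
#         def measurement_plan(self, point):
#             # Registered qserver plan name, positional args, and keyword args
#             return "agent_move_and_count", [["det"], "motor", float(point)], {}
#
#         @staticmethod
#         def unpack_run(run):
#             # Assumed run layout: position in start doc, detector reading in primary stream
#             return run.start["motor_position"], run.primary.read()["det"].data
#
#         def ingest(self, x, y=None):
#             self.observations = getattr(self, "observations", []) + [(x, y)]
#             return {"independent_variable": x, "observable": y}
#
#         def suggest(self, batch_size):
#             # Fixed-step strategy: next point is proportional to data seen so far
#             next_points = [len(self.observations) * 0.1] * batch_size
#             docs = [{"reasoning": "fixed-step sequential"} for _ in next_points]
#             return docs, next_points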


class MonarchSubjectAgent(Agent, ABC):
    # Drive a beamline. On stop doc check. By default manual trigger.

    def __init__(
        self,
        *args,
        subject_qserver: API_Threads_Mixin,
        subject_kafka_producer: Optional[Publisher] = None,
        subject_endstation_key: Optional[str] = "",
        **kwargs,
    ):
        """Abstract base class for a MonarchSubject agent. These agents only consume documents
        from one (Monarch) source, and can dictate the behavior of a different (Subject) queue.
        This can be useful in a multimodal measurement where one measurement is very fast and
        the other is very slow: after some amount of data collection on the fast measurement,
        the agent can dictate that the slow measurement probe what it considers as interesting.

        The agent maintains the functionality of a regular Agent, and adds plans to the Monarch
        queue. By default, the Subject is only directed when manually triggered by the agent
        server or by a kafka directive. If an automated approach to suggesting the subject is
        required, ``subject_suggest_condition`` must be overridden. This is commonly done by
        using a wall-clock interval, and/or a model confidence trigger.

        Children of MonarchSubjectAgent must implement the following, through direct inheritance
        or mixin classes:

        Experiment specific:

        - measurement_plan
        - unpack_run
        - subject_measurement_plan

        Agent specific:

        - ingest
        - suggest
        - subject_suggest
        - report (optional)
        - name (optional)

        Parameters
        ----------
        subject_qserver : API_Threads_Mixin
            Object to manage communication with the Subject Queue Server
        subject_kafka_producer : Optional[Publisher]
            Bluesky Kafka publisher to produce document stream of agent actions to Adjudicators
        subject_endstation_key : Optional[str]
            Optional string that is needed for Adjudicator functionality. This keys the qserver
            API instance to a particular endstation. This way child Agents can maintain multiple
            queues for different unit operations. For example, this could be a beamline three
            letter acronym or other distinct key.
        """
        super().__init__(**kwargs)
        self.subject_re_manager = subject_qserver
        self.subject_kafka_producer = subject_kafka_producer
        self.subject_endstation_key = subject_endstation_key

    @abstractmethod
    def subject_measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]:
        """Details for subject plan.
        Fetch the string name of a registered plan, as well as the positional and keyword
        arguments to pass that plan. Args/Kwargs is a common place to transform relative into
        absolute motor coords, or other device specific parameters.

        Parameters
        ----------
        point : ArrayLike
            Next point to measure using a given plan

        Returns
        -------
        plan_name : str
        plan_args : List
            List of arguments to pass to plan from a point to measure.
        plan_kwargs : dict
            Dictionary of keyword arguments to pass the plan, from a point to measure.
        """
        ...

    @abstractmethod
    def subject_suggest(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]:
        """
        Suggest a new batch of points to measure on the subject queue.

        Parameters
        ----------
        batch_size : int
            Number of new points to measure

        Returns
        -------
        docs : Sequence[dict]
            Documents of key metadata from the suggestion approach for each point in next_points.
            Must be length of batch size.
        next_points : Sequence[ArrayLike]
            Sequence of independent variables of length batch size
        """
        ...

    def subject_suggest_condition(self):
        """Optional trigger method, run on each stop document via the document router
        subscription.

        Returns
        -------
        bool
        """
        return False

    def add_suggestions_to_subject_queue(self, batch_size: int):
        """Calls subject_suggest, adds suggestions to the subject queue, and writes out events"""
        next_points, uid = self._suggest_and_write_events(batch_size, self.subject_suggest, "subject_suggest")
        logger.info(f"Issued suggestion to subject and adding to the queue. {uid}")
        self._add_to_queue(
            next_points,
            uid,
            re_manager=self.subject_re_manager,
            position="front",
            plan_factory=self.subject_measurement_plan,
        )

    def _on_stop_router(self, name, doc):
        ret = super()._on_stop_router(name, doc)
        if name != "stop":
            return ret
        if self.subject_suggest_condition():
            if self._direct_to_queue:
                self.add_suggestions_to_subject_queue(1)
            else:
                raise NotImplementedError

    def generate_suggestions_for_adjudicator(self, batch_size: int):
        next_points, uid = self._suggest_and_write_events(batch_size, self.subject_suggest, "subject_suggest")
        logger.info(f"Issued subject suggestion and sending to the adjudicator. {uid}")
        suggestions = self._create_suggestion_list(next_points, uid, self.subject_measurement_plan)
        msg = AdjudicatorMsg(
            agent_name=self.instance_name,
            uid=str(uuid.uuid4()),
            suggestions={self.subject_endstation_key: suggestions},
        )
        self.subject_kafka_producer(ADJUDICATOR_STREAM_NAME, msg.dict())

    def server_registrations(self) -> None:
        super().server_registrations()
        self._register_method("add_suggestions_to_subject_queue")
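

# Illustrative override (hypothetical): the class docstring notes that an automated subject
# trigger is commonly a wall-clock interval and/or a model confidence check. A sketch of the
# interval variant; the 600 s period is an assumed value.
#
#     class IntervalMonarchAgent(MonarchSubjectAgent):
#         subject_interval = 600.0  # seconds between subject suggestions (assumed)
#         _last_subject_time = 0.0
#
#         def subject_suggest_condition(self) -> bool:
#             now = ttime.time()
#             if now - self._last_subject_time > self.subject_interval:
#                 self._last_subject_time = now
#                 return True
#             return False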