# Source code for bluesky_adaptive.agents.base

import copy
import inspect
import sys
import threading
import time as ttime
import uuid
from abc import ABC, abstractmethod
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from logging import getLogger
from typing import Callable, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, TypedDict, Union

import msgpack
import numpy as np
import tiled
from bluesky_kafka import Publisher, RemoteDispatcher
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.api_threads import API_Threads_Mixin
from databroker.client import BlueskyRun
from event_model import compose_run
from numpy.typing import ArrayLike
from xkcdpass import xkcd_password as xp

from ..adjudicators.msg import DEFAULT_NAME as ADJUDICATOR_STREAM_NAME
from ..adjudicators.msg import AdjudicatorMsg, Suggestion
from ..server import register_variable, start_task

logger = getLogger("bluesky_adaptive.agents")
PASSWORD_LIST = xp.generate_wordlist(wordfile=xp.locate_wordfile(), min_length=3, max_length=6)


class AgentConsumer(RemoteDispatcher):
    def __init__(
        self,
        *,
        topics,
        bootstrap_servers,
        group_id,
        agent=None,
        consumer_config=None,
        polling_duration=0.05,
        deserializer=msgpack.loads,
    ):
        """Dispatch documents from Kafka to bluesky callbacks, modified for agent usage.
        This allows subscribing the dispatcher to an on-stop protocol for agents to be told about
        new Bluesky runs. It also provides an interface to trigger changes to the agent using the same
        Kafka topics.

        Parameters
        ----------
        topics : list
            List of topics as strings such as ["topic-1", "topic-2"]
        bootstrap_servers : str
            Comma-delimited list of Kafka server addresses as a string such as ``'127.0.0.1:9092'``
        group_id : str
            Required string identifier for Kafka Consumer group
        agent : Agent
            Instance of the agent to send directives to. Must be set to send directives.
        consumer_config : dict
            Override default configuration or specify additional configuration
            options to confluent_kafka.Consumer.
        polling_duration : float
            Time in seconds to wait for a message before running function work_while_waiting.
            Default is 0.05.
        deserializer : function, optional
            optional function to deserialize data. Default is msgpack.loads.
        """
        super().__init__(topics, bootstrap_servers, group_id, consumer_config, polling_duration, deserializer)
        self._agent = agent

    def _agent_action(self, topic, doc):
        """Exposes agent methods via the kafka topic.
        This allows bluesky plans or adjudicators to interface with agent hyperparameters or settings.

        Parameters
        ----------
        topic : str
            the Kafka topic of the message containing name and doc
        doc : dict
            agent document of the form::

                {
                    "action": "method_name",
                    "args": [arg1, arg2, ...],
                    "kwargs": {"kwarg1": val1, "kwarg2": val2},
                }

        Returns
        -------
        continue_polling : bool
        """
        action = doc["action"]
        args = doc["args"]
        kwargs = doc["kwargs"]
        try:
            getattr(self._agent, action)(*args, **kwargs)
        except AttributeError as e:
            logger.error(
                f"Unavailable action sent to agent {self._agent.instance_name} on topic: {topic}\n" f"{e}"
            )
        except TypeError as e:
            logger.error(
                f"Type error for {action} sent to agent {self._agent.instance_name} on topic: {topic}\n"
                f"Are you sure your args and kwargs were appropriate?\n"
                f"Args received: {args}\n"
                f"Kwargs received: {kwargs}\n"
                f"Expected signature: {inspect.signature(getattr(self.agent, action))}\n"
                f"{e}"
            )
        return True

    def process_document(self, consumer, topic, name, doc):
        """
        Processes bluesky documents.
        Documents whose name matches this agent's instance name are treated as agent
        directives and routed to ``_agent_action``; all others are sent to
        RemoteDispatcher.process(name, doc).
        If this method returns False the consumer will break out of the polling loop.

        Parameters
        ----------
        consumer : confluent_kafka.Consumer
            the Kafka consumer that received the message
        topic : str
            the Kafka topic of the message containing name and doc
        name : str
            bluesky document name: `start`, `descriptor`, `event`, etc.
        doc : dict
            bluesky document

        Returns
        -------
        continue_polling : bool
            return False to break out of the polling loop, return True to continue polling
        """
        if name == self._agent.instance_name:
            return self._agent_action(topic, doc)
        else:
            return super().process_document(consumer, topic, name, doc)

    def set_agent(self, agent):
        self._agent = agent


class DataKeys(TypedDict):
    dtype: str
    dtype_str: str
    dtype_descr: list
    shape: list
    source: str

def infer_data_keys(doc: dict) -> Dict[str, DataKeys]:
    data_keys = dict()
    _bad_iterables = (str, bytes, dict)
    _type_map = {
        "number": (float, np.floating, complex),
        "array": (np.ndarray, list, tuple),
        "string": (str,),
        "integer": (int, np.integer),
    }
    for key, val in doc.items():
        if isinstance(val, Iterable) and not isinstance(val, _bad_iterables):
            dtype = "array"
        else:
            for json_type, py_types in _type_map.items():
                if isinstance(val, py_types):
                    dtype = json_type
                    break
            else:
                raise TypeError(f"Cannot infer data key dtype for {key}={val!r} of type {type(val)}")
        arr_val = np.asanyarray(val)
        arr_dtype = arr_val.dtype
        data_keys[key] = dict(
            dtype=dtype,
            dtype_str=arr_dtype.str,
            dtype_descr=arr_dtype.descr,
            shape=list(arr_val.shape),
            source="agent",
        )
    return data_keys
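
# Example (illustrative; exact values assume numpy's default float64 on a little-endian
# platform): a plain dict, such as a document returned by ``tell`` or ``ask``, is mapped
# to event-model-style data keys.
#
#     >>> infer_data_keys({"temperature": 300.0, "spectrum": [0.1, 0.2, 0.3]})
#     {'temperature': {'dtype': 'number', 'dtype_str': '<f8', 'dtype_descr': [('', '<f8')],
#                      'shape': [], 'source': 'agent'},
#      'spectrum': {'dtype': 'array', 'dtype_str': '<f8', 'dtype_descr': [('', '<f8')],
#                   'shape': [3], 'source': 'agent'}}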


class Agent(ABC):
    """Abstract base class for a single plan agent. These agents should consume data, decide
    where to measure next, and execute a single type of plan (something akin to move and count).
    Alternatively, these agents can be used solely for reporting.

    Base agent sets up a kafka subscription to listen to new stop documents, a catalog to read
    for experiments, a catalog to write agent status to, a kafka publisher to write agent
    documents to, and a manager API for the queue-server. Each time a stop document is read,
    the respective BlueskyRun is unpacked by the ``unpack_run`` method into an independent and
    dependent variable, and told to the agent by the ``tell`` method.

    Children of Agent should implement the following, through direct inheritance or mixin classes:

    Experiment specific:

    - measurement_plan
    - unpack_run

    Agent specific:

    - tell
    - ask
    - report (optional)
    - name (optional)

    Parameters
    ----------
    kafka_consumer : AgentConsumer
        Consumer (subscriber) of Kafka Bluesky documents. It should be subscribed to the sources
        of Bluesky stop documents that will trigger ``tell``.
        AgentConsumer is a child class of bluesky_kafka.RemoteDispatcher that enables kafka
        messages to trigger agent directives.
    kafka_producer : Optional[Publisher]
        Bluesky Kafka publisher to produce document stream of agent actions for an optional
        Adjudicator.
    tiled_data_node : tiled.client.container.Container
        Tiled node to serve as source of data (BlueskyRuns) for the agent.
    tiled_agent_node : tiled.client.container.Container
        Tiled node to serve as storage for the agent documents.
    qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
        Object to manage communication with the Queue Server.
    agent_run_suffix : Optional[str], optional
        Agent name suffix for the instance, by default generated using 2 hyphen-separated words
        from xkcdpass.
    metadata : Optional[dict], optional
        Optional extra metadata to add to the agent start document, by default {}.
    ask_on_tell : bool, optional
        Whether to ask for new points every time an agent is told about new data.
        To create a truly passive agent, it is best to implement ``ask`` as a method that does
        nothing. To create an agent that only suggests new points periodically or on another
        trigger, ``ask_on_tell`` should be set to False. By default True.
        Can be adjusted using ``enable_continuous_suggesting`` and ``disable_continuous_suggesting``.
    direct_to_queue : Optional[bool], optional
        Whether the agent suggestions will be placed directly on the queue. If False, the
        suggestions will be sent to a Kafka topic for an Adjudicator to process. By default True.
        Can be adjusted using ``enable_direct_to_queue`` and ``disable_direct_to_queue``.
    report_on_tell : bool, optional
        Whether to create a report every time an agent is told about new data. By default False.
        Can be adjusted using ``enable_continuous_reporting`` and ``disable_continuous_reporting``.
    default_report_kwargs : Optional[dict], optional
        Default kwargs for calling the ``report`` method, by default None.
    queue_add_position : Optional[Union[int, Literal["front", "back"]]], optional
        Starting position to add to the queue if adding directly to the queue, by default "back".
    endstation_key : Optional[str]
        Optional string that is needed for Adjudicator functionality. This keys the qserver API
        instance to a particular endstation. This way child Agents can maintain multiple queues
        for different unit operations. For example, this could be a beamline three letter acronym
        or other distinct key.
    """

    def __init__(
        self,
        *,
        kafka_consumer: AgentConsumer,
        tiled_data_node: tiled.client.container.Container,
        tiled_agent_node: tiled.client.container.Container,
        qserver: API_Threads_Mixin,
        kafka_producer: Optional[Publisher],
        agent_run_suffix: Optional[str] = None,
        metadata: Optional[dict] = None,
        ask_on_tell: Optional[bool] = True,
        direct_to_queue: Optional[bool] = True,
        report_on_tell: Optional[bool] = False,
        default_report_kwargs: Optional[dict] = None,
        queue_add_position: Optional[Union[int, Literal["front", "back"]]] = None,
        endstation_key: Optional[str] = "",
    ):
        logger.debug("Initializing agent.")
        self.kafka_consumer = kafka_consumer
        self.kafka_consumer.set_agent(self)
        self.kafka_consumer.subscribe(self._on_stop_router)
        self.kafka_producer = kafka_producer
        logger.debug("Kafka set up successfully.")
        self.exp_catalog = tiled_data_node
        logger.info(f"Reading data from catalog: {self.exp_catalog}")
        self.agent_catalog = tiled_agent_node
        logger.info(f"Writing data to catalog: {self.agent_catalog}")
        self.metadata = metadata or {}
        self.instance_name = (
            f"{self.name}-{agent_run_suffix}"
            if agent_run_suffix
            else f"{self.name}-{xp.generate_xkcdpassword(PASSWORD_LIST, numwords=2, delimiter='-')}"
        )
        self.metadata["agent_name"] = self.instance_name
        self._ask_on_tell = ask_on_tell
        self._report_on_tell = report_on_tell
        self.default_report_kwargs = {} if default_report_kwargs is None else default_report_kwargs
        self._compose_run_bundle = None
        self._compose_descriptor_bundles = dict()
        self.re_manager = qserver
        self.endstation_key = endstation_key
        self._queue_add_position = "back" if queue_add_position is None else queue_add_position
        self._direct_to_queue = direct_to_queue
        self.default_plan_md = dict(agent_name=self.instance_name, agent_class=str(type(self)))
        self.tell_cache = list()
        try:
            self.server_registrations()
        except RuntimeError as e:
            logger.warning(f"Agent server unable to make registrations. Continuing regardless of\n {e}")
        self._kafka_thread = None

    @abstractmethod
    def measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]:
        """Fetch the string name of a registered plan, as well as the positional and keyword
        arguments to pass that plan.
        Args/Kwargs is a common place to transform relative into absolute motor coords, or
        other device specific parameters.

        Parameters
        ----------
        point : ArrayLike
            Next point to measure using a given plan

        Returns
        -------
        plan_name : str
        plan_args : List
            List of arguments to pass to plan from a point to measure.
        plan_kwargs : dict
            Dictionary of keyword arguments to pass the plan, from a point to measure.
        """
        ...

    @staticmethod
    @abstractmethod
    def unpack_run(run: BlueskyRun) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
        """
        Consume a Bluesky run from tiled and emit the relevant x and y for the agent.

        Parameters
        ----------
        run : BlueskyRun

        Returns
        -------
        independent_var :
            The independent variable of the measurement
        dependent_var :
            The measured data, processed for relevance
        """
        ...

    @abstractmethod
    def tell(self, x, y) -> Dict[str, ArrayLike]:
        """
        Tell the agent about some new data

        Parameters
        ----------
        x :
            Independent variable for data observed
        y :
            Dependent variable for data observed

        Returns
        -------
        dict
            Dictionary to be unpacked or added to a document
        """
        ...

    @abstractmethod
    def ask(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]:
        """
        Ask the agent for a new batch of points to measure.

        Parameters
        ----------
        batch_size : int
            Number of new points to measure

        Returns
        -------
        docs : Sequence[dict]
            Documents of key metadata from the ask approach for each point in next_points.
            Must be length of batch size.
        next_points : Sequence
            Sequence of independent variables of length batch size
        """
        ...

    def report(self, **kwargs) -> Dict[str, ArrayLike]:
        """
        Create a report given the data observed by the agent.
        This could potentially be implemented in the base class to write a document stream.
        Additional functionality for converting the report dict into an image or formatted
        report is the duty of the child class.
        """
        raise NotImplementedError

    def tell_many(self, xs, ys) -> Sequence[Dict[str, List]]:
        """
        Tell the agent about some new data. It is likely that there is a more efficient approach
        to handling multiple observations for an agent. The default behavior is to iterate over
        all observations and call the ``tell`` method.

        Parameters
        ----------
        xs : list, array
            Array of independent variables for observations
        ys : list, array
            Array of dependent variables for observations

        Returns
        -------
        list_of_dict
        """
        tell_emits = []
        for x, y in zip(xs, ys):
            tell_emits.append(self.tell(x, y))
        return tell_emits

    @property
    def queue_add_position(self) -> Union[int, Literal["front", "back"]]:
        return self._queue_add_position

    @queue_add_position.setter
    def queue_add_position(self, position: Union[int, Literal["front", "back"]]):
        self._queue_add_position = position

    def update_priority(self, position: Union[int, Literal["front", "back"]]):
        """Convenience method to update the priority of a direct to queue agent

        Parameters
        ----------
        position : Union[int, Literal["front", "back"]]
            Position in priority for queue.
        """
        self.queue_add_position = position

    @property
    def ask_on_tell(self) -> bool:
        return self._ask_on_tell

    @ask_on_tell.setter
    def ask_on_tell(self, flag: bool):
        self._ask_on_tell = flag

    @property
    def report_on_tell(self) -> bool:
        return self._report_on_tell

    @report_on_tell.setter
    def report_on_tell(self, flag: bool):
        self._report_on_tell = flag

    def enable_continuous_reporting(self):
        """Enable agent reporting each time it receives data."""
        self.report_on_tell = True

    def disable_continuous_reporting(self):
        """Disable agent reporting each time it receives data."""
        self.report_on_tell = False

    def enable_continuous_suggesting(self):
        """Enable agent to suggest new points to the queue each time it receives data."""
        self.ask_on_tell = True

    def disable_continuous_suggesting(self):
        """Disable agent from suggesting new points to the queue each time it receives data."""
        self.ask_on_tell = False

    def enable_direct_to_queue(self):
        self._direct_to_queue = True

    def disable_direct_to_queue(self):
        self._direct_to_queue = False

    @property
    def name(self) -> str:
        """Short string name"""
        return "agent"

    @classmethod
    def build_from_argparse(cls, parser: ArgumentParser, **kwargs):
        args = parser.parse_args()
        _kwargs = vars(args)
        _kwargs.update(kwargs)
        return cls(**_kwargs)

    @classmethod
    def constructor_argparser(cls) -> ArgumentParser:
        """Convenience method to put all arguments into a parser"""
        parser = ArgumentParser(description=cls.__doc__, formatter_class=RawDescriptionHelpFormatter)
        parser.add_argument("--kafka-group-id", required=True)
        parser.add_argument("--kafka-bootstrap-servers", required=True)
        parser.add_argument("--kafka-consumer-config", required=True)
        parser.add_argument("--kafka-producer-config", required=True)
        parser.add_argument("--publisher-topic", required=True)
        parser.add_argument("--subscription-topics", required=True)
        parser.add_argument("--data-profile-name", required=True)
        parser.add_argument("--agent-profile-name", required=True)
        parser.add_argument("--qserver-host", required=True)
        parser.add_argument("--qserver-api-key", required=True)
        parser.add_argument("--metadata")
        return parser

    def _write_event(self, stream, doc, uid=None):
        """Add event to builder as event page, and publish to catalog"""
        if not doc:
            logger.info(f"No doc presented to write_event for stream {stream}")
            return
        if stream not in self._compose_descriptor_bundles:
            data_keys = infer_data_keys(doc)
            self._compose_descriptor_bundles[stream] = self._compose_run_bundle.compose_descriptor(
                name=stream, data_keys=data_keys
            )
            self.agent_catalog.v1.insert("descriptor", self._compose_descriptor_bundles[stream].descriptor_doc)
        t = ttime.time()
        event_doc = self._compose_descriptor_bundles[stream].compose_event(
            data=doc, timestamps={k: t for k in doc}, uid=uid
        )
        self.agent_catalog.v1.insert("event", event_doc)
        return event_doc["uid"]

    def _add_to_queue(
        self,
        next_points,
        uid,
        *,
        re_manager=None,
        position: Optional[Union[int, Literal["front", "back"]]] = None,
        plan_factory: Optional[Callable] = None,
    ):
        """
        Adds a single set of points to the queue as bluesky plans

        Parameters
        ----------
        next_points : Iterable
            New points to measure
        uid : str
        re_manager : Optional[bluesky_queueserver_api.api_threads.API_Threads_Mixin]
            Defaults to self.re_manager
        position : Optional[Union[int, Literal['front', 'back']]]
            Defaults to self.queue_add_position
        plan_factory : Optional[Callable]
            Function to generate plans from points. Defaults to self.measurement_plan.
            Callable should return a tuple of (plan_name, args, kwargs)
        """
        plan_factory = plan_factory or self.measurement_plan
        if re_manager is None:
            re_manager = self.re_manager
        for point in next_points:
            plan_name, args, kwargs = plan_factory(point)
            kwargs.setdefault("md", {})
            kwargs["md"].update(self.default_plan_md)
            kwargs["md"]["agent_ask_uid"] = uid
            plan = BPlan(plan_name, *args, **kwargs)
            r = re_manager.item_add(plan, pos=self.queue_add_position if position is None else position)
            logger.debug(f"Sent http-server request for point {point}.\nReceived response: {r}")

    def _check_queue_and_start(self):
        """
        If the queue runs out of plans, it will stop. That is, adding a plan to an empty queue
        will not run the plan.
        This will not be an issue when there are many agents adding plans to a queue.
        Giving agents the autonomy to start the queue is a risk that will be mitigated by only
        allowing the beamline scientists to open and close the environment.
        A queue cannot be started in a closed environment.
        """
        status = self.re_manager.status(reload=True)
        if (
            status["items_in_queue"] == 1
            and status["worker_environment_exists"] is True
            and status["manager_state"] == "idle"
        ):
            self.re_manager.queue_start()
            logger.info("Agent is starting an idle queue with exactly 1 item.")

    def _ask_and_write_events(
        self, batch_size: int, ask_method: Optional[Callable] = None, stream_name: Optional[str] = "ask"
    ):
        """Private ask method for consistency across calls and changes to docs streams.

        Parameters
        ----------
        batch_size : int
            Size of batch passed to ask
        ask_method : Optional[Callable]
            self.ask, self.subject_ask, or some target ask function. Defaults to self.ask.
        stream_name : Optional[str]
            Name for the ask stream corresponding to ``ask_method``: 'ask', 'subject_ask', or
            other. Defaults to 'ask'.

        Returns
        -------
        next_points : list
            Next points to be sent to adjudicator or queue
        uid : str
        """
        if ask_method is None:
            ask_method = self.ask
        docs, next_points = ask_method(batch_size)
        uid = str(uuid.uuid4())
        for batch_idx, (doc, next_point) in enumerate(zip(docs, next_points)):
            doc["suggestion"] = next_point
            doc["batch_idx"] = batch_idx
            doc["batch_size"] = len(next_points)
            self._write_event(stream_name, doc, uid=f"{uid}/{batch_idx}")
        return next_points, uid

    def add_suggestions_to_queue(self, batch_size: int):
        """Calls ask, adds suggestions to queue, and writes out events.
        This will create one event for each suggestion."""
        next_points, uid = self._ask_and_write_events(batch_size)
        logger.info(f"Issued ask and adding to the queue. {uid}")
        self._add_to_queue(next_points, uid)
        self._check_queue_and_start()  # TODO: remove this and encourage updated qserver functionality

    def _create_suggestion_list(self, points: Sequence, uid: str, measurement_plan: Optional[Callable] = None):
        """Create suggestions for adjudicator"""
        suggestions = []
        for point in points:
            plan_name, args, kwargs = (
                self.measurement_plan(point) if measurement_plan is None else measurement_plan(point)
            )
            kwargs.setdefault("md", {})
            kwargs["md"].update(self.default_plan_md)
            kwargs["md"]["agent_ask_uid"] = uid
            suggestions.append(
                Suggestion(ask_uid=uid, plan_name=plan_name, plan_args=args, plan_kwargs=kwargs)
            )
        return suggestions

    def generate_suggestions_for_adjudicator(self, batch_size: int):
        """Calls ask, sends suggestions to adjudicator, and writes out events.
        This will create one event for each suggestion."""
        next_points, uid = self._ask_and_write_events(batch_size)
        logger.info(f"Issued ask and sending to the adjudicator. {uid}")
        suggestions = self._create_suggestion_list(next_points, uid)
        msg = AdjudicatorMsg(
            agent_name=self.instance_name,
            suggestions_uid=str(uuid.uuid4()),
            suggestions={self.endstation_key: suggestions},
        )
        self.kafka_producer(ADJUDICATOR_STREAM_NAME, msg.dict())

    def generate_report(self, **kwargs):
        doc = self.report(**kwargs)
        uid = self._write_event("report", doc)
        logger.info(f"Issued report request and writing event. {uid}")

    @staticmethod
    def trigger_condition(uid) -> bool:
        return True

    def _tell(self, uid):
        """Private tell to encapsulate the processing of a uid.
        This allows the user tell to just consume an independent and dependent variable.

        Parameters
        ----------
        uid : str
            Unique key to grab from Tiled.
        """
        run = self.exp_catalog[uid]
        try:
            independent_variable, dependent_variable = self.unpack_run(run)
        except KeyError as e:
            logger.warning(f"Ignoring key error in unpack for data {uid}:\n {e}")
            return
        logger.debug("Telling agent about some new data.")
        doc = self.tell(independent_variable, dependent_variable)
        doc["exp_uid"] = uid
        self._write_event("tell", doc)
        self.tell_cache.append(uid)

    def _on_stop_router(self, name, doc):
        """Document router that runs each time a stop document is seen."""
        if name != "stop":
            return
        uid = doc["run_start"]
        if not self.trigger_condition(uid):
            logger.debug(
                f"New data detected, but trigger condition not met. "
                f"The agent will ignore this start doc: {uid}"
            )
            return

        # Tell
        logger.info(f"New data detected, telling the agent about this start doc: {uid}")
        self._tell(uid)

        # Report
        if self.report_on_tell:
            self.generate_report(**self.default_report_kwargs)

        # Ask
        if self.ask_on_tell:
            if self._direct_to_queue:
                self.add_suggestions_to_queue(1)
            else:
                self.generate_suggestions_for_adjudicator(1)

    def tell_agent_by_uid(self, uids: Iterable):
        """Give an agent an iterable of uids to learn from.
        This is an optional behavior for priming an agent without a complete restart."""
        logger.info("Telling agent list of uids")
        for uid in uids:
            logger.info(f"Telling agent about start document {uid}")
            self._tell(uid)

    def start(self, ask_at_start=False):
        """Starts the Kafka listener in a background thread.

        Parameters
        ----------
        ask_at_start : bool, optional
            Whether to ask for a suggestion immediately, by default False
        """
        logger.debug("Issuing Agent start document and starting to listen to Kafka")
        self._compose_run_bundle = compose_run(metadata=self.metadata)
        self.agent_catalog.v1.insert("start", self._compose_run_bundle.start_doc)
        logger.info(f"Agent name={self._compose_run_bundle.start_doc['agent_name']}")
        logger.info(f"Agent start document uuid={self._compose_run_bundle.start_doc['uid']}")
        if ask_at_start:
            self.add_suggestions_to_queue(1)
        self._kafka_thread = threading.Thread(target=self.kafka_consumer.start, name="agent-loop", daemon=True)
        self._kafka_thread.start()

    def stop(self, exit_status="success", reason=""):
        logger.debug("Attempting agent stop.")
        stop_doc = self._compose_run_bundle.compose_stop(exit_status=exit_status, reason=reason)
        self.agent_catalog.v1.insert("stop", stop_doc)
        self.kafka_producer.flush()
        self.kafka_consumer.stop()
        logger.info(
            f"Stopped agent with exit status {exit_status.upper()}"
            f"{(' for reason: ' + reason) if reason else '.'}"
        )

    def close_and_restart(self, *, clear_tell_cache=False, retell_all=False, reason=""):
        """Utility for closing and restarting an agent with the same name.
        This is primarily for methods that change the hyperparameters of an agent on the fly,
        but in doing so may change the shape/nature of the agent document stream. This will keep
        the documents consistent between hyperparameters as individual BlueskyRuns.

        Parameters
        ----------
        clear_tell_cache : bool, optional
            Clears the cache of data the agent has been told about, by default False.
            This is useful for a clean slate.
        retell_all : bool, optional
            Resets the cache and tells the agent about all previous data, by default False.
            This can be useful if the agent has not retained knowledge from previous tells.
        reason : str, optional
            Reason for closing and restarting the agent, to be recorded to logs, by default ""
        """
        self.stop(reason=f"Close and Restart: {reason}")
        if clear_tell_cache:
            self.tell_cache = list()
        elif retell_all:
            uids = copy.copy(self.tell_cache)
            self.tell_cache = list()
            self.tell_agent_by_uid(uids)
        self.start()

    def signal_handler(self, signal, frame):
        self.stop(exit_status="abort", reason="forced exit ctrl+c")
        sys.exit(0)

    def _register_property(self, name: str, property_name: Optional[str] = None, **kwargs):
        """Wrapper to register a property with the bluesky-adaptive server, instead of an
        attribute or variable.

        Parameters
        ----------
        name : str
            Name by which the variable is accessible through the REST API.
            The PV name is generated by converting the variable names to upper-case letters.
            The name does not need to match the actual name of the variable used in the code.
            The name should be selected so that it can be conveniently used in the API.
        property_name : Optional[str]
            The name of a class property, by default the same name used in the REST API.
        """
        for key in ("getter", "setter"):  # Cannot pass getter/setter
            kwargs.pop(key, None)
        property_name = name if property_name is None else property_name
        register_variable(
            name,
            getter=lambda: getattr(self.__class__, property_name).fget(self),
            setter=lambda x: getattr(self.__class__, property_name).fset(self, x),
            **kwargs,
        )

    def _register_method(self, name, method_name=None, **kwargs):
        """Wrapper to register a generic method with the bluesky-adaptive server, instead of an
        attribute or variable.
        To call the method, pass the setter a JSON payload of the form:
        ``{value: [[args,], {kwargs}]}``
        This is a temporary solution that makes use of only the setter API and not a dedicated
        interface. It will be deprecated in the future.

        Parameters
        ----------
        name : str
            Name by which the variable is accessible through the REST API.
            The PV name is generated by converting the variable names to upper-case letters.
            The name does not need to match the actual name of the variable used in the code.
            The name should be selected so that it can be conveniently used in the API.
        method_name : Optional[str]
            The name of the method, by default the same name used in the REST API.
        """
        for key in ("getter", "setter"):  # Cannot pass getter/setter
            kwargs.pop(key, None)
        method_name = name if method_name is None else method_name
        if not callable(getattr(self, method_name)):
            raise TypeError(f"Method {method_name} must be a callable function.")
        register_variable(
            name, setter=lambda value: start_task(getattr(self, method_name)(*value[0], **value[1]))
        )

    def server_registrations(self) -> None:
        """
        Method to generate all server registrations during agent initialization.
        This method can be used in subclasses to override or extend the default registrations.
        """
        self._register_method("generate_report")
        self._register_method("add_suggestions_to_queue")
        self._register_method("tell_agent_by_uid")
        self._register_property("queue_add_position", pv_type="str")
        self._register_property("ask_on_tell", pv_type="bool")
        self._register_property("report_on_tell", pv_type="bool")

    @staticmethod
    def qserver_from_host_and_key(host: str, key: str):
        """Convenience method to produce an RE Manager object to manage communication with the
        Queue Server. This is one of several paradigms for communication, albeit a common one.
        See the bluesky_queueserver_api documentation for more details.

        Parameters
        ----------
        host : str
            URI for host of HTTP Server
        key : str
            Authorization key for HTTP Server API

        Returns
        -------
        qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
        """
        from bluesky_queueserver_api.http import REManagerAPI

        qserver = REManagerAPI(http_server_uri=host)
        qserver.set_authorization_key(api_key=key)
        return qserver

    @classmethod
    def from_config_kwargs(
        cls,
        kafka_group_id: str,
        kafka_bootstrap_servers: str,
        kafka_consumer_config: dict,
        kafka_producer_config: dict,
        publisher_topic: str,
        subscription_topics: List[str],
        data_profile_name: str,
        agent_profile_name: str,
        qserver_host: str,
        qserver_api_key: str,
        **kwargs,
    ):
        """Convenience method for producing an Agent from keyword arguments describing the
        Kafka, Tiled, and Qserver setup. Assumes tiled is loaded from profile, and the
        REManagerAPI is based on the http api.

        Parameters
        ----------
        kafka_group_id : str
            Required string identifier for the consumer's Kafka Consumer group.
        kafka_bootstrap_servers : str
            Comma-delimited list of Kafka server addresses as a string such as
            ``'broker1:9092,broker2:9092,127.0.0.1:9092'``
        kafka_consumer_config : dict
            Override default configuration or specify additional configuration options to
            confluent_kafka.Consumer.
        kafka_producer_config : dict
            Override default configuration or specify additional configuration options to
            confluent_kafka.Producer.
        publisher_topic : str
            Existing topic to publish agent documents to.
        subscription_topics : List[str]
            List of existing topics as strings such as ["topic-1", "topic-2"]. These should be
            the sources of the Bluesky stop documents that trigger ``tell`` and agent directives.
        data_profile_name : str
            Tiled profile name to serve as source of data (BlueskyRuns) for the agent.
        agent_profile_name : str
            Tiled profile name to serve as storage for the agent documents.
        qserver_host : str
            Host to POST requests to. Something akin to 'http://localhost:60610'
        qserver_api_key : str
            Key for API security.
        kwargs : dict
            Additional keyword arguments for init
        """
        from bluesky_queueserver_api.http import REManagerAPI
        from tiled.client import from_profile

        kafka_consumer = AgentConsumer(
            topics=subscription_topics,
            bootstrap_servers=kafka_bootstrap_servers,
            group_id=kafka_group_id,
            consumer_config=kafka_consumer_config,
        )
        kafka_producer = Publisher(
            topic=publisher_topic,
            bootstrap_servers=kafka_bootstrap_servers,
            key="",
            producer_config=kafka_producer_config,
        )
        tiled_data_node = from_profile(data_profile_name)
        tiled_agent_node = from_profile(agent_profile_name)
        re_manager = REManagerAPI(http_server_uri=qserver_host)
        re_manager.set_authorization_key(api_key=qserver_api_key)
        if "metadata" in kwargs:
            kwargs["metadata"].update(
                dict(tiled_data_profile=data_profile_name, tiled_agent_profile=agent_profile_name)
            )
        return cls(
            kafka_consumer=kafka_consumer,
            kafka_producer=kafka_producer,
            tiled_data_node=tiled_data_node,
            tiled_agent_node=tiled_agent_node,
            qserver=re_manager,
            **kwargs,
        )
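
# A minimal concrete Agent sketch (illustrative; the plan name "agent_move_and_count", the
# "det" data key, and the start-document key "motor_position" are assumptions about a
# particular beamline, not part of this module):
#
#     class SequentialAgent(Agent):
#         """Sweeps a grid of points in order, one suggestion per ask."""
#
#         def measurement_plan(self, point):
#             # (plan_name, args, kwargs), consumed by BPlan in _add_to_queue
#             return "agent_move_and_count", [point], {}
#
#         @staticmethod
#         def unpack_run(run):
#             return run.start["motor_position"], run.primary.read()["det"].data
#
#         def tell(self, x, y):
#             self.observations = getattr(self, "observations", []) + [(x, y)]
#             return {"x": x, "y": y, "cache_len": len(self.observations)}
#
#         def ask(self, batch_size):
#             next_points = [0.1 * (len(self.observations) + i) for i in range(batch_size)]
#             docs = [{"reason": "sequential sweep"} for _ in next_points]
#             return docs, next_points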


class MonarchSubjectAgent(Agent, ABC):
    # Drive a beamline. On stop doc check. By default manual trigger.

    def __init__(
        self,
        *args,
        subject_qserver: API_Threads_Mixin,
        subject_kafka_producer: Optional[Publisher] = None,
        subject_endstation_key: Optional[str] = "",
        **kwargs,
    ):
        """Abstract base class for a MonarchSubject agent. These agents only consume documents
        from one (Monarch) source, and can dictate the behavior of a different (Subject) queue.
        This can be useful in a multimodal measurement where one measurement is very fast and
        the other is very slow: after some amount of data collection on the fast measurement,
        the agent can dictate that the slow measurement probe what it considers interesting.

        The agent maintains the functionality of a regular Agent, and adds plans to the Monarch
        queue. By default, the Subject is only directed when manually triggered by the agent
        server or by a kafka directive. If an automated approach to asking the subject is
        required, ``subject_ask_condition`` must be overridden. This is commonly done by using
        a wall-clock interval, and/or a model confidence trigger.

        Children of MonarchSubjectAgent must implement the following, through direct inheritance
        or mixin classes:

        Experiment specific:

        - measurement_plan
        - unpack_run
        - subject_measurement_plan

        Agent specific:

        - tell
        - ask
        - subject_ask
        - report (optional)
        - name (optional)

        Parameters
        ----------
        subject_qserver : API_Threads_Mixin
            Object to manage communication with the Subject Queue Server
        subject_kafka_producer : Optional[Publisher]
            Bluesky Kafka publisher to produce document stream of agent actions to Adjudicators
        subject_endstation_key : Optional[str]
            Optional string that is needed for Adjudicator functionality. This keys the qserver
            API instance to a particular endstation. This way child Agents can maintain multiple
            queues for different unit ops. For example, this could be a beamline three letter
            acronym or other distinct key.
        """
        super().__init__(**kwargs)
        self.subject_re_manager = subject_qserver
        self.subject_kafka_producer = subject_kafka_producer
        self.subject_endstation_key = subject_endstation_key

    @abstractmethod
    def subject_measurement_plan(self, point: ArrayLike) -> Tuple[str, List, dict]:
        """Details for subject plan.
        Fetch the string name of a registered plan, as well as the positional and keyword
        arguments to pass that plan. Args/Kwargs is a common place to transform relative into
        absolute motor coords, or other device specific parameters.

        Parameters
        ----------
        point : ArrayLike
            Next point to measure using a given plan

        Returns
        -------
        plan_name : str
        plan_args : List
            List of arguments to pass to plan from a point to measure.
        plan_kwargs : dict
            Dictionary of keyword arguments to pass the plan, from a point to measure.
        """
        ...

    @abstractmethod
    def subject_ask(self, batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]:
        """
        Ask the agent for a new batch of points to measure on the subject queue.

        Parameters
        ----------
        batch_size : int
            Number of new points to measure

        Returns
        -------
        docs : Sequence[dict]
            Documents of key metadata from the ask approach for each point in next_points.
            Must be length of batch size.
        next_points : Sequence[ArrayLike]
            Sequence of independent variables of length batch size
        """
        ...

    def subject_ask_condition(self):
        """Option to build in a trigger method that is run on the document router subscription.

        Returns
        -------
        bool
        """
        return False

    def add_suggestions_to_subject_queue(self, batch_size: int):
        """Calls ask, adds suggestions to the subject queue, and writes out events."""
        next_points, uid = self._ask_and_write_events(batch_size, self.subject_ask, "subject_ask")
        logger.info(f"Issued ask to subject and adding to the queue. {uid}")
        self._add_to_queue(
            next_points,
            uid,
            re_manager=self.subject_re_manager,
            position="front",
            plan_factory=self.subject_measurement_plan,
        )

    def _on_stop_router(self, name, doc):
        ret = super()._on_stop_router(name, doc)
        if name != "stop":
            return ret
        if self.subject_ask_condition():
            if self._direct_to_queue:
                self.add_suggestions_to_subject_queue(1)
            else:
                raise NotImplementedError

    def generate_suggestions_for_adjudicator(self, batch_size: int):
        next_points, uid = self._ask_and_write_events(batch_size, self.subject_ask, "subject_ask")
        logger.info(f"Issued subject ask and sending to the adjudicator. {uid}")
        suggestions = self._create_suggestion_list(next_points, uid, self.subject_measurement_plan)
        msg = AdjudicatorMsg(
            agent_name=self.instance_name,
            suggestions_uid=str(uuid.uuid4()),
            suggestions={self.subject_endstation_key: suggestions},
        )
        self.subject_kafka_producer(ADJUDICATOR_STREAM_NAME, msg.dict())

    def server_registrations(self) -> None:
        super().server_registrations()
        self._register_method("add_suggestions_to_subject_queue")
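
# Illustrative override (an assumption, not part of the module): the docstring above notes
# that automated subject direction is commonly driven by a wall-clock interval. A sketch:
#
#     class IntervalMonarchSubjectAgent(MonarchSubjectAgent):
#         subject_interval = 600.0  # hypothetical: seconds between subject directives
#
#         def __init__(self, *args, **kwargs):
#             super().__init__(*args, **kwargs)
#             self._last_subject_time = 0.0
#
#         def subject_ask_condition(self):
#             # Direct the subject queue at most once per interval
#             if ttime.time() - self._last_subject_time > self.subject_interval:
#                 self._last_subject_time = ttime.time()
#                 return True
#             return False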