Distributed Agents

This section describes how to work with distributed agents that use the queueserver. It is particularly pertinent when:

  • The agent is slower than the measurement

  • Lock-step behavior is not desired

  • Multiple agents should run simultaneously

  • Humans and agents are meant to collaborate

These agents depend on Kafka for communication. Each time they see a stop document, they check whether the run is relevant to their decision making (via trigger_condition), then load the Bluesky run through Tiled. The run is then processed and unpacked into the relevant independent and dependent variables (via unpack_run). The agent then calls its tell method on the independent and dependent variables. For example, unpack_run might pull out some motor coordinates and a 2-d detector image, and tell might cache those as relative coordinates and a post-processed image.
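The cycle described above can be sketched without Kafka or Tiled. The following is a minimal illustration only, assuming a plain dict stands in for a Bluesky run. ToyAgent, the on_stop helper, the "purpose" tag, and the dict layout are all hypothetical and are not part of bluesky_adaptive.

```python
from typing import Any, Dict, List, Optional, Tuple


class ToyAgent:
    """Hypothetical stand-in showing the order of calls on each stop document."""

    def __init__(self, origin: float) -> None:
        self.origin = origin
        self.independent_cache: List[float] = []
        self.observable_cache: List[float] = []

    def trigger_condition(self, run: Dict[str, Any]) -> bool:
        # Only react to runs tagged for this agent (the tag name is an assumption).
        return run["start"].get("purpose") == "adaptive"

    def unpack_run(self, run: Dict[str, Any]) -> Tuple[float, List[List[float]]]:
        # Pull out a motor coordinate and a 2-d detector image.
        return run["data"]["motor"], run["data"]["image"]

    def tell(self, x: float, y: List[List[float]]) -> Dict[str, float]:
        # Cache a relative coordinate and a post-processed (summed) image.
        relative_x = x - self.origin
        total = float(sum(sum(row) for row in y))
        self.independent_cache.append(relative_x)
        self.observable_cache.append(total)
        return {"independent_variable": relative_x, "observable": total}

    def on_stop(self, run: Dict[str, Any]) -> Optional[Dict[str, float]]:
        # Relevance check first, then unpack, then tell.
        if not self.trigger_condition(run):
            return None
        x, y = self.unpack_run(run)
        return self.tell(x, y)


agent = ToyAgent(origin=10.0)
relevant = {"start": {"purpose": "adaptive"}, "data": {"motor": 12.5, "image": [[1, 2], [3, 4]]}}
ignored = {"start": {"purpose": "alignment"}, "data": {"motor": 0.0, "image": [[0]]}}
told = agent.on_stop(relevant)
skipped = agent.on_stop(ignored)
```

In the real base class the run arrives through the Kafka consumer and is loaded from Tiled; only the ordering of the three steps is illustrated here.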

Agents should subclass bluesky_adaptive.agents.base.Agent and are responsible for implementing the following.

Experiment specific

abstract Agent.measurement_plan(point: ArrayLike) → Tuple[str, List, dict]

Fetch the string name of a registered plan, as well as the positional and keyword arguments to pass that plan.

The args and kwargs are a common place to transform relative coordinates into absolute motor coordinates, or to set other device-specific parameters.

Parameters:
point : ArrayLike

Next point to measure using a given plan

Returns:
plan_name : str
plan_args : List

List of arguments to pass to plan from a point to measure.

plan_kwargs : dict

Dictionary of keyword arguments to pass the plan, from a point to measure.
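As a rough illustration of the return contract, the sketch below maps a relative point onto an absolute motor position. It is written as a free function for brevity, whereas in an agent it would be a method. SAMPLE_ORIGIN_MM and the exposure_time keyword are hypothetical, and "simple_scan" is assumed to be a plan registered with the queue server.

```python
from typing import List, Tuple

# Hypothetical absolute motor position that relative points are measured from.
SAMPLE_ORIGIN_MM = 42.0


def measurement_plan(point: float) -> Tuple[str, List, dict]:
    """Map a relative point onto the name and arguments of a registered plan."""
    absolute_position = SAMPLE_ORIGIN_MM + point
    plan_name = "simple_scan"  # assumed to be registered with the queue server
    plan_args = [absolute_position]
    plan_kwargs = {"exposure_time": 1.0}  # hypothetical device-specific setting
    return plan_name, plan_args, plan_kwargs


name, args, kwargs = measurement_plan(-2.0)
```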

Agent ‘brains’ specific

abstract Agent.ask(batch_size: int) → Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]

Ask the agent for a new batch of points to measure.

Parameters:
batch_size : int

Number of new points to measure

Returns:
docs : Sequence[dict]

Documents of key metadata from the ask approach for each point in next_points. Must have length batch_size.

next_points : Sequence

Sequence of independent variables of length batch size
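The shape of the ask return value can be illustrated with a toy random-search strategy. This is a sketch only: RandomAsk and its bounds are hypothetical and it does not subclass the real Agent. What it shows is that docs and next_points are parallel sequences, each of length batch_size.

```python
import random
from typing import Dict, List, Tuple


class RandomAsk:
    """Hypothetical strategy that proposes uniformly random points."""

    def __init__(self, low: float, high: float, seed: int = 0) -> None:
        self.low = low
        self.high = high
        self._rng = random.Random(seed)
        self.ask_count = 0

    def ask(self, batch_size: int = 1) -> Tuple[List[Dict[str, float]], List[float]]:
        docs: List[Dict[str, float]] = []
        next_points: List[float] = []
        for _ in range(batch_size):
            self.ask_count += 1
            point = self._rng.uniform(self.low, self.high)
            next_points.append(point)
            # One metadata document per proposed point.
            docs.append({"proposal": point, "ask_count": self.ask_count})
        return docs, next_points


docs, next_points = RandomAsk(low=0.0, high=1.0).ask(batch_size=3)
```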

Agent.report(**kwargs) → Dict[str, ArrayLike]

Create a report given the data observed by the agent. This could potentially be implemented in the base class to write a document stream. Converting the report dict into an image or formatted report is the responsibility of the child class.

property Agent.name: str

Short string name

The complete base class is given as follows:

class bluesky_adaptive.agents.base.Agent(*, kafka_consumer: AgentConsumer, tiled_data_node: Container, tiled_agent_node: Container, qserver: API_Threads_Mixin, kafka_producer: Publisher | None, agent_run_suffix: str | None = None, metadata: dict | None = None, ask_on_tell: bool | None = True, direct_to_queue: bool | None = True, report_on_tell: bool | None = False, default_report_kwargs: dict | None = None, queue_add_position: int | Literal['front', 'back'] | None = None, endstation_key: str | None = '')

Abstract base class for a single plan agent. These agents should consume data, decide where to measure next, and execute a single type of plan (something akin to move and count). Alternatively, these agents can be used solely for reporting.

The base agent sets up a Kafka subscription to listen for new stop documents, a catalog to read experiments from, a catalog to write agent status to, a Kafka publisher to write agent documents to, and a manager API for the queue server. Each time a stop document is read, the respective BlueskyRun is unpacked by the unpack_run method into an independent and dependent variable, and told to the agent by the tell method.

Children of Agent should implement the following, through direct inheritance or mixin classes:

Experiment specific:

  • measurement_plan

  • unpack_run

Agent specific:

  • tell

  • ask

  • report (optional)

  • name (optional)

Parameters:
kafka_consumer : AgentConsumer

Consumer (subscriber) of Kafka Bluesky documents. It should be subscribed to the sources of Bluesky stop documents that will trigger tell. AgentConsumer is a child class of bluesky_kafka.RemoteDispatcher that enables Kafka messages to trigger agent directives.

kafka_producer : Optional[Publisher]

Bluesky Kafka publisher to produce document stream of agent actions for optional Adjudicator.

tiled_data_node : tiled.client.node.Node

Tiled node to serve as source of data (BlueskyRuns) for the agent.

tiled_agent_node : tiled.client.node.Node

Tiled node to serve as storage for the agent documents.

qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin

Object to manage communication with Queue Server

agent_run_suffix : Optional[str], optional

Agent name suffix for the instance, by default generated using 2 hyphen separated words from xkcdpass.

metadata : Optional[dict], optional

Optional extra metadata to add to agent start document, by default {}

ask_on_tell : bool, optional

Whether to ask for new points every time an agent is told about new data. To create a truly passive agent, it is best to implement ask as a method that does nothing. To create an agent that only suggests new points periodically or on another trigger, ask_on_tell should be set to False. By default True. Can be adjusted using enable_continuous_suggesting and disable_continuous_suggesting.

direct_to_queue : Optional[bool], optional

Whether the agent suggestions will be placed directly on the queue. If False, the suggestions will be sent to a Kafka topic for an Adjudicator to process. By default True. Can be adjusted using enable_direct_to_queue and disable_direct_to_queue.

report_on_tell : bool, optional

Whether to create a report every time an agent is told about new data. By default False. Can be adjusted using enable_continuous_reporting and disable_continuous_reporting.

default_report_kwargs : Optional[dict], optional

Default kwargs for calling the report method, by default None

queue_add_position : Optional[Union[int, Literal["front", "back"]]], optional

Starting position to add to the queue if adding directly to the queue, by default “back”.

endstation_key : Optional[str]

Optional string that is needed for Adjudicator functionality. This keys the qserver API instance to a particular endstation. This way child Agents can maintain multiple queues for different unit operations. For example, this could be a beamline three letter acronym or other distinct key.
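The interaction of ask_on_tell, direct_to_queue, and queue_add_position can be pictured with a toy routing function. This is a sketch under stated assumptions, not the library's implementation: after_tell is a hypothetical helper, plain lists stand in for the queue and the Adjudicator's Kafka topic, and the ordering of a batch inserted at the front or at an integer index is an assumption made for illustration.

```python
from typing import Callable, List, Union


def after_tell(
    ask: Callable[[], List[float]],
    *,
    ask_on_tell: bool,
    direct_to_queue: bool,
    queue: List[float],
    adjudicator_topic: List[float],
    queue_add_position: Union[int, str] = "back",
) -> None:
    """Toy routing of suggestions once new data has been told to the agent."""
    if not ask_on_tell:
        return  # the agent only listens; nothing is suggested on this tell
    suggestions = ask()
    if not direct_to_queue:
        # An Adjudicator would decide which suggestions reach the queue.
        adjudicator_topic.extend(suggestions)
        return
    for offset, point in enumerate(suggestions):
        if queue_add_position == "back":
            queue.append(point)
        elif queue_add_position == "front":
            queue.insert(offset, point)  # assumption: batch order is preserved
        else:
            queue.insert(queue_add_position + offset, point)


queue = [1.0, 2.0]
topic: List[float] = []
after_tell(
    lambda: [8.0, 9.0],
    ask_on_tell=True,
    direct_to_queue=True,
    queue=queue,
    adjudicator_topic=topic,
    queue_add_position="front",
)
```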

Examples

The following example creates a simple hypothetical agent that only follows a sequence of predetermined points. While not clever, it shows the underlying mechanics. It copies some importable code from bluesky_adaptive.agents.simple. In this case the user need only construct CMSBaseAgent for some CMS-specific values, and combine the SequentialAgent and CMSBaseAgent. This can be done through direct inheritance or mixins, as all necessary methods are declared as abstract in the base Agent.

The default API supports passing the objects for communication, allowing for flexible customization. A convenience method from_config_kwargs supports automatic construction of the Kafka producer and consumers, as well as the qserver API using some common configuration keys. Both are demonstrated below for reference.

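import logging
import uuid
from abc import ABC
from typing import Generator, Sequence, Tuple, Union

import bluesky_kafka
import nslsii.kafka_utils
import numpy as np
import tiled.client
from bluesky_queueserver_api.http import REManagerAPI
from databroker.client import BlueskyRun
from numpy.typing import ArrayLike

import bluesky_adaptive.agents.base
from bluesky_adaptive.agents.base import Agent

logger = logging.getLogger(__name__)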

class SequentialAgentBase(Agent, ABC):
    """Agent Mixin to take a pre-defined sequence and walk through it on ``ask``.

    Parameters
    ----------
    sequence : Sequence[Union[float, ArrayLike]]
        Sequence of points to be queried
    relative_bounds : Tuple[Union[float, ArrayLike]], optional
        Relative bounds for the members of the sequence to follow, by default None

    Attributes
    ----------
    independent_cache : list
        List of the independent variables at each observed point
    observable_cache : list
        List of all observables corresponding to the points in the independent_cache
    sequence : Sequence[Union[float, ArrayLike]]
        Sequence of points to be queried
    relative_bounds : Tuple[Union[float, ArrayLike]], optional
        Relative bounds for the members of the sequence to follow, by default None
    ask_count : int
        Number of queries this agent has made
    """

    name = "sequential"

    def __init__(
        self,
        *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.independent_cache = []
        self.observable_cache = []
        self.sequence = sequence
        self.relative_bounds = relative_bounds
        self.ask_count = 0
        self._position_generator = self._create_position_generator()

    def _create_position_generator(self) -> Generator:
        """Yield points from sequence if within bounds"""
        for point in self.sequence:
            if self.relative_bounds:
                arr = np.asarray(point)
                # Inclusive bounds check; np.all handles both scalars and arrays.
                in_bounds = np.all((arr >= self.relative_bounds[0]) & (arr <= self.relative_bounds[1]))
                if in_bounds:
                    # Scalars are yielded unchanged; array-like points are yielded as arrays.
                    yield arr if arr.ndim else point
                else:
                    logger.warning(
                        f"Next point will be skipped.  {point} in sequence for {self.instance_name}, "
                        f"is out of bounds {self.relative_bounds}"
                    )
            else:
                yield point

    def tell(self, x, y) -> dict:
        self.independent_cache.append(x)
        self.observable_cache.append(y)
        return dict(independent_variable=x, observable=y, cache_len=len(self.independent_cache))

    def ask(self, batch_size: int = 1) -> Tuple[Sequence[dict[str, ArrayLike]], Sequence[ArrayLike]]:
        docs = []
        proposals = []
        for _ in range(batch_size):
            self.ask_count += 1
            proposals.append(next(self._position_generator))
            docs.append(dict(proposal=proposals[-1], ask_count=self.ask_count))
        return docs, proposals

    def report(self, **kwargs) -> dict:
        return dict(percent_completion=self.ask_count / len(self.sequence))

class CMSBaseAgent:

    def measurement_plan(self, point):
        return "simple_scan", point, {}

    def unpack_run(self, run: BlueskyRun) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
        return run.start["experiment_state"], run.primary.data["detector"]

    @staticmethod
    def get_beamline_objects() -> dict:
        beamline_tla = "cms"
        kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file(
            config_file_path="/etc/bluesky/kafka.yml"
        )
        qs = REManagerAPI(http_server_uri=f"https://qserver.nsls2.bnl.gov/{beamline_tla}")
        qs.set_authorization_key(api_key=None)

        kafka_consumer = bluesky_adaptive.agents.base.AgentConsumer(
            topics=[
                f"{beamline_tla}.bluesky.runengine.documents",
            ],
            consumer_config=kafka_config["runengine_producer_config"],
            bootstrap_servers=kafka_config["bootstrap_servers"],
            group_id=f"echo-{beamline_tla}-{str(uuid.uuid4())[:8]}"
        )
        kafka_producer = bluesky_kafka.Publisher(
            topic=f"{beamline_tla}.bluesky.adjudicators",
            bootstrap_servers=kafka_config["bootstrap_servers"],
            key="cms.key",
            producer_config=kafka_config["runengine_producer_config"]
        )

        return dict(kafka_consumer=kafka_consumer,
                    kafka_producer=kafka_producer,
                    tiled_data_node=tiled.client.from_profile(f"{beamline_tla}"),
                    tiled_agent_node=tiled.client.from_profile(f"{beamline_tla}_bluesky_sandbox"),
                    qserver=qs)


    @staticmethod
    def get_beamline_kwargs() -> dict:
        beamline_tla = "cms"
        kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file(
            config_file_path="/etc/bluesky/kafka.yml"
        )
        return dict(
            kafka_group_id=f"echo-{beamline_tla}-{str(uuid.uuid4())[:8]}",
            kafka_bootstrap_servers=kafka_config["bootstrap_servers"],
            kafka_consumer_config=kafka_config["runengine_producer_config"],
            kafka_producer_config=kafka_config["runengine_producer_config"],
            publisher_topic=f"{beamline_tla}.bluesky.adjudicators",
            subscripion_topics=[
                f"{beamline_tla}.bluesky.runengine.documents",
            ],
            data_profile_name=f"{beamline_tla}",
            agent_profile_name=f"{beamline_tla}_bluesky_sandbox",
            qserver_host=f"https://qserver.nsls2.bnl.gov/{beamline_tla}",
            qserver_api_key=None
        )




class CMSSequentialAgent(CMSBaseAgent, SequentialAgentBase):
    def __init__(
        self,
        *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,
    ):
        _default_kwargs = self.get_beamline_objects()
        _default_kwargs.update(kwargs)
        super().__init__(sequence=sequence, relative_bounds=relative_bounds, **_default_kwargs)

    @classmethod
    def from_config_kwargs(cls, *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,):
        # Classmethods receive cls, not self; return the constructed agent.
        _default_kwargs = cls.get_beamline_kwargs()
        _default_kwargs.update(kwargs)
        return super().from_config_kwargs(sequence=sequence, relative_bounds=relative_bounds, **_default_kwargs)
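The mixin pattern used by CMSSequentialAgent can be tried offline with toy classes. The sketch below is illustrative only: every class name is hypothetical and nothing here touches Kafka, Tiled, or the queue server. It shows that a class becomes instantiable only once every abstract method is supplied, and that the beamline mixin is listed first so its concrete methods take precedence in the method resolution order.

```python
from abc import ABC, abstractmethod


class ToyAgentBase(ABC):
    """Hypothetical stand-in for the abstract base Agent."""

    @abstractmethod
    def measurement_plan(self, point): ...

    @abstractmethod
    def ask(self, batch_size): ...


class ToyBeamlineMixin:
    """Experiment-specific half: knows how to turn a point into a plan."""

    def measurement_plan(self, point):
        return "simple_scan", [point], {}


class ToySequentialBase(ToyAgentBase, ABC):
    """Decision-making half: walks a fixed sequence, still abstract."""

    def __init__(self, *, sequence):
        self._points = iter(sequence)

    def ask(self, batch_size=1):
        points = [next(self._points) for _ in range(batch_size)]
        return [{"proposal": p} for p in points], points


class ToyBeamlineSequentialAgent(ToyBeamlineMixin, ToySequentialBase):
    """Concrete agent: the mixin fills in the remaining abstract method."""


agent = ToyBeamlineSequentialAgent(sequence=[0.1, 0.2, 0.3])
docs, points = agent.ask(2)
plan = agent.measurement_plan(points[0])
```

Instantiating ToySequentialBase directly raises TypeError because measurement_plan is still abstract, which mirrors why SequentialAgentBase needs to be combined with a beamline-specific class.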