Distributed Agents#
This section describes how to work with distributed agents using the queueserver. It is particularly pertinent when:
- The agent is slower than the measurement
- Lock-step behavior is not desired
- Multiple agents should run simultaneously
- Humans and agents are meant to collaborate
These agents depend on Kafka for communication. Each time they see a stop document,
they check whether the run is relevant to their decision making (via trigger_condition),
then load the Bluesky Run through Tiled.
The run is then processed and unpacked into the relevant independent and dependent variables
(via unpack_run).
The agent then calls its tell method on the independent and dependent variables. For example,
unpack_run might pull out some motor coordinates and a 2-d detector image, and tell
might cache those as relative coordinates along with a post-processed version of the image.
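The sequence described above can be sketched without any Kafka or Tiled infrastructure. The following is a minimal illustration, not the library's implementation: `ToyAgent`, `on_stop_document`, and the dictionary layout standing in for a Bluesky run are all assumptions made for the example.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical stand-in for a Bluesky run; real agents load a BlueskyRun through Tiled.
FakeRun = Dict[str, Any]


class ToyAgent:
    """Illustrative only: mimics the trigger -> unpack -> tell sequence."""

    def __init__(self) -> None:
        self.independent_cache: List[float] = []
        self.observable_cache: List[List[float]] = []

    def trigger_condition(self, run: FakeRun) -> bool:
        # Only react to runs tagged for this (assumed) experiment purpose.
        return run["start"].get("purpose") == "adaptive"

    def unpack_run(self, run: FakeRun) -> Tuple[float, List[float]]:
        # Pull out a motor coordinate and a detector reading.
        return run["start"]["motor_x"], run["data"]["detector"]

    def tell(self, x: float, y: List[float]) -> Dict[str, Any]:
        # Cache the observation and return a document describing it.
        self.independent_cache.append(x)
        self.observable_cache.append(y)
        return {"independent_variable": x, "cache_len": len(self.independent_cache)}

    def on_stop_document(self, run: FakeRun) -> Optional[Dict[str, Any]]:
        # Hypothetical entry point: skip irrelevant runs, otherwise unpack and tell.
        if not self.trigger_condition(run):
            return None
        x, y = self.unpack_run(run)
        return self.tell(x, y)


agent = ToyAgent()
relevant = {"start": {"purpose": "adaptive", "motor_x": 1.5}, "data": {"detector": [0.1, 0.2]}}
ignored = {"start": {"purpose": "alignment", "motor_x": 0.0}, "data": {"detector": [0.0]}}
doc = agent.on_stop_document(relevant)
skipped = agent.on_stop_document(ignored)
```

Only the relevant run reaches `tell`; the other is dropped at the trigger check and leaves the caches untouched.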
Agents should subclass bluesky_adaptive.agents.base.Agent
and are responsible for implementing the
following.
Experiment specific#
- abstract Agent.measurement_plan(point: ArrayLike) -> Tuple[str, List, dict]#
Fetch the string name of a registered plan, as well as the positional and keyword arguments to pass that plan.
The args and kwargs are a common place to transform relative coordinates into absolute motor coordinates, or to set other device-specific parameters.
- Parameters:
- point : ArrayLike
Next point to measure using a given plan
- Returns:
- plan_name : str
- plan_args : List
List of arguments to pass to plan from a point to measure.
- plan_kwargs : dict
Dictionary of keyword arguments to pass the plan, from a point to measure.
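As a rough illustration of the relative-to-absolute transformation mentioned above, a measurement_plan might look like the sketch below. The class name, the `origin` and `motor_names` attributes, the plan name `move_and_count`, and the `exposure_time` keyword are all hypothetical and would need to match whatever plans are actually registered with the queueserver.

```python
from typing import List, Sequence, Tuple


class RelativeToAbsoluteSketch:
    """Hypothetical mixin; origin, motor names, and plan name are assumptions."""

    origin = (10.0, -2.0)                 # assumed absolute position of the sample origin
    motor_names = ("motor_x", "motor_y")  # assumed device names

    def measurement_plan(self, point: Sequence[float]) -> Tuple[str, List, dict]:
        # Shift each relative coordinate by the origin to get absolute motor targets.
        absolute = [o + p for o, p in zip(self.origin, point)]
        # Interleave motor names and targets as positional plan arguments.
        args: List = []
        for name, target in zip(self.motor_names, absolute):
            args.extend([name, target])
        return "move_and_count", args, {"exposure_time": 1.0}


plan_name, plan_args, plan_kwargs = RelativeToAbsoluteSketch().measurement_plan([0.5, 0.25])
```

The agent's "brains" can then reason purely in relative coordinates, while the beamline-specific class owns the mapping to real devices.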
Agent ‘brains’ specific#
- abstract Agent.ask(batch_size: int) -> Tuple[Sequence[Dict[str, ArrayLike]], Sequence[ArrayLike]]#
Ask the agent for a new batch of points to measure.
- Parameters:
- batch_size : int
Number of new points to measure
- Returns:
- docs : Sequence[dict]
Documents of key metadata from the ask approach for each point in next_points. Must have length batch_size.
- next_points : Sequence
Sequence of independent variables of length batch_size
- Agent.report(**kwargs) -> Dict[str, ArrayLike]#
Create a report given the data observed by the agent. This could potentially be implemented in the base class to write a document stream. Additional functionality for converting the report dict into an image or formatted report is the duty of the child class.
- property Agent.name: str#
Short string name
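To make the return contract of ask concrete, here is a small sketch of a "brains" class that proposes uniformly random points. It is not part of bluesky_adaptive: the class name, its constructor arguments, and the contents of the returned documents are assumptions for illustration, and it deliberately does not inherit from Agent so that it runs standalone.

```python
import random
from typing import Dict, List, Sequence, Tuple


class RandomAskSketch:
    """Hypothetical 'brains' that proposes uniformly random points in [low, high]."""

    name = "random-sketch"

    def __init__(self, low: float, high: float, seed: int = 0) -> None:
        self.low, self.high = low, high
        self._rng = random.Random(seed)
        self.ask_count = 0

    def ask(self, batch_size: int = 1) -> Tuple[Sequence[Dict], Sequence[float]]:
        docs: List[Dict] = []
        next_points: List[float] = []
        for _ in range(batch_size):
            self.ask_count += 1
            point = self._rng.uniform(self.low, self.high)
            next_points.append(point)
            # One metadata document per suggested point.
            docs.append({"proposal": point, "ask_count": self.ask_count})
        return docs, next_points

    def report(self, **kwargs) -> Dict:
        # Summarize what the agent has done so far.
        return {"ask_count": self.ask_count, "bounds": [self.low, self.high]}


brain = RandomAskSketch(low=-1.0, high=1.0)
docs, next_points = brain.ask(batch_size=3)
summary = brain.report()
```

Both returned sequences have one entry per requested point, which is the property the base class relies on when pairing documents with suggestions.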
Agent Initialization Requirements#
The complete base class is given below. In it, there are a number of required arguments for initialization. It is expected that as the API changes some of these will become optional. Specifically, these arguments set up the communication between the agent and {Tiled, Kafka, and the QueueServer}.
- class bluesky_adaptive.agents.base.Agent(*, kafka_consumer: AgentConsumer, tiled_data_node: Container, tiled_agent_node: Container, qserver: API_Threads_Mixin, kafka_producer: Publisher | None, agent_run_suffix: str | None = None, metadata: dict | None = None, ask_on_tell: bool | None = True, direct_to_queue: bool | None = True, report_on_tell: bool | None = False, default_report_kwargs: dict | None = None, queue_add_position: int | Literal['front', 'back'] | None = None, endstation_key: str | None = '')#
Abstract base class for a single plan agent. These agents should consume data, decide where to measure next, and execute a single type of plan (something akin to move and count). Alternatively, these agents can be used solely for reporting.
Base agent sets up a Kafka subscription to listen to new stop documents, a catalog to read for experiments, a catalog to write agent status to, a Kafka publisher to write agent documents to, and a manager API for the queue-server. Each time a stop document is read, the respective BlueskyRun is unpacked by the unpack_run method into an independent and dependent variable, and told to the agent by the tell method.
Children of Agent should implement the following, through direct inheritance or mixin classes:
Experiment specific:
- measurement_plan
- unpack_run
Agent specific:
- tell
- ask
- report (optional)
- name (optional)
- Parameters:
- kafka_consumer : AgentConsumer
Consumer (subscriber) of Kafka Bluesky documents. It should be subscribed to the sources of Bluesky stop documents that will trigger tell. AgentConsumer is a child class of bluesky_kafka.RemoteDispatcher that enables Kafka messages to trigger agent directives.
- kafka_producer : Optional[Publisher]
Bluesky Kafka publisher to produce document stream of agent actions for optional Adjudicator.
- tiled_data_node : tiled.client.container.Container
Tiled node to serve as source of data (BlueskyRuns) for the agent.
- tiled_agent_node : tiled.client.container.Container
Tiled node to serve as storage for the agent documents.
- qserver : bluesky_queueserver_api.api_threads.API_Threads_Mixin
Object to manage communication with Queue Server
- agent_run_suffix : Optional[str], optional
Agent name suffix for the instance, by default generated using 2 hyphen separated words from xkcdpass.
- metadata : Optional[dict], optional
Optional extra metadata to add to agent start document, by default {}
- ask_on_tell : bool, optional
Whether to ask for new points every time an agent is told about new data. To create a truly passive agent, it is best to implement ask as a method that does nothing. To create an agent that only suggests new points periodically or on another trigger, ask_on_tell should be set to False. By default True. Can be adjusted using enable_continuous_suggesting and disable_continuous_suggesting.
- direct_to_queue : Optional[bool], optional
Whether the agent suggestions will be placed directly on the queue. If False, the suggestions will be sent to a Kafka topic for an Adjudicator to process. By default True. Can be adjusted using enable_direct_to_queue and disable_direct_to_queue.
- report_on_tell : bool, optional
Whether to create a report every time an agent is told about new data. By default False. Can be adjusted using enable_continuous_reporting and disable_continuous_reporting.
- default_report_kwargs : Optional[dict], optional
Default kwargs for calling the report method, by default None
- queue_add_position : Optional[Union[int, Literal["front", "back"]]], optional
Starting position to add to the queue if adding directly to the queue, by default "back".
- endstation_key : Optional[str]
Optional string that is needed for Adjudicator functionality. This keys the qserver API instance to a particular endstation. This way child Agents can maintain multiple queues for different unit operations. For example, this could be a beamline three letter acronym or other distinct key.
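The interaction between ask_on_tell, report_on_tell, direct_to_queue, and queue_add_position can be pictured with a toy dispatcher. This is only a sketch of the behaviour described in the parameter list, not the library's implementation: the class, its `handle_new_data` method, the lists standing in for the queue server and the Kafka topic, and the treatment of an integer position as a list index are all assumptions.

```python
from typing import List, Tuple, Union


class FlagDispatchSketch:
    """Illustrative stand-in; this is not the library's implementation."""

    def __init__(
        self,
        ask_on_tell: bool = True,
        report_on_tell: bool = False,
        direct_to_queue: bool = True,
        queue_add_position: Union[int, str] = "back",
    ) -> None:
        self.ask_on_tell = ask_on_tell
        self.report_on_tell = report_on_tell
        self.direct_to_queue = direct_to_queue
        self.queue_add_position = queue_add_position
        self.told: List[Tuple[float, float]] = []
        self.reports: List[dict] = []
        self.queue: List[float] = []              # stands in for the queue server
        self.adjudicator_topic: List[float] = []  # stands in for a Kafka topic

    def _add_to_queue(self, suggestion: float) -> None:
        if self.queue_add_position == "back":
            self.queue.append(suggestion)
        elif self.queue_add_position == "front":
            self.queue.insert(0, suggestion)
        else:
            # Assumed here: an integer is treated as a list index.
            self.queue.insert(int(self.queue_add_position), suggestion)

    def handle_new_data(self, x: float, y: float) -> None:
        self.told.append((x, y))
        if self.report_on_tell:
            self.reports.append({"n_observations": len(self.told)})
        if self.ask_on_tell:
            suggestion = x + 1.0  # trivial stand-in for a real ask()
            if self.direct_to_queue:
                self._add_to_queue(suggestion)
            else:
                self.adjudicator_topic.append(suggestion)


direct = FlagDispatchSketch(queue_add_position="front")
direct.handle_new_data(1.0, 0.0)
direct.handle_new_data(2.0, 0.0)

adjudicated = FlagDispatchSketch(direct_to_queue=False, report_on_tell=True)
adjudicated.handle_new_data(1.0, 5.0)
```

With direct_to_queue disabled, suggestions never touch the queue and instead accumulate where an Adjudicator could pick them up.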
Examples#
The following example creates a simple hypothetical agent that only follows a sequence of
predetermined points. While not clever, it shows the underlying mechanics.
It copies some importable code from bluesky_adaptive.agents.simple.
In this case the user need only construct CMSBaseAgent for some CMS specific values,
and combine the SequentialAgentBase and CMSBaseAgent.
This can be done through direct inheritance or mixins, as all necessary methods are declared
as abstract in the base Agent.
The default API supports passing the objects for communication, allowing for flexible customization.
A convenience method from_config_kwargs supports automatic construction of the Kafka producer and consumers,
as well as the qserver API using some common configuration keys. Both are demonstrated below for reference.
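import logging
import uuid
from abc import ABC
from typing import Generator, Sequence, Tuple, Union

import bluesky_adaptive.agents.base
import bluesky_kafka
import nslsii.kafka_utils
import numpy as np
import tiled.client
from bluesky_adaptive.agents.base import Agent
from bluesky_queueserver_api.http import REManagerAPI
from databroker.client import BlueskyRun
from numpy.typing import ArrayLike

logger = logging.getLogger(__name__)

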
class SequentialAgentBase(Agent, ABC):
    """Agent Mixin to take a pre-defined sequence and walk through it on ``ask``.

    Parameters
    ----------
    sequence : Sequence[Union[float, ArrayLike]]
        Sequence of points to be queried
    relative_bounds : Tuple[Union[float, ArrayLike]], optional
        Relative bounds for the members of the sequence to follow, by default None

    Attributes
    ----------
    independent_cache : list
        List of the independent variables at each observed point
    observable_cache : list
        List of all observables corresponding to the points in the independent_cache
    sequence : Sequence[Union[float, ArrayLike]]
        Sequence of points to be queried
    relative_bounds : Tuple[Union[float, ArrayLike]], optional
        Relative bounds for the members of the sequence to follow, by default None
    ask_count : int
        Number of queries this agent has made
    """

    name = "sequential"

    def __init__(
        self,
        *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.independent_cache = []
        self.observable_cache = []
        self.sequence = sequence
        self.relative_bounds = relative_bounds
        self.ask_count = 0
        self._position_generator = self._create_position_generator()

    def _create_position_generator(self) -> Generator:
        """Yield points from sequence if within bounds"""
        for point in self.sequence:
            if self.relative_bounds:
                arr = np.array(point)
                # Element-wise check that the point lies between the lower and upper bounds.
                condition = (arr <= self.relative_bounds[1]) & (arr >= self.relative_bounds[0])
                try:
                    if condition:
                        yield point
                        continue
                    else:
                        logger.warning(
                            f"Next point will be skipped.  {point} in sequence for {self.instance_name}, "
                            f"is out of bounds {self.relative_bounds}"
                        )
                except ValueError:  # Array not float
                    if condition.all():
                        yield arr
                        continue
                    else:
                        logger.warning(
                            f"Next point will be skipped.  {point} in sequence for {self.instance_name}, "
                            f"is out of bounds {self.relative_bounds}"
                        )
            else:
                yield point

    def tell(self, x, y) -> dict:
        # Cache the observation and describe it in the returned document.
        self.independent_cache.append(x)
        self.observable_cache.append(y)
        return dict(independent_variable=x, observable=y, cache_len=len(self.independent_cache))

    def ask(self, batch_size: int = 1) -> Tuple[Sequence[dict[str, ArrayLike]], Sequence[ArrayLike]]:
        docs = []
        proposals = []
        for _ in range(batch_size):
            self.ask_count += 1
            proposals.append(next(self._position_generator))
            docs.append(dict(proposal=proposals[-1], ask_count=self.ask_count))
        return docs, proposals

    def report(self, **kwargs) -> dict:
        return dict(percent_completion=self.ask_count / len(self.sequence))


class CMSBaseAgent:
    def measurement_plan(self, point):
        return "simple_scan", point, {}

    def unpack_run(self, run: BlueskyRun) -> Tuple[Union[float, ArrayLike], Union[float, ArrayLike]]:
        return run.start["experiment_state"], run.primary.data["detector"]

    @staticmethod
    def get_beamline_objects() -> dict:
        """Construct the communication objects directly."""
        beamline_tla = "cms"
        kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file(
            config_file_path="/etc/bluesky/kafka.yml"
        )
        qs = REManagerAPI(http_server_uri=f"https://qserver.nsls2.bnl.gov/{beamline_tla}")
        qs.set_authorization_key(api_key=None)

        kafka_consumer = bluesky_adaptive.agents.base.AgentConsumer(
            topics=[
                f"{beamline_tla}.bluesky.runengine.documents",
            ],
            consumer_config=kafka_config["runengine_producer_config"],
            bootstrap_servers=kafka_config["bootstrap_servers"],
            group_id=f"echo-{beamline_tla}-{str(uuid.uuid4())[:8]}",
        )

        kafka_producer = bluesky_kafka.Publisher(
            topic=f"{beamline_tla}.bluesky.adjudicators",
            bootstrap_servers=kafka_config["bootstrap_servers"],
            key="cms.key",
            producer_config=kafka_config["runengine_producer_config"],
        )

        return dict(
            kafka_consumer=kafka_consumer,
            kafka_producer=kafka_producer,
            tiled_data_node=tiled.client.from_profile(f"{beamline_tla}"),
            tiled_agent_node=tiled.client.from_profile(f"{beamline_tla}_bluesky_sandbox"),
            qserver=qs,
        )

    @staticmethod
    def get_beamline_kwargs() -> dict:
        """Collect configuration keys for use with from_config_kwargs."""
        beamline_tla = "cms"
        kafka_config = nslsii.kafka_utils._read_bluesky_kafka_config_file(
            config_file_path="/etc/bluesky/kafka.yml"
        )
        return dict(
            kafka_group_id=f"echo-{beamline_tla}-{str(uuid.uuid4())[:8]}",
            kafka_bootstrap_servers=kafka_config["bootstrap_servers"],
            kafka_consumer_config=kafka_config["runengine_producer_config"],
            kafka_producer_config=kafka_config["runengine_producer_config"],
            publisher_topic=f"{beamline_tla}.bluesky.adjudicators",
            subscripion_topics=[
                f"{beamline_tla}.bluesky.runengine.documents",
            ],
            data_profile_name=f"{beamline_tla}",
            agent_profile_name=f"{beamline_tla}_bluesky_sandbox",
            qserver_host=f"https://qserver.nsls2.bnl.gov/{beamline_tla}",
            qserver_api_key=None,
        )


class CMSSequentialAgent(CMSBaseAgent, SequentialAgentBase):
    def __init__(
        self,
        *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,
    ):
        # Beamline defaults can be overridden by explicitly passed keyword arguments.
        _default_kwargs = self.get_beamline_objects()
        _default_kwargs.update(kwargs)
        super().__init__(sequence=sequence, relative_bounds=relative_bounds, **_default_kwargs)

    @classmethod
    def from_config_kwargs(
        cls,
        *,
        sequence: Sequence[Union[float, ArrayLike]],
        relative_bounds: Tuple[Union[float, ArrayLike]] = None,
        **kwargs,
    ):
        # A classmethod receives cls rather than self, and must return the constructed agent.
        _default_kwargs = cls.get_beamline_kwargs()
        _default_kwargs.update(kwargs)
        return super().from_config_kwargs(sequence=sequence, relative_bounds=relative_bounds, **_default_kwargs)
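
The mixin composition used in CMSSequentialAgent can be checked in isolation with stand-in classes. The sketch below uses only the standard library; `AgentSketch`, `BrainsSketch`, and `BeamlineSketch` are hypothetical stand-ins for Agent, SequentialAgentBase, and CMSBaseAgent, and mimic only the abstract interface.

```python
from abc import ABC, abstractmethod


class AgentSketch(ABC):
    """Stand-in for the abstract base Agent; only the abstract interface is mimicked."""

    @abstractmethod
    def measurement_plan(self, point): ...

    @abstractmethod
    def unpack_run(self, run): ...

    @abstractmethod
    def tell(self, x, y): ...

    @abstractmethod
    def ask(self, batch_size): ...


class BrainsSketch(AgentSketch):
    """Supplies tell/ask, in the role of SequentialAgentBase."""

    def tell(self, x, y):
        return {"independent_variable": x, "observable": y}

    def ask(self, batch_size=1):
        points = [0.0] * batch_size
        return [{"proposal": p} for p in points], points


class BeamlineSketch:
    """Supplies measurement_plan/unpack_run, in the role of CMSBaseAgent."""

    def measurement_plan(self, point):
        return "simple_scan", [point], {}

    def unpack_run(self, run):
        return run["x"], run["y"]


class GoodOrder(BeamlineSketch, BrainsSketch):
    pass


class BadOrder(BrainsSketch, BeamlineSketch):
    pass


def can_instantiate(cls) -> bool:
    # Instantiating a class with unresolved abstract methods raises TypeError.
    try:
        cls()
        return True
    except TypeError:
        return False


results = {c.__name__: can_instantiate(c) for c in (BrainsSketch, GoodOrder, BadOrder)}
```

Listing the beamline class first matters: when it comes second, the abstract declarations inherited through the brains class precede the concrete beamline methods in the method resolution order, so the combined class stays abstract.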