Note

Ophyd async is considered experimental until the v1.0 release and may change API on minor release numbers before then

ophyd_async.core#

Members

DetectorController

Classes implementing this interface should hold the logic for arming and disarming a detector

DetectorTrigger

Type of mechanism for triggering a detector to take frames

DetectorWriter

Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file).

StandardDetector

Useful detector base class for step and fly scanning detectors.

Device

Common base class for all Ophyd Async Devices.

DeviceConnector

Defines how a Device should be connected and type hints processed.

DeviceCollector

Collector of top level Device instances to be used as a context manager

DeviceVector

Defines device components with indices.

DeviceFiller

all_at_once

Sort all the values into a single phase so they are set all at once

get_signal_values

Get signal values in bulk.

load_device

Plan which loads PVs from a yaml file into a device.

load_from_yaml

Plan that returns a list of dicts with saved signal values from a yaml file.

save_device

Plan that saves the state of all PV's on a device using a sorter.

save_to_yaml

Plan which serialises a phase or set of phases of SignalRWs to a yaml file.

set_signal_values

Maps signals from a yaml file into device signals.

walk_rw_signals

Retrieve all SignalRWs from a device.

StandardFlyer

FlyerController

HDFDataset

HDFFile

param full_file_name:

Absolute path to the file to be written

config_ophyd_async_logging

Set a new handler on the logging.getLogger('ophyd_async') logger.

MockSignalBackend

Signal backend for testing, created by Device.connect(mock=True).

AsyncConfigurable

AsyncReadable

AsyncStageable

AutoIncrementFilenameProvider

AutoIncrementingPathProvider

FilenameProvider

NameProvider

PathInfo

Information about where and how to write a file.

PathProvider

DatasetDescriber

StaticFilenameProvider

StaticPathProvider

UUIDFilenameProvider

YMDPathProvider

StandardReadable

Device that owns its children and provides useful default behavior.

StandardReadableFormat

Declare how a Device should contribute to the StandardReadable verbs.

Signal

A Device with the concept of a value, with R, RW, W and X flavours

SignalConnector

SignalR

Signal that can be read from and monitored

SignalRW

Signal that can be both read and set

SignalW

Signal that can be set

SignalX

Signal that puts the default value

observe_value

Subscribe to the value of a signal so it can be iterated from.

observe_signals_value

Subscribe to the value of a signal so it can be iterated from.

set_and_wait_for_value

Set a signal and monitor it until it has that value.

set_and_wait_for_other_value

Set a signal and monitor another signal until it has the specified value.

soft_signal_r_and_setter

Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device.

soft_signal_rw

Creates a read-writable Signal with a SoftSignalBackend.

wait_for_value

Wait for a signal to have a matching value.

SignalBackend

A read/write/monitor backend for a Signals

make_datakey

StrictEnum

All members should exist in the Backend, and there will be no extras

SubsetEnum

All members should exist in the Backend, but there may be extras

SignalMetadata

SoftSignalBackend

An backend to a soft Signal, for test signals see MockSignalBackend.

AsyncStatus

Convert asyncio awaitable to bluesky Status interface

WatchableAsyncStatus

Convert AsyncIterator of WatcherUpdates to bluesky Status interface.

LazyMock

A lazily created Mock to be used when connecting in mock mode.

NotConnected

Exception to be raised if a Device.connect is cancelled

Reference

Hide an object behind a reference.

WatcherUpdate

A dataclass such that, when expanded, it provides the kwargs for a watcher

get_dtype

Get the runtime dtype from a numpy ndarray type annotation

get_enum_cls

Get the runtime dtype from a numpy ndarray type annotation

get_unique

If all values are the same, return that value, otherwise raise TypeError

in_micros

Converts between a positive number of seconds and an equivalent number of microseconds.

wait_for_connection

Call many underlying signals, accumulating exceptions and returning them

completed_status

class ophyd_async.core.DetectorController[source]#

Classes implementing this interface should hold the logic for arming and disarming a detector

abstract get_deadtime(exposure: float | None) float[source]#

For a given exposure, how long should the time between exposures be

abstract async prepare(trigger_info: TriggerInfo)[source]#

Do all necessary steps to prepare the detector for triggers.

Args:
trigger_info: This is a Pydantic model which contains

number Expected number of frames. trigger Type of trigger for which to prepare the detector. Defaults to DetectorTrigger.internal. livetime Livetime / Exposure time with which to set up the detector. Defaults to None if not applicable or the detector is expected to use its previously-set exposure time. deadtime Defaults to None. This is the minimum deadtime between triggers. multiplier The number of triggers grouped into a single StreamDatum index.

abstract async arm() None[source]#

Arm the detector

abstract async wait_for_idle()[source]#

This will wait on the internal _arm_status and wait for it to get disarmed/idle

abstract async disarm()[source]#

Disarm the detector, return detector to an idle state

class ophyd_async.core.DetectorTrigger(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Type of mechanism for triggering a detector to take frames

INTERNAL = 'internal'#

Detector generates internal trigger for given rate

EDGE_TRIGGER = 'edge_trigger'#

Expect a series of arbitrary length trigger signals

CONSTANT_GATE = 'constant_gate'#

Expect a series of constant width external gate signals

VARIABLE_GATE = 'variable_gate'#

Expect a series of variable width external gate signals

class ophyd_async.core.DetectorWriter[source]#

Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file)

abstract async open(multiplier: int = 1) dict[str, DataKey][source]#

Open writer and wait for it to be ready for data.

Args:
multiplier: Each StreamDatum index corresponds to this many

written exposures

Returns:

Output for describe()

abstract observe_indices_written(timeout=10.0) AsyncGenerator[int, None][source]#

Yield the index of each frame (or equivalent data point) as it is written

abstract async get_indices_written() int[source]#

Get the number of indices written

abstract collect_stream_docs(indices_written: int) AsyncIterator[Tuple[Literal['stream_resource'], StreamResource] | Tuple[Literal['stream_datum'], StreamDatum]][source]#

Create Stream docs up to given number written

abstract async close() None[source]#

Close writer, blocks until I/O is complete

class ophyd_async.core.StandardDetector(controller: DetectorController, writer: DetectorWriter, config_sigs: Sequence[SignalR] = (), name: str = '', connector: DeviceConnector | None = None)[source]#

Useful detector base class for step and fly scanning detectors. Aggregates controller and writer logic together.

prepare(value: TriggerInfo) None[source]#

Arm detector.

Prepare the detector with trigger information. This is determined at and passed in from the plan level.

This currently only prepares detectors for flyscans and stepscans just use the trigger information determined in trigger.

To do: Unify prepare to be use for both fly and step scans.

Args:

value: TriggerInfo describing how to trigger the detector

pydantic model ophyd_async.core.TriggerInfo[source]#

Minimal set of information required to setup triggering on a detector

Show JSON schema
{
   "title": "TriggerInfo",
   "description": "Minimal set of information required to setup triggering on a detector",
   "type": "object",
   "properties": {
      "number_of_triggers": {
         "anyOf": [
            {
               "minimum": 0,
               "type": "integer"
            },
            {
               "items": {
                  "minimum": 0,
                  "type": "integer"
               },
               "type": "array"
            }
         ],
         "title": "Number Of Triggers"
      },
      "trigger": {
         "$ref": "#/$defs/DetectorTrigger",
         "default": "internal"
      },
      "deadtime": {
         "anyOf": [
            {
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Deadtime"
      },
      "livetime": {
         "anyOf": [
            {
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Livetime"
      },
      "frame_timeout": {
         "anyOf": [
            {
               "exclusiveMinimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Frame Timeout"
      },
      "multiplier": {
         "default": 1,
         "title": "Multiplier",
         "type": "integer"
      }
   },
   "$defs": {
      "DetectorTrigger": {
         "description": "Type of mechanism for triggering a detector to take frames",
         "enum": [
            "internal",
            "edge_trigger",
            "constant_gate",
            "variable_gate"
         ],
         "title": "DetectorTrigger",
         "type": "string"
      }
   },
   "required": [
      "number_of_triggers"
   ]
}

Fields:
  • number_of_triggers (int | list[int])

  • trigger (ophyd_async.core._detector.DetectorTrigger)

  • deadtime (float | None)

  • livetime (float | None)

  • frame_timeout (float | None)

  • multiplier (int)

field deadtime: float | None = None#

What is the minimum deadtime between triggers

Constraints:
  • ge = 0

field frame_timeout: float | None = None#

What is the maximum timeout on waiting for a frame

Constraints:
  • gt = 0

field livetime: float | None = None#

What is the maximum high time of the triggers

Constraints:
  • ge = 0

field multiplier: int = 1#

How many triggers make up a single StreamDatum index, to allow multiple frames from a faster detector to be zipped with a single frame from a slow detector e.g. if num=10 and multiplier=5 then the detector will take 10 frames, but publish 2 indices, and describe() will show a shape of (5, h, w)

field number_of_triggers: int | list[int] [Required]#

This would trigger: - 2 times for dark field images - 3 times for initial flat field images - 100 times for projections - 3 times for final flat field images

field trigger: DetectorTrigger = DetectorTrigger.INTERNAL#

Sort of triggers that will be sent

property total_number_of_triggers: int#
class ophyd_async.core.Device(name: str = '', connector: DeviceConnector | None = None)[source]#

Common base class for all Ophyd Async Devices.

parent: Device | None = None#

The parent Device if it exists

property name: str#

Return the name of the Device

set_name(name: str, *, child_name_separator: str | None = None) None[source]#

Set self.name=name and each self.child.name=name+"-child".

Parameters:
  • name – New name to set

  • child_name_separator – Use this as a separator instead of “-”. Use “_” instead to make the same names as the equivalent ophyd sync device.

async connect(mock: bool | LazyMock = False, timeout: float = 10.0, force_reconnect: bool = False) None[source]#

Connect self and all child Devices.

Contains a timeout that gets propagated to child.connect methods.

Parameters:
  • mock – If True then use MockSignalBackend for all Signals

  • timeout – Time to wait before failing with a TimeoutError.

class ophyd_async.core.DeviceConnector[source]#

Defines how a Device should be connected and type hints processed.

create_children_from_annotations(device: Device)[source]#

Used when children can be created from introspecting the hardware.

Some control systems allow introspection of a device to determine what children it has. To allow this to work nicely with typing we add these hints to the Device like so:

my_signal: SignalRW[int]
my_device: MyDevice

This method will be run during Device.__init__, and is responsible for turning all of those type hints into real Signal and Device instances.

Subsequent runs of this function should do nothing, to allow it to be called early in Devices that need to pass references to their children during __init__.

async connect_real(device: Device, timeout: float, force_reconnect: bool)[source]#

Used during Device.connect.

This is called when a previous connect has not been done, or has been done in a different mock more. It should connect the Device and all its children.

class ophyd_async.core.DeviceCollector(set_name=True, child_name_separator: str = '-', connect=True, mock=False, timeout: float = 10.0)[source]#

Collector of top level Device instances to be used as a context manager

Parameters:
  • set_name – If True, call device.set_name(variable_name) on all collected Devices

  • child_name_separator – Use this as a separator if we call set_name.

  • connect – If True, call device.connect(mock) in parallel on all collected Devices

  • mock – If True, connect Signals in simulation mode

  • timeout – How long to wait for connect before logging an exception

Notes

Example usage:

[async] with DeviceCollector():
    t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
    t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
    # Names and connects devices here
assert t1x.comm.velocity.source
assert t1x.name == "t1x"
class ophyd_async.core.DeviceVector(children: Mapping[int, DeviceT], name: str = '')[source]#

Defines device components with indices.

In the below example, foos becomes a dictionary on the parent device at runtime, so parent.foos[2] returns a FooDevice. For example usage see DynamicSensorGroup

ophyd_async.core.all_at_once(values: dict[str, Any]) Sequence[dict[str, Any]][source]#

Sort all the values into a single phase so they are set all at once

ophyd_async.core.get_signal_values(signals: dict[str, SignalRW[Any]], ignore: list[str] | None = None) Generator[Msg, Sequence[Location[Any]], dict[str, Any]][source]#

Get signal values in bulk.

Used as part of saving the signals of a device to a yaml file.

Parameters:
  • signals (Dict[str, SignalRW]) – Dictionary with pv names and matching SignalRW values. Often the direct result of walk_rw_signals().

  • ignore (Optional[List[str]]) – Optional list of PVs that should be ignored.

Returns:

A dictionary containing pv names and their associated values. Ignored pvs are set to None.

Return type:

Dict[str, Any]

ophyd_async.core.load_device(device: Device, path: str)[source]#

Plan which loads PVs from a yaml file into a device.

Parameters:
  • device (Device) – The device to load PVs into

  • path (str) – Path of the yaml file to load

ophyd_async.core.load_from_yaml(save_path: str) Sequence[dict[str, Any]][source]#

Plan that returns a list of dicts with saved signal values from a yaml file.

Parameters:

save_path (str) – Path of the yaml file to load from

ophyd_async.core.save_device(device: ~ophyd_async.core._device.Device, path: str, sorter: ~collections.abc.Callable[[dict[str, ~typing.Any]], ~collections.abc.Sequence[dict[str, ~typing.Any]]] = <function all_at_once>, ignore: list[str] | None = None)[source]#

Plan that saves the state of all PV’s on a device using a sorter.

The default sorter assumes all saved PVs can be loaded at once, and therefore can be saved at one time, i.e. all PVs will appear on one list in the resulting yaml file.

This can be a problem, because when the yaml is ingested with ophyd_async.core.load_device(), it will set all of those PVs at once. However, some PV’s need to be set before others - this is device specific.

Therefore, users should consider the order of device loading and write their own sorter algorithms accordingly.

See ophyd_async.fastcs.panda.phase_sorter() for a valid implementation of the sorter.

Parameters:
  • device (Device) – The device whose PVs should be saved.

  • path (str) – The path where the resulting yaml should be saved to

  • sorter (Callable[[Dict[str, Any]], Sequence[Dict[str, Any]]])

  • ignore (Optional[List[str]])

ophyd_async.core.save_to_yaml(phases: Sequence[dict[str, Any]], save_path: str | Path) None[source]#

Plan which serialises a phase or set of phases of SignalRWs to a yaml file.

Parameters:
  • phases (dict or list of dicts) – The values to save. Each item in the list is a seperate phase used when loading a device. In general this variable be the return value of get_signal_values.

  • save_path (str) – Path of the yaml file to write to

ophyd_async.core.set_signal_values(signals: dict[str, SignalRW[Any]], values: Sequence[dict[str, Any]]) Generator[Msg, None, None][source]#

Maps signals from a yaml file into device signals.

values contains signal values in phases, which are loaded in sequentially into the provided signals, to ensure signals are set in the correct order.

Parameters:
  • signals (Dict[str, SignalRW[Any]]) – Dictionary of named signals to be updated if value found in values argument. Can be the output of walk_rw_signals() for a device.

  • values (Sequence[Dict[str, Any]]) – List of dictionaries of signal name and value pairs, if a signal matches the name of one in the signals argument, sets the signal to that value. The groups of signals are loaded in their list order. Can be the output of load_from_yaml() for a yaml file.

ophyd_async.core.walk_rw_signals(device: Device, path_prefix: str | None = '') dict[str, SignalRW[Any]][source]#

Retrieve all SignalRWs from a device.

Stores retrieved signals with their dotted attribute paths in a dictionary. Used as part of saving and loading a device.

Parameters:
  • device (Device) – Ophyd device to retrieve read-write signals from.

  • path_prefix (str) – For internal use, leave blank when calling the method.

Returns:

class ophyd_async.core.HDFDataset(data_key: str, dataset: str, shape: collections.abc.Sequence[int] = <factory>, dtype_numpy: str = '', multiplier: int = 1, swmr: bool = False, chunk_shape: tuple[int, ...] = ())[source]#
class ophyd_async.core.HDFFile(full_file_name: Path, datasets: list[HDFDataset], hostname: str = 'localhost')[source]#
Parameters:
  • full_file_name – Absolute path to the file to be written

  • datasets – Datasets to write into the file

ophyd_async.core.config_ophyd_async_logging(file=<colorama.ansitowin32.StreamWrapper object>, fmt='%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d] %(message)s', datefmt='%y%m%d %H:%M:%S', color=True, level='WARNING')[source]#

Set a new handler on the logging.getLogger('ophyd_async') logger. If this is called more than once, the handler from the previous invocation is removed (if still present) and replaced.

Parameters:
  • file (object with write method or filename string) – Default is sys.stdout.

  • fmt (Overall logging format)

  • datefmt (string) – Date format. Default is '%H:%M:%S'.

  • color (boolean) – Use ANSI color codes. True by default.

  • level (str or int) – Python logging level, given as string or corresponding integer. Default is ‘WARNING’.

Returns:

handler – The handler, which has already been added to the ‘ophyd_async’ logger.

Return type:

logging.Handler

Examples

Log to a file.

config_ophyd_async_logging(file=’/tmp/what_is_happening.txt’)

Include the date along with the time. (The log messages will always include microseconds, which are configured separately, not as part of ‘datefmt’.)

config_ophyd_async_logging(datefmt=”%Y-%m-%d %H:%M:%S”)

Turn off ANSI color codes.

config_ophyd_async_logging(color=False)

Increase verbosity: show level DEBUG or higher.

config_ophyd_async_logging(level=’DEBUG’)

class ophyd_async.core.MockSignalBackend(initial_backend: SignalBackend[SignalDatatypeT], mock: LazyMock)[source]#

Signal backend for testing, created by Device.connect(mock=True).

class ophyd_async.core.PathInfo(directory_path: Path, filename: str, create_dir_depth: int = 0)[source]#

Information about where and how to write a file.

Parameters:
  • directory_path – Directory into which files should be written

  • filename – Base filename to use generated by FilenameProvider, w/o extension

  • create_dir_depth – Optional depth of directories to create if they do not exist

class ophyd_async.core.StandardReadable(name: str = '', connector: DeviceConnector | None = None)[source]#

Device that owns its children and provides useful default behavior.

  • When its name is set it renames child Devices

  • Signals can be registered for read() and read_configuration()

  • These signals will be subscribed for read() between stage() and unstage()

add_children_as_readables(format: StandardReadableFormat = StandardReadableFormat.CHILD) Generator[None, None, None][source]#

Context manager that calls add_readables on child Devices added within.

Scans self.children() on entry and exit to context manager, and calls add_readables on any that are added with the provided StandardReadableFormat.

add_readables(devices: Sequence[Device], format: StandardReadableFormat = StandardReadableFormat.CHILD) None[source]#

Add devices to contribute to various bluesky verbs.

Use output from the given devices to contribute to the verbs of the following interfaces:

Parameters:
  • devices – The devices to be added

  • format – Determines which of the devices functions are added to which verb as per the StandardReadableFormat documentation

class ophyd_async.core.StandardReadableFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Declare how a Device should contribute to the StandardReadable verbs.

CHILD = 'CHILD'#

Detect which verbs the child supports and contribute to:

CONFIG_SIGNAL = 'CONFIG_SIGNAL'#

Contribute the Signal value to read_configuration() and describe_configuration()

HINTED_SIGNAL = 'HINTED_SIGNAL'#

Contribute the monitored Signal value to read() and describe()` and put the signal name in hints

UNCACHED_SIGNAL = 'UNCACHED_SIGNAL'#

Contribute the uncached Signal value to read() and describe()`

HINTED_UNCACHED_SIGNAL = 'HINTED_UNCACHED_SIGNAL'#

Contribute the uncached Signal value to read() and describe()` and put the signal name in hints

class ophyd_async.core.Signal(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')[source]#

A Device with the concept of a value, with R, RW, W and X flavours

property source: str#

//PV_PREFIX:SIGNAL, or “” if not set

Type:

Like ca

class ophyd_async.core.SignalR(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be read from and monitored

async read(cached: bool | None = None) dict[str, Reading][source]#

Return a single item dict with the reading in it

async describe() dict[str, DataKey][source]#

Return a single item dict with the descriptor in it

async get_value(cached: bool | None = None) SignalDatatypeT[source]#

The current value

subscribe_value(function: Callable[[SignalDatatypeT], None])[source]#

Subscribe to updates in value of a device

subscribe(function: Callable[[dict[str, Reading]], None]) None[source]#

Subscribe to updates in the reading

clear_sub(function: Callable[[T], None]) None[source]#

Remove a subscription.

stage() None[source]#

Start caching this signal

unstage() None[source]#

Stop caching this signal

class ophyd_async.core.SignalRW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be both read and set

async locate() Location[source]#

Return the setpoint and readback.

class ophyd_async.core.SignalW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be set

set(value: SignalDatatypeT, wait=True, timeout: float | None | Literal['CALCULATE_TIMEOUT'] = 'CALCULATE_TIMEOUT') None[source]#

Set the value and return a status saying when it’s done

class ophyd_async.core.SignalX(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')[source]#

Signal that puts the default value

trigger(wait=True, timeout: float | None | Literal['CALCULATE_TIMEOUT'] = 'CALCULATE_TIMEOUT') None[source]#

Trigger the action and return a status saying when it’s done

async ophyd_async.core.observe_value(signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[SignalDatatypeT, None][source]#

Subscribe to the value of a signal so it can be iterated from.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end

  • timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError

  • done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.

  • done_timeout – If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an ‘asyncio.wait_for’ timeout

Notes

Due to a rare condition with busy signals, it is not recommended to use this function with asyncio.timeout, including in an ‘asyncio.wait_for’ loop. Instead, this timeout should be given to the done_timeout parameter.

Example usage:

async for value in observe_value(sig):
    do_something_with(value)
async ophyd_async.core.observe_signals_value(*signals: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None][source]#

Subscribe to the value of a signal so it can be iterated from.

Parameters:
  • signals – Call subscribe_value on all the signals at the start, and clear_sub on it at the end

  • timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError

  • done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.

  • done_timeout – If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an ‘asyncio.wait_for’ timeout

Notes

Example usage:

async for signal,value in observe_signals_values(sig1,sig2,..):
    if signal is sig1:
        do_something_with(value)
    elif signal is sig2:
        do_something_else_with(value)
async ophyd_async.core.set_and_wait_for_value(signal: SignalRW[SignalDatatypeT], value: SignalDatatypeT, match_value: SignalDatatypeT | Callable[[SignalDatatypeT], bool] | None = None, timeout: float = 10.0, status_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus[source]#

Set a signal and monitor it until it has that value.

Useful for busy record, or other Signals with pattern:
  • Set Signal with wait=True and stash the Status

  • Read the same Signal to check the operation has started

  • Return the Status so calling code can wait for operation to complete

Parameters:
  • signal – The signal to set

  • value – The value to set it to

  • match_value – The expected value of the signal after the operation. Used to verify that the set operation was successful.

  • timeout – How long to wait for the signal to have the value

  • status_timeout – How long the returned Status will wait for the set to complete

  • wait_for_set_completion – This will wait for set completion #More info in how-to docs

Notes

Example usage:

set_and_wait_for_value(device.acquire, 1)
async ophyd_async.core.set_and_wait_for_other_value(set_signal: SignalW[SignalDatatypeT], set_value: SignalDatatypeT, match_signal: SignalR[SignalDatatypeV], match_value: SignalDatatypeV | Callable[[SignalDatatypeV], bool], timeout: float = 10.0, set_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus[source]#

Set a signal and monitor another signal until it has the specified value.

This function sets a set_signal to a specified set_value and waits for a match_signal to have the match_value.

Parameters:
  • signal – The signal to set

  • set_value – The value to set it to

  • match_signal – The signal to monitor

  • match_value – The value to wait for

  • timeout – How long to wait for the signal to have the value

  • set_timeout – How long to wait for the set to complete

  • wait_for_set_completion – This will wait for set completion #More info in how-to docs

Notes

Example usage:

set_and_wait_for_value(device.acquire, 1, device.acquire_rbv, 1)
ophyd_async.core.soft_signal_r_and_setter(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]][source]#

Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device. May pass metadata, which are propagated into describe. Use soft_signal_rw if you want a device that is externally modifiable

ophyd_async.core.soft_signal_rw(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) SignalRW[SignalDatatypeT][source]#

Creates a read-writable Signal with a SoftSignalBackend. May pass metadata, which are propagated into describe.

async ophyd_async.core.wait_for_value(signal: SignalR[SignalDatatypeT], match: SignalDatatypeT | Callable[[SignalDatatypeT], bool], timeout: float | None)[source]#

Wait for a signal to have a matching value.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end

  • match – If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match.

  • timeout – How long to wait for the value to match

Notes

Example usage:

wait_for_value(device.acquiring, 1, timeout=1)

Or:

wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
class ophyd_async.core.SignalBackend(datatype: type[SignalDatatypeT] | None)[source]#

A read/write/monitor backend for a Signals

abstract source(name: str, read: bool) str[source]#

Return source of signal.

Signals may pass a name to the backend, which can be used or discarded.

abstract async connect(timeout: float)[source]#

Connect to underlying hardware

abstract async put(value: SignalDatatypeT | None, wait: bool)[source]#

Put a value to the PV, if wait then wait for completion

abstract async get_datakey(source: str) DataKey[source]#

Metadata like source, dtype, shape, precision, units

abstract async get_reading() Reading[SignalDatatypeT][source]#

The current value, timestamp and severity

abstract async get_value() SignalDatatypeT[source]#

The current value

abstract async get_setpoint() SignalDatatypeT[source]#

The point that a signal was requested to move to.

abstract set_callback(callback: Callable[[T], None] | None) None[source]#

Observe changes to the current value, timestamp and severity

class ophyd_async.core.StrictEnum(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

All members should exist in the Backend, and there will be no extras

class ophyd_async.core.SubsetEnum(value, *args, **kwargs)[source]#

All members should exist in the Backend, but there may be extras

class ophyd_async.core.SoftSignalBackend(datatype: type[SignalDatatypeT] | None, initial_value: SignalDatatypeT | None = None, units: str | None = None, precision: int | None = None)[source]#

An backend to a soft Signal, for test signals see MockSignalBackend.

class ophyd_async.core.AsyncStatus(awaitable: Coroutine | Task, name: str | None = None)[source]#

Convert asyncio awaitable to bluesky Status interface

classmethod wrap(f: Callable[[P], Coroutine]) Callable[[P], AS][source]#

Wrap an async function in an AsyncStatus.

class ophyd_async.core.WatchableAsyncStatus(iterator: AsyncIterator[WatcherUpdate[T]], name: str | None = None)[source]#

Convert AsyncIterator of WatcherUpdates to bluesky Status interface.

classmethod wrap(f: Callable[[P], AsyncIterator[WatcherUpdate[T]]]) Callable[[P], WAS][source]#

Wrap an AsyncIterator in a WatchableAsyncStatus.

class ophyd_async.core.LazyMock(name: str = '', parent: LazyMock | None = None)[source]#

A lazily created Mock to be used when connecting in mock mode.

Creating Mocks is reasonably expensive when each Device (and Signal) requires its own, and the tree is only used when Signal.set() is called. This class allows a tree of lazily connected Mocks to be constructed so that when the leaf is created, so are its parents. Any calls to the child are then accessible from the parent mock.

>>> parent = LazyMock()
>>> child = parent.child("child")
>>> child_mock = child()
>>> child_mock()  
<Mock name='mock.child()' id='...'>
>>> parent_mock = parent()
>>> parent_mock.mock_calls
[call.child()]
exception ophyd_async.core.NotConnected(errors: str | Mapping[str, Exception])[source]#

Exception to be raised if a Device.connect is cancelled

class ophyd_async.core.Reference(obj: T)[source]#

Hide an object behind a reference.

Used to opt out of the naming/parent-child relationship of Device.

For example:

class DeviceWithRefToSignal(Device):
    def __init__(self, signal: SignalRW[int]):
        self.signal_ref = Reference(signal)
        super().__init__()

    def set(self, value) -> AsyncStatus:
        return self.signal_ref().set(value + 1)
pydantic model ophyd_async.core.Table[source]#

An abstraction of a Table of str to numpy array.

Show JSON schema
{
   "title": "Table",
   "description": "An abstraction of a Table of str to numpy array.",
   "type": "object",
   "properties": {},
   "additionalProperties": true
}

Validators:
  • validate_array_dtypes » all fields

  • validate_lengths » all fields

numpy_dtype() dtype[source]#
numpy_table(selection: slice | None = None) ndarray[source]#
validator validate_array_dtypes  »  all fields[source]#
validator validate_lengths  »  all fields[source]#
class ophyd_async.core.WatcherUpdate(current: T, initial: T, target: T, name: str | None = None, unit: str | None = None, precision: float | None = None, fraction: float | None = None, time_elapsed: float | None = None, time_remaining: float | None = None)[source]#

A dataclass such that, when expanded, it provides the kwargs for a watcher

ophyd_async.core.get_dtype(datatype: type) dtype[source]#

Get the runtime dtype from a numpy ndarray type annotation

>>> from ophyd_async.core import Array1D
>>> import numpy as np
>>> get_dtype(Array1D[np.int8])
dtype('int8')
ophyd_async.core.get_enum_cls(datatype: type | None) type[StrictEnum] | None[source]#

Get the runtime dtype from a numpy ndarray type annotation

>>> import numpy.typing as npt
>>> import numpy as np
>>> get_dtype(npt.NDArray[np.int8])
dtype('int8')
ophyd_async.core.get_unique(values: dict[str, T], types: str) T[source]#

If all values are the same, return that value, otherwise raise TypeError

>>> get_unique({"a": 1, "b": 1}, "integers")
1
>>> get_unique({"a": 1, "b": 2}, "integers")
Traceback (most recent call last):
 ...
TypeError: Differing integers: a has 1, b has 2
ophyd_async.core.in_micros(t: float) int[source]#

Converts between a positive number of seconds and an equivalent number of microseconds.

Args:

t (float): A time in seconds

Raises:

ValueError: if t < 0

Returns:

t (int): A time in microseconds, rounded up to the nearest whole microsecond,

async ophyd_async.core.wait_for_connection(**coros: Awaitable[None])[source]#

Call many underlying signals, accumulating exceptions and returning them

Expected kwargs should be a mapping of names to coroutine tasks to execute.