Note

Ophyd async is considered experimental until the v1.0 release and may change API on minor release numbers before then

ophyd_async.core#

Members

DetectorControl

Classes implementing this interface should hold the logic for arming and disarming a detector

DetectorTrigger

Type of mechanism for triggering a detector to take frames

DetectorWriter

Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file).

StandardDetector

Useful detector base class for step and fly scanning detectors.

Device

Common base class for all Ophyd Async Devices.

DeviceCollector

Collector of top level Device instances to be used as a context manager

DeviceVector

Defines device components with indices.

all_at_once

Sort all the values into a single phase so they are set all at once

get_signal_values

Get signal values in bulk.

load_device

Plan which loads PVs from a yaml file into a device.

load_from_yaml

Plan that returns a list of dicts with saved signal values from a yaml file.

save_device

Plan that saves the state of all PV's on a device using a sorter.

save_to_yaml

Plan which serialises a phase or set of phases of SignalRWs to a yaml file.

set_signal_values

Maps signals from a yaml file into device signals.

walk_rw_signals

Retrieve all SignalRWs from a device.

StandardFlyer

TriggerLogic

HDFDataset

HDFFile

param full_file_name:

Absolute path to the file to be written

config_ophyd_async_logging

Set a new handler on the logging.getLogger('ophyd_async') logger.

MockSignalBackend

Signal backend for testing, created by Device.connect(mock=True).

callback_on_mock_put

For setting a callback when a backend is put to.

get_mock_put

Get the mock associated with the put call on the signal.

mock_puts_blocked

reset_mock_put_calls

set_mock_put_proceeds

Allow or block a put with wait=True from proceeding

set_mock_value

Set the value of a signal that is in mock mode.

set_mock_values

Iterator to set a signal to a sequence of values, optionally repeating the sequence.

AsyncConfigurable

AsyncReadable

AsyncStageable

AutoIncrementFilenameProvider

AutoIncrementingPathProvider

FilenameProvider

NameProvider

PathInfo

Information about where and how to write a file.

PathProvider

DatasetDescriber

StaticFilenameProvider

StaticPathProvider

UUIDFilenameProvider

YMDPathProvider

ConfigSignal

HintedSignal

StandardReadable

Device that owns its children and provides useful default behavior.

Signal

A Device with the concept of a value, with R, RW, W and X flavours

SignalR

Signal that can be read from and monitored

SignalRW

Signal that can be both read and set

SignalW

Signal that can be set

SignalX

Signal that puts the default value

assert_configuration

Assert readings from Configurable.

assert_emitted

Assert emitted document generated by running a Bluesky plan

assert_reading

Assert readings from readable.

assert_value

Assert a signal's value and compare it an expected signal.

observe_value

Subscribe to the value of a signal so it can be iterated from.

set_and_wait_for_value

Set a signal and monitor it until it has that value.

set_and_wait_for_other_value

Set a signal and monitor another signal until it has the specified value.

soft_signal_r_and_setter

Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device.

soft_signal_rw

Creates a read-writable Signal with a SoftSignalBackend.

wait_for_value

Wait for a signal to have a matching value.

RuntimeSubsetEnum

SignalBackend

A read/write/monitor backend for a Signals

SubsetEnum

alias of RuntimeSubsetEnum

SignalMetadata

SoftSignalBackend

An backend to a soft Signal, for test signals see MockSignalBackend.

AsyncStatus

Convert asyncio awaitable to bluesky Status interface

WatchableAsyncStatus

Convert AsyncIterator of WatcherUpdates to bluesky Status interface.

NotConnected

Exception to be raised if a Device.connect is cancelled

WatcherUpdate

A dataclass such that, when expanded, it provides the kwargs for a watcher

get_dtype

Get the runtime dtype from a numpy ndarray type annotation

get_unique

If all values are the same, return that value, otherwise raise TypeError

in_micros

Converts between a positive number of seconds and an equivalent number of microseconds.

is_pydantic_model

wait_for_connection

Call many underlying signals, accumulating exceptions and returning them

completed_status

class ophyd_async.core.DetectorControl[source]#

Classes implementing this interface should hold the logic for arming and disarming a detector

abstract get_deadtime(exposure: float | None) float[source]#

For a given exposure, how long should the time between exposures be

abstract async prepare(trigger_info: TriggerInfo)[source]#

Do all necessary steps to prepare the detector for triggers.

Args:
trigger_info: This is a Pydantic model which contains

number Expected number of frames. trigger Type of trigger for which to prepare the detector. Defaults to DetectorTrigger.internal. livetime Livetime / Exposure time with which to set up the detector. Defaults to None if not applicable or the detector is expected to use its previously-set exposure time. deadtime Defaults to None. This is the minimum deadtime between triggers. multiplier The number of triggers grouped into a single StreamDatum index.

abstract async arm() None[source]#

Arm the detector

abstract async wait_for_idle()[source]#

This will wait on the internal _arm_status and wait for it to get disarmed/idle

abstract async disarm()[source]#

Disarm the detector, return detector to an idle state

class ophyd_async.core.DetectorTrigger(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Type of mechanism for triggering a detector to take frames

internal = 'internal'#

Detector generates internal trigger for given rate

edge_trigger = 'edge_trigger'#

Expect a series of arbitrary length trigger signals

constant_gate = 'constant_gate'#

Expect a series of constant width external gate signals

variable_gate = 'variable_gate'#

Expect a series of variable width external gate signals

class ophyd_async.core.DetectorWriter[source]#

Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file)

abstract async open(multiplier: int = 1) dict[str, DataKey][source]#

Open writer and wait for it to be ready for data.

Args:
multiplier: Each StreamDatum index corresponds to this many

written exposures

Returns:

Output for describe()

abstract observe_indices_written(timeout=10.0) AsyncGenerator[int, None][source]#

Yield the index of each frame (or equivalent data point) as it is written

abstract async get_indices_written() int[source]#

Get the number of indices written

abstract collect_stream_docs(indices_written: int) AsyncIterator[Tuple[Literal['stream_resource'], StreamResource] | Tuple[Literal['stream_datum'], StreamDatum]][source]#

Create Stream docs up to given number written

abstract async close() None[source]#

Close writer, blocks until I/O is complete

class ophyd_async.core.StandardDetector(controller: DetectorControl, writer: DetectorWriter, config_sigs: Sequence[SignalR] = (), name: str = '')[source]#

Useful detector base class for step and fly scanning detectors. Aggregates controller and writer logic together.

prepare(value: TriggerInfo) None[source]#

Arm detector.

Prepare the detector with trigger information. This is determined at and passed in from the plan level.

This currently only prepares detectors for flyscans and stepscans just use the trigger information determined in trigger.

To do: Unify prepare to be use for both fly and step scans.

Args:

value: TriggerInfo describing how to trigger the detector

pydantic model ophyd_async.core.TriggerInfo[source]#

Minimal set of information required to setup triggering on a detector

Show JSON schema
{
   "title": "TriggerInfo",
   "description": "Minimal set of information required to setup triggering on a detector",
   "type": "object",
   "properties": {
      "number": {
         "minimum": 0,
         "title": "Number",
         "type": "integer"
      },
      "trigger": {
         "$ref": "#/$defs/DetectorTrigger",
         "default": "internal"
      },
      "deadtime": {
         "anyOf": [
            {
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Deadtime"
      },
      "livetime": {
         "anyOf": [
            {
               "minimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Livetime"
      },
      "frame_timeout": {
         "anyOf": [
            {
               "exclusiveMinimum": 0.0,
               "type": "number"
            },
            {
               "type": "null"
            }
         ],
         "default": null,
         "title": "Frame Timeout"
      },
      "multiplier": {
         "default": 1,
         "title": "Multiplier",
         "type": "integer"
      },
      "iteration": {
         "default": 1,
         "title": "Iteration",
         "type": "integer"
      }
   },
   "$defs": {
      "DetectorTrigger": {
         "description": "Type of mechanism for triggering a detector to take frames",
         "enum": [
            "internal",
            "edge_trigger",
            "constant_gate",
            "variable_gate"
         ],
         "title": "DetectorTrigger",
         "type": "string"
      }
   },
   "required": [
      "number"
   ]
}

Fields:
  • number (int)

  • trigger (ophyd_async.core._detector.DetectorTrigger)

  • deadtime (float | None)

  • livetime (float | None)

  • frame_timeout (float | None)

  • multiplier (int)

  • iteration (int)

field deadtime: float | None = None#

What is the minimum deadtime between triggers

Constraints:
  • ge = 0

field frame_timeout: float | None = None#

What is the maximum timeout on waiting for a frame

Constraints:
  • gt = 0

field iteration: int = 1#

The number of times the detector can go through a complete cycle of kickoff and complete without needing to re-arm. This is important for detectors where the process of arming is expensive in terms of time

field livetime: float | None = None#

What is the maximum high time of the triggers

Constraints:
  • ge = 0

field multiplier: int = 1#

How many triggers make up a single StreamDatum index, to allow multiple frames from a faster detector to be zipped with a single frame from a slow detector e.g. if num=10 and multiplier=5 then the detector will take 10 frames, but publish 2 indices, and describe() will show a shape of (5, h, w)

field number: int [Required]#

Number of triggers that will be sent, 0 means infinite

Constraints:
  • ge = 0

field trigger: DetectorTrigger = DetectorTrigger.internal#

Sort of triggers that will be sent

class ophyd_async.core.Device(name: str = '')[source]#

Common base class for all Ophyd Async Devices.

By default, names and connects all Device children.

parent: Device | None = None#

The parent Device if it exists

property name: str#

Return the name of the Device

set_name(name: str)[source]#

Set self.name=name and each self.child.name=name+"-child".

Parameters:

name – New name to set

async connect(mock: bool = False, timeout: float = 10.0, force_reconnect: bool = False)[source]#

Connect self and all child Devices.

Contains a timeout that gets propagated to child.connect methods.

Parameters:
  • mock – If True then use MockSignalBackend for all Signals

  • timeout – Time to wait before failing with a TimeoutError.

class ophyd_async.core.DeviceCollector(set_name=True, connect=True, mock=False, timeout: float = 10.0)[source]#

Collector of top level Device instances to be used as a context manager

Parameters:
  • set_name – If True, call device.set_name(variable_name) on all collected Devices

  • connect – If True, call device.connect(mock) in parallel on all collected Devices

  • mock – If True, connect Signals in simulation mode

  • timeout – How long to wait for connect before logging an exception

Notes

Example usage:

[async] with DeviceCollector():
    t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
    t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
    # Names and connects devices here
assert t1x.comm.velocity.source
assert t1x.name == "t1x"
class ophyd_async.core.DeviceVector[source]#

Defines device components with indices.

In the below example, foos becomes a dictionary on the parent device at runtime, so parent.foos[2] returns a FooDevice. For example usage see DynamicSensorGroup

ophyd_async.core.all_at_once(values: dict[str, Any]) Sequence[dict[str, Any]][source]#

Sort all the values into a single phase so they are set all at once

ophyd_async.core.get_signal_values(signals: dict[str, SignalRW[Any]], ignore: list[str] | None = None) Generator[Msg, Sequence[Location[Any]], dict[str, Any]][source]#

Get signal values in bulk.

Used as part of saving the signals of a device to a yaml file.

Parameters:
  • signals (Dict[str, SignalRW]) – Dictionary with pv names and matching SignalRW values. Often the direct result of walk_rw_signals().

  • ignore (Optional[List[str]]) – Optional list of PVs that should be ignored.

Returns:

A dictionary containing pv names and their associated values. Ignored pvs are set to None.

Return type:

Dict[str, Any]

ophyd_async.core.load_device(device: Device, path: str)[source]#

Plan which loads PVs from a yaml file into a device.

Parameters:
  • device (Device) – The device to load PVs into

  • path (str) – Path of the yaml file to load

ophyd_async.core.load_from_yaml(save_path: str) Sequence[dict[str, Any]][source]#

Plan that returns a list of dicts with saved signal values from a yaml file.

Parameters:

save_path (str) – Path of the yaml file to load from

ophyd_async.core.save_device(device: ~ophyd_async.core._device.Device, path: str, sorter: ~collections.abc.Callable[[dict[str, ~typing.Any]], ~collections.abc.Sequence[dict[str, ~typing.Any]]] = <function all_at_once>, ignore: list[str] | None = None)[source]#

Plan that saves the state of all PV’s on a device using a sorter.

The default sorter assumes all saved PVs can be loaded at once, and therefore can be saved at one time, i.e. all PVs will appear on one list in the resulting yaml file.

This can be a problem, because when the yaml is ingested with ophyd_async.core.load_device(), it will set all of those PVs at once. However, some PV’s need to be set before others - this is device specific.

Therefore, users should consider the order of device loading and write their own sorter algorithms accordingly.

See ophyd_async.fastcs.panda.phase_sorter() for a valid implementation of the sorter.

Parameters:
  • device (Device) – The device whose PVs should be saved.

  • path (str) – The path where the resulting yaml should be saved to

  • sorter (Callable[[Dict[str, Any]], Sequence[Dict[str, Any]]])

  • ignore (Optional[List[str]])

ophyd_async.core.save_to_yaml(phases: Sequence[dict[str, Any]], save_path: str | Path) None[source]#

Plan which serialises a phase or set of phases of SignalRWs to a yaml file.

Parameters:
  • phases (dict or list of dicts) – The values to save. Each item in the list is a seperate phase used when loading a device. In general this variable be the return value of get_signal_values.

  • save_path (str) – Path of the yaml file to write to

ophyd_async.core.set_signal_values(signals: dict[str, SignalRW[Any]], values: Sequence[dict[str, Any]]) Generator[Msg, None, None][source]#

Maps signals from a yaml file into device signals.

values contains signal values in phases, which are loaded in sequentially into the provided signals, to ensure signals are set in the correct order.

Parameters:
  • signals (Dict[str, SignalRW[Any]]) – Dictionary of named signals to be updated if value found in values argument. Can be the output of walk_rw_signals() for a device.

  • values (Sequence[Dict[str, Any]]) – List of dictionaries of signal name and value pairs, if a signal matches the name of one in the signals argument, sets the signal to that value. The groups of signals are loaded in their list order. Can be the output of load_from_yaml() for a yaml file.

ophyd_async.core.walk_rw_signals(device: Device, path_prefix: str | None = '') dict[str, SignalRW[Any]][source]#

Retrieve all SignalRWs from a device.

Stores retrieved signals with their dotted attribute paths in a dictionary. Used as part of saving and loading a device.

Parameters:
  • device (Device) – Ophyd device to retrieve read-write signals from.

  • path_prefix (str) – For internal use, leave blank when calling the method.

Returns:

class ophyd_async.core.HDFDataset(data_key: str, dataset: str, shape: collections.abc.Sequence[int] = <factory>, dtype_numpy: str = '', multiplier: int = 1, swmr: bool = False, chunk_shape: tuple[int, ...] = ())[source]#
class ophyd_async.core.HDFFile(full_file_name: Path, datasets: list[HDFDataset], hostname: str = 'localhost')[source]#
Parameters:
  • full_file_name – Absolute path to the file to be written

  • datasets – Datasets to write into the file

ophyd_async.core.config_ophyd_async_logging(file=<colorama.ansitowin32.StreamWrapper object>, fmt='%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d] %(message)s', datefmt='%y%m%d %H:%M:%S', color=True, level='WARNING')[source]#

Set a new handler on the logging.getLogger('ophyd_async') logger. If this is called more than once, the handler from the previous invocation is removed (if still present) and replaced.

Parameters:
  • file (object with write method or filename string) – Default is sys.stdout.

  • fmt (Overall logging format)

  • datefmt (string) – Date format. Default is '%H:%M:%S'.

  • color (boolean) – Use ANSI color codes. True by default.

  • level (str or int) – Python logging level, given as string or corresponding integer. Default is ‘WARNING’.

Returns:

handler – The handler, which has already been added to the ‘ophyd_async’ logger.

Return type:

logging.Handler

Examples

Log to a file.

config_ophyd_async_logging(file=’/tmp/what_is_happening.txt’)

Include the date along with the time. (The log messages will always include microseconds, which are configured separately, not as part of ‘datefmt’.)

config_ophyd_async_logging(datefmt=”%Y-%m-%d %H:%M:%S”)

Turn off ANSI color codes.

config_ophyd_async_logging(color=False)

Increase verbosity: show level DEBUG or higher.

config_ophyd_async_logging(level=’DEBUG’)

class ophyd_async.core.MockSignalBackend(datatype: type[T] | None = None, initial_backend: SignalBackend[T] | None = None)[source]#

Signal backend for testing, created by Device.connect(mock=True).

async get_setpoint() T[source]#

For a soft signal, the setpoint and readback values are the same.

ophyd_async.core.callback_on_mock_put(signal: Signal[T], callback: Callable[[T], None] | Callable[[T], Awaitable[None]])[source]#

For setting a callback when a backend is put to.

Can either be used in a context, with the callback being unset on exit, or as an ordinary function.

Parameters:
  • signal – A signal with a MockSignalBackend backend.

  • callback – The callback to call when the backend is put to during the context.

ophyd_async.core.get_mock_put(signal: Signal) AsyncMock[source]#

Get the mock associated with the put call on the signal.

ophyd_async.core.set_mock_put_proceeds(signal: Signal, proceeds: bool)[source]#

Allow or block a put with wait=True from proceeding

ophyd_async.core.set_mock_value(signal: Signal[T], value: T)[source]#

Set the value of a signal that is in mock mode.

ophyd_async.core.set_mock_values(signal: Signal, values: Iterable[Any], require_all_consumed: bool = False) _SetValuesIterator[source]#

Iterator to set a signal to a sequence of values, optionally repeating the sequence.

Parameters:
  • signal – A signal with a MockSignalBackend backend.

  • values – An iterable of the values to set the signal to, on each iteration the value will be set.

  • require_all_consumed – If True, an AssertionError will be raised if the iterator is deleted before all values have been consumed.

Notes

Example usage:

for value_set in set_mock_values(signal, [1, 2, 3]):
     # do something

 cm = set_mock_values(signal, 1, 2, 3, require_all_consumed=True):
 next(cm)
 # do something
class ophyd_async.core.PathInfo(directory_path: Path, filename: str, create_dir_depth: int = 0)[source]#

Information about where and how to write a file.

Parameters:
  • directory_path – Directory into which files should be written

  • filename – Base filename to use generated by FilenameProvider, w/o extension

  • create_dir_depth – Optional depth of directories to create if they do not exist

class ophyd_async.core.StandardReadable(name: str = '')[source]#

Device that owns its children and provides useful default behavior.

  • When its name is set it renames child Devices

  • Signals can be registered for read() and read_configuration()

  • These signals will be subscribed for read() between stage() and unstage()

set_readable_signals(read: Sequence[SignalR] = (), config: Sequence[SignalR] = (), read_uncached: Sequence[SignalR] = ())[source]#
Parameters:
  • read – Signals to make up read()

  • conf – Signals to make up read_configuration()

  • read_uncached – Signals to make up read() that won’t be cached

add_children_as_readables(wrapper: Callable[[AsyncReadable | AsyncConfigurable | AsyncStageable | HasHints], AsyncReadable | AsyncConfigurable | AsyncStageable | HasHints] | type[ConfigSignal] | type[HintedSignal] | None = None) Generator[None, None, None][source]#

Context manager to wrap adding Devices

Add Devices to this class instance inside the Context Manager to automatically add them to the correct fields, based on the Device’s interfaces.

The provided wrapper class will be applied to all Devices and can be used to specify their behaviour.

Parameters:

wrapper – Wrapper class to apply to all Devices created inside the context manager.

See also

add_readables(), ConfigSignal, HintedSignal, HintedSignal.uncached()

add_readables(devices: Sequence[AsyncReadable | AsyncConfigurable | AsyncStageable | HasHints], wrapper: Callable[[AsyncReadable | AsyncConfigurable | AsyncStageable | HasHints], AsyncReadable | AsyncConfigurable | AsyncStageable | HasHints] | type[ConfigSignal] | type[HintedSignal] | None = None) None[source]#

Add the given devices to the lists of known Devices

Add the provided Devices to the relevant fields, based on the Signal’s interfaces.

The provided wrapper class will be applied to all Devices and can be used to specify their behaviour.

Parameters:
  • devices – The devices to be added

  • wrapper – Wrapper class to apply to all Devices created inside the context manager.

See also

add_children_as_readables(), ConfigSignal, HintedSignal, HintedSignal.uncached()

class ophyd_async.core.Signal(backend: ~ophyd_async.core._signal_backend.SignalBackend[~ophyd_async.core._utils.T] = <ophyd_async.core._signal.DisconnectedBackend object>, timeout: float | None = 10.0, name: str = '')[source]#

A Device with the concept of a value, with R, RW, W and X flavours

property source: str#

//PV_PREFIX:SIGNAL, or “” if not set

Type:

Like ca

class ophyd_async.core.SignalR(backend: ~ophyd_async.core._signal_backend.SignalBackend[~ophyd_async.core._utils.T] = <ophyd_async.core._signal.DisconnectedBackend object>, timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be read from and monitored

async read(cached: bool | None = None) dict[str, Reading][source]#

Return a single item dict with the reading in it

async describe() dict[str, DataKey][source]#

Return a single item dict with the descriptor in it

async get_value(cached: bool | None = None) T[source]#

The current value

subscribe_value(function: Callable[[T], None])[source]#

Subscribe to updates in value of a device

subscribe(function: Callable[[dict[str, Reading]], None]) None[source]#

Subscribe to updates in the reading

clear_sub(function: Callable[[T], None]) None[source]#

Remove a subscription.

stage() None[source]#

Start caching this signal

unstage() None[source]#

Stop caching this signal

class ophyd_async.core.SignalRW(backend: ~ophyd_async.core._signal_backend.SignalBackend[~ophyd_async.core._utils.T] = <ophyd_async.core._signal.DisconnectedBackend object>, timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be both read and set

class ophyd_async.core.SignalW(backend: ~ophyd_async.core._signal_backend.SignalBackend[~ophyd_async.core._utils.T] = <ophyd_async.core._signal.DisconnectedBackend object>, timeout: float | None = 10.0, name: str = '')[source]#

Signal that can be set

set(value: T, wait=True, timeout: float | None | Literal['CALCULATE_TIMEOUT'] = 'CALCULATE_TIMEOUT') AsyncStatus[source]#

Set the value and return a status saying when it’s done

class ophyd_async.core.SignalX(backend: ~ophyd_async.core._signal_backend.SignalBackend[~ophyd_async.core._utils.T] = <ophyd_async.core._signal.DisconnectedBackend object>, timeout: float | None = 10.0, name: str = '')[source]#

Signal that puts the default value

trigger(wait=True, timeout: float | None | Literal['CALCULATE_TIMEOUT'] = 'CALCULATE_TIMEOUT') AsyncStatus[source]#

Trigger the action and return a status saying when it’s done

async ophyd_async.core.assert_configuration(configurable: AsyncConfigurable, configuration: Mapping[str, Reading]) None[source]#

Assert readings from Configurable.

Parameters:
  • configurable – Configurable with Configurable.read function that generate readings.

  • configuration – The expected readings from configurable.

Notes

Example usage::

await assert_configuration(configurable configuration)

ophyd_async.core.assert_emitted(docs: Mapping[str, list[dict]], **numbers: int)[source]#

Assert emitted document generated by running a Bluesky plan

Parameters:
  • Doc – A dictionary

  • numbers – expected emission in kwarg from

Notes

Example usage::

assert_emitted(docs, start=1, descriptor=1, resource=1, datum=1, event=1, stop=1)

async ophyd_async.core.assert_reading(readable: AsyncReadable, expected_reading: Mapping[str, Reading]) None[source]#

Assert readings from readable.

Parameters:
  • readable – Callable with readable.read function that generate readings.

  • reading – The expected readings from the readable.

Notes

Example usage::

await assert_reading(readable, reading)

async ophyd_async.core.assert_value(signal: SignalR[T], value: Any) None[source]#

Assert a signal’s value and compare it an expected signal.

Parameters:
  • signal – signal with get_value.

  • value – The expected value from the signal.

Notes

Example usage::

await assert_value(signal, value)

async ophyd_async.core.observe_value(signal: SignalR[T], timeout: float | None = None, done_status: Status | None = None) AsyncGenerator[T, None][source]#

Subscribe to the value of a signal so it can be iterated from.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end

  • timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError

  • done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.

Notes

Example usage:

async for value in observe_value(sig):
    do_something_with(value)
async ophyd_async.core.set_and_wait_for_value(signal: SignalRW[T], value: T, timeout: float = 10.0, status_timeout: float | None = None) AsyncStatus[source]#

Set a signal and monitor it until it has that value.

Useful for busy record, or other Signals with pattern:
  • Set Signal with wait=True and stash the Status

  • Read the same Signal to check the operation has started

  • Return the Status so calling code can wait for operation to complete

Parameters:
  • signal – The signal to set

  • value – The value to set it to

  • timeout – How long to wait for the signal to have the value

  • status_timeout – How long the returned Status will wait for the set to complete

Notes

Example usage:

set_and_wait_for_value(device.acquire, 1)
async ophyd_async.core.set_and_wait_for_other_value(set_signal: SignalW[T], set_value: T, read_signal: SignalR[S], read_value: S, timeout: float = 10.0, set_timeout: float | None = None) AsyncStatus[source]#

Set a signal and monitor another signal until it has the specified value.

This function sets a set_signal to a specified set_value and waits for a read_signal to have the read_value.

Parameters:
  • signal – The signal to set

  • set_value – The value to set it to

  • read_signal – The signal to monitor

  • read_value – The value to wait for

  • timeout – How long to wait for the signal to have the value

  • set_timeout – How long to wait for the set to complete

Notes

Example usage:

set_and_wait_for_value(device.acquire, 1, device.acquire_rbv, 1)
ophyd_async.core.soft_signal_r_and_setter(datatype: type[T] | None = None, initial_value: T | None = None, name: str = '', units: str | None = None, precision: int | None = None) tuple[SignalR[T], Callable[[T], None]][source]#

Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device. May pass metadata, which are propagated into describe. Use soft_signal_rw if you want a device that is externally modifiable

ophyd_async.core.soft_signal_rw(datatype: type[T] | None = None, initial_value: T | None = None, name: str = '', units: str | None = None, precision: int | None = None) SignalRW[T][source]#

Creates a read-writable Signal with a SoftSignalBackend. May pass metadata, which are propagated into describe.

async ophyd_async.core.wait_for_value(signal: SignalR[T], match: T | Callable[[T], bool], timeout: float | None)[source]#

Wait for a signal to have a matching value.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end

  • match – If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match.

  • timeout – How long to wait for the value to match

Notes

Example usage:

wait_for_value(device.acquiring, 1, timeout=1)

Or:

wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
class ophyd_async.core.SignalBackend[source]#

A read/write/monitor backend for a Signals

datatype: type[T] | None = None#

Datatype of the signal value

abstract classmethod datatype_allowed(dtype: Any) bool[source]#

Check if a given datatype is acceptable for this signal backend.

abstract source(name: str) str[source]#

Return source of signal. Signals may pass a name to the backend, which can be used or discarded.

abstract async connect(timeout: float = 10.0)[source]#

Connect to underlying hardware

abstract async put(value: T | None, wait=True, timeout=None)[source]#

Put a value to the PV, if wait then wait for completion for up to timeout

abstract async get_datakey(source: str) DataKey[source]#

Metadata like source, dtype, shape, precision, units

abstract async get_reading() Reading[source]#

The current value, timestamp and severity

abstract async get_value() T[source]#

The current value

abstract async get_setpoint() T[source]#

The point that a signal was requested to move to.

abstract set_callback(callback: Callable[[Reading, T], None] | None) None[source]#

Observe changes to the current value, timestamp and severity

class ophyd_async.core.SoftSignalBackend(datatype: type[T] | None, initial_value: T | None = None, metadata: SignalMetadata = None)[source]#

An backend to a soft Signal, for test signals see MockSignalBackend.

async connect(timeout: float = 10.0) None[source]#

Connection isn’t required for soft signals.

set_value(value: T)[source]#

Method to bypass asynchronous logic.

async get_setpoint() T[source]#

For a soft signal, the setpoint and readback values are the same.

class ophyd_async.core.AsyncStatus(awaitable: Coroutine | Task)[source]#

Convert asyncio awaitable to bluesky Status interface

classmethod wrap(f: Callable[[P], Coroutine]) Callable[[P], AS][source]#

Wrap an async function in an AsyncStatus.

class ophyd_async.core.WatchableAsyncStatus(iterator: AsyncIterator[WatcherUpdate[T]])[source]#

Convert AsyncIterator of WatcherUpdates to bluesky Status interface.

classmethod wrap(f: Callable[[P], AsyncIterator[WatcherUpdate[T]]]) Callable[[P], WAS][source]#

Wrap an AsyncIterator in a WatchableAsyncStatus.

exception ophyd_async.core.NotConnected(errors: str | dict[str, Exception])[source]#

Exception to be raised if a Device.connect is cancelled

pydantic model ophyd_async.core.Table[source]#

An abstraction of a Table of str to numpy array.

Show JSON schema
{
   "title": "Table",
   "description": "An abstraction of a Table of str to numpy array.",
   "type": "object",
   "properties": {}
}

Validators:
  • validate_arrays » all fields

static row(cls: type[TableSubclass], **kwargs) TableSubclass[source]#
validator validate_arrays  »  all fields[source]#
class ophyd_async.core.WatcherUpdate(current: T, initial: T, target: T, name: str | None = None, unit: str | None = None, precision: float | None = None, fraction: float | None = None, time_elapsed: float | None = None, time_remaining: float | None = None)[source]#

A dataclass such that, when expanded, it provides the kwargs for a watcher

ophyd_async.core.get_dtype(typ: type) dtype | None[source]#

Get the runtime dtype from a numpy ndarray type annotation

>>> import numpy.typing as npt
>>> import numpy as np
>>> get_dtype(npt.NDArray[np.int8])
dtype('int8')
ophyd_async.core.get_unique(values: dict[str, T], types: str) T[source]#

If all values are the same, return that value, otherwise raise TypeError

>>> get_unique({"a": 1, "b": 1}, "integers")
1
>>> get_unique({"a": 1, "b": 2}, "integers")
Traceback (most recent call last):
 ...
TypeError: Differing integers: a has 1, b has 2
ophyd_async.core.in_micros(t: float) int[source]#

Converts between a positive number of seconds and an equivalent number of microseconds.

Args:

t (float): A time in seconds

Raises:

ValueError: if t < 0

Returns:

t (int): A time in microseconds, rounded up to the nearest whole microsecond,

async ophyd_async.core.wait_for_connection(**coros: Awaitable[None])[source]#

Call many underlying signals, accumulating exceptions and returning them

Expected kwargs should be a mapping of names to coroutine tasks to execute.