ophyd_async.core#

The building blocks for making devices.

Package Contents#

Classes#

Device

Common base class for all Ophyd Async Devices.

DeviceConnector

Defines how a Device should be connected and type hints processed.

DeviceFiller

For filling signals on introspected devices.

DeviceVector

Defines a dictionary of Device children with arbitrary integer keys.

AsyncReadable

Async implementation of the sync bluesky.protocols.Readable.

AsyncConfigurable

Async implementation of the sync bluesky.protocols.Configurable.

AsyncStageable

Async implementation of the sync bluesky.protocols.Stageable.

Watcher

Protocol for watching changes in values.

AsyncStatus

Convert an asyncio awaitable to bluesky Status interface.

WatchableAsyncStatus

Convert an asyncio async iterable to bluesky Status and Watcher interface.

WatcherUpdate

A dataclass such that, when expanded, it provides the kwargs for a watcher.

Signal

A Device with the concept of a value, with R, RW, W and X flavours.

SignalR

Signal that can be read from and monitored.

SignalW

Signal that can be set.

SignalRW

Signal that can be both read and set.

SignalX

Signal that puts the default value.

SignalBackend

A read/write/monitor backend for a Signal.

SignalConnector

Used for connecting signals with a given backend.

StrictEnum

All members should exist in the Backend, and there will be no extras.

SubsetEnum

All members should exist in the Backend, but there may be extras.

Table

An abstraction of a Table where each field is a column.

SignalMetadata

Metadata for a signal. No field is required.

SoftSignalBackend

A backend for a soft Signal; for test signals see MockSignalBackend.

LazyMock

A lazily created Mock to be used when connecting in mock mode.

MockSignalBackend

Signal backend for testing, created by Device.connect(mock=True).

StandardReadable

Device that provides selected child Device values in read().

StandardReadableFormat

Declare how a Device should contribute to the StandardReadable verbs.

StandardDetector

Detector base class for step and fly scanning detectors.

TriggerInfo

Minimal set of information required to setup triggering on a detector.

DetectorTrigger

Type of mechanism for triggering a detector to take frames.

DetectorController

Detector logic for arming and disarming the detector.

DetectorWriter

Logic for making detector write data to somewhere persistent (e.g. HDF5 file).

PathInfo

Information about where and how to write a file.

PathProvider

Abstract class that tells a detector where to write its data.

StaticPathProvider

All files will be within a static directory.

AutoIncrementingPathProvider

Provides a new numerically incremented path on each call.

YMDPathProvider

Provides a path with the date included in the directory name.

FilenameProvider

Base class for callable classes providing filenames.

StaticFilenameProvider

Provides a constant filename on every call.

AutoIncrementFilenameProvider

Provides a new numerically incremented filename on each call.

UUIDFilenameProvider

Files will have a UUID as a filename.

NameProvider

Base class for callable classes providing data keys.

DatasetDescriber

For describing datasets in file writing.

HDFDataset

TODO.

HDFFile

TODO.

StandardFlyer

Base class for ‘flyable’ devices.

FlyerController

Base class for controlling ‘flyable’ devices.

Settings

Used for supplying settings to signals.

SettingsProvider

Base class for providing settings.

YamlSettingsProvider

For providing settings from yaml to signals.

Reference

Hide an object behind a reference.

Functions#

init_devices

Auto initialize top level Device instances: to be used as a context manager.

completed_status

Return a completed AsyncStatus.

soft_signal_r_and_setter

Create a read-only Signal with a SoftSignalBackend.

soft_signal_rw

Create a read-writable Signal with a SoftSignalBackend.

observe_value

Subscribe to the value of a signal so it can be iterated from.

observe_signals_value

Subscribe to a set of signals so they can be iterated from.

wait_for_value

Wait for a signal to have a matching value.

set_and_wait_for_value

Set a signal and monitor that same signal until it has the specified value.

set_and_wait_for_other_value

Set a signal and monitor another signal until it has the specified value.

walk_rw_signals

Retrieve all SignalRWs from a device.

config_ophyd_async_logging

Set a new handler on the logging.getLogger('ophyd_async') logger.

gather_dict

Take named coros and return a dict of their name to their return value.

get_dtype

Get the runtime dtype from a numpy ndarray type annotation.

get_enum_cls

Get the enum class from a datatype.

get_unique

If all values are the same, return that value, otherwise raise TypeError.

in_micros

Convert between seconds and microseconds.

make_datakey

Make a DataKey for a given datatype.

wait_for_connection

Call many underlying signals, accumulating exceptions and returning them.

Data#

SignalDatatype

The supported Signal datatypes:

SignalDatatypeT

A typevar for a SignalDatatype.

DTypeScalar_co

A numpy dtype like numpy.float64.

Array1D

A type alias for a 1D numpy array with a specific scalar data type.

CALCULATE_TIMEOUT

Sentinel used to implement myfunc(timeout=CalculateTimeout)

CalculatableTimeout

DEFAULT_TIMEOUT

Callback

ConfigSignal

HintedSignal

API#

class ophyd_async.core.Device(name: str = '', connector: DeviceConnector | None = None)[source]#

Bases: bluesky.protocols.HasName

Common base class for all Ophyd Async Devices.

Parameters:
  • name – Optional name of the Device

  • connector – Optional DeviceConnector instance to use at connect()

parent: Device | None#

None

The parent Device if it exists

property name: str#

Return the name of the Device.

children() Iterator[tuple[str, Device]][source]#

For each attribute that is a Device, yield the name and Device.

Yields:

(attr_name, attr) for each child attribute that is a Device.

log() LoggerAdapter#

Return a logger configured with the device name.

set_name(name: str, *, child_name_separator: str | None = None) None[source]#

Set self.name=name and each self.child.name=name+"-child".

Parameters:
  • name – New name to set.

  • child_name_separator – Use this as a separator instead of “-”. Use “_” instead to make the same names as the equivalent ophyd sync device.
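
The naming rule above can be sketched with a toy device tree. This is a minimal illustration, not ophyd_async's implementation: ToyDevice and ToyMotor are hypothetical stand-ins that only mirror the documented "name + separator + child attribute name" behaviour.

```python
from __future__ import annotations


class ToyDevice:
    """Hypothetical stand-in for a Device: children are attributes that are ToyDevices."""

    def __init__(self) -> None:
        self.name = ""

    def children(self):
        # Yield (attr_name, attr) for each attribute that is itself a ToyDevice
        for attr_name, attr in vars(self).items():
            if isinstance(attr, ToyDevice):
                yield attr_name, attr

    def set_name(self, name: str, *, child_name_separator: str | None = None) -> None:
        separator = child_name_separator or "-"
        self.name = name
        # Recurse so that grandchildren are named with the same separator
        for child_name, child in self.children():
            child.set_name(
                f"{name}{separator}{child_name}", child_name_separator=separator
            )


class ToyMotor(ToyDevice):
    def __init__(self) -> None:
        super().__init__()
        self.velocity = ToyDevice()
        self.readback = ToyDevice()


motor = ToyMotor()
motor.set_name("t1x")
dash_names = (motor.name, motor.velocity.name, motor.readback.name)

motor.set_name("t1x", child_name_separator="_")
underscore_names = (motor.name, motor.velocity.name, motor.readback.name)
```

With the default separator the children are named "t1x-velocity" and "t1x-readback"; with "_" they become "t1x_velocity" and "t1x_readback".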

async connect(mock: bool | LazyMock = False, timeout: float = DEFAULT_TIMEOUT, force_reconnect: bool = False) None[source]#

Connect the device and all child devices.

Successful connects will be cached so subsequent calls will return immediately. Contains a timeout that gets propagated to child.connect methods.

Parameters:
  • mock – If True then use MockSignalBackend for all Signals. If passed a LazyMock then pass this down for use within the Signals, otherwise create one.

  • timeout – Time to wait before failing with a TimeoutError.

  • force_reconnect – If True, force a reconnect even if the last connect succeeded.
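
The caching and force_reconnect behaviour described above can be modelled with the standard library alone. The sketch below is an assumption-laden simplification: ToyConnectable is hypothetical, it ignores mock mode, and it caches a boolean rather than whatever the real Device stores.

```python
import asyncio


class ToyConnectable:
    """Hypothetical stand-in showing cached connects; not the real Device."""

    def __init__(self, children=()):
        self.children = list(children)
        self.connect_count = 0
        self._connected = False

    async def connect(self, timeout: float = 10.0, force_reconnect: bool = False) -> None:
        if self._connected and not force_reconnect:
            return  # a previous connect succeeded, so return immediately
        # Connect children in parallel, passing the same timeout down
        await asyncio.gather(
            *(child.connect(timeout, force_reconnect) for child in self.children)
        )
        await asyncio.wait_for(self._do_connect(), timeout)
        self._connected = True

    async def _do_connect(self) -> None:
        await asyncio.sleep(0)  # placeholder for talking to the control system
        self.connect_count += 1


async def demo():
    child = ToyConnectable()
    parent = ToyConnectable([child])
    await parent.connect()
    await parent.connect()  # cached: nothing is reconnected
    counts_after_cached = (parent.connect_count, child.connect_count)
    await parent.connect(force_reconnect=True)
    return counts_after_cached, (parent.connect_count, child.connect_count)


cached_counts, forced_counts = asyncio.run(demo())
```

The second plain connect() leaves both counters at 1, while force_reconnect=True reconnects the parent and its child.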

class ophyd_async.core.DeviceConnector[source]#

Defines how a Device should be connected and type hints processed.

create_children_from_annotations(device: Device)[source]#

Use when children can be created from introspecting the hardware.

Some control systems allow introspection of a device to determine what children it has. To allow this to work nicely with typing we add these hints to the Device like so::

my_signal: SignalRW[int]
my_device: MyDevice

This method will be run during Device.__init__, and is responsible for turning all of those type hints into real Signal and Device instances.

Subsequent runs of this function should do nothing, to allow it to be called early in Devices that need to pass references to their children during __init__.
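
As a rough illustration of turning type hints into instances, the sketch below uses typing.get_type_hints. All names in it (ToySignal, ToyChild, ToyIntrospectedDevice, toy_create_children) are hypothetical; a real DeviceConnector also has to choose backends and handle generic hints such as SignalRW[int], which this sketch does not attempt.

```python
from typing import get_type_hints


class ToySignal:
    """Hypothetical placeholder for a Signal created from a type hint."""


class ToyChild:
    """Hypothetical placeholder for a child Device created from a type hint."""


def toy_create_children(device) -> None:
    for attr_name, hint in get_type_hints(type(device)).items():
        # Skip anything already created, so that repeat calls do nothing
        if attr_name not in vars(device):
            setattr(device, attr_name, hint())


class ToyIntrospectedDevice:
    my_signal: ToySignal
    my_device: ToyChild

    def __init__(self) -> None:
        # Run during __init__, as described above
        toy_create_children(self)


device = ToyIntrospectedDevice()
first_signal = device.my_signal
toy_create_children(device)  # a second run leaves existing children untouched
```

Because the second call finds the attributes already present, device.my_signal is still the object created during __init__.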

async connect_mock(device: Device, mock: LazyMock)[source]#

Use during Device.connect with mock=True.

This is called when there is no cached connect done in mock=True mode. It connects the Device and all its children in mock mode.

async connect_real(device: Device, timeout: float, force_reconnect: bool)[source]#

Use during Device.connect with mock=False.

This is called when there is no cached connect done in mock=False mode. It connects the Device and all its children in real mode in parallel.

class ophyd_async.core.DeviceFiller(device: Device, signal_backend_factory: Callable[[type[SignalDatatype] | None], SignalBackendT], device_connector_factory: Callable[[], DeviceConnectorT])[source]#

Bases: typing.Generic[ophyd_async.core._device_filler.SignalBackendT, ophyd_async.core._device_filler.DeviceConnectorT]

For filling signals on introspected devices.

Parameters:
  • device – The device to fill.

  • signal_backend_factory – A callable that returns a SignalBackend.

  • device_connector_factory – A callable that returns a DeviceConnector.

check_created()[source]#

Check that all Signals and Devices declared in annotations are created.

create_signals_from_annotations(filled=True) Iterator[tuple[SignalBackendT, list[Any]]][source]#

Create all Signals from annotations.

Parameters:

filled – If True then the Signals created should be considered already filled with connection data. If False then fill_child_signal needs calling at device connection time before the signal can be connected.

Yields:

(backend, extras) The SignalBackend that has been created for this Signal, and the list of extra annotations that could be used to customize it. For example an EpicsDeviceConnector consumes PvSuffix extras to set the write_pv of the backend. Any unhandled extras should be left on the list so this class can handle them, e.g. StandardReadableFormat instances.

create_devices_from_annotations(filled=True) Iterator[tuple[DeviceConnectorT, list[Any]]][source]#

Create all Devices from annotations.

Parameters:

filled – If True then the Devices created should be considered already filled with connection data. If False then fill_child_device needs calling at parent device connection time before the child Device can be connected.

Yields:

(connector, extras) The DeviceConnector that has been created for this Device, and the list of extra annotations that could be used to customize it.

create_device_vector_entries_to_mock(num: int)[source]#

Create num entries for each DeviceVector.

This is used when the Device is being connected in mock mode.

check_filled(source: str)[source]#

Check that all the created Signals and Devices are filled.

Parameters:

source – The source of the data that should have done the filling, for reporting as an error message

fill_child_signal(name: str, signal_type: type[Signal], vector_index: int | None = None) SignalBackendT[source]#

Mark a Signal as filled, and return its backend for filling.

Parameters:
  • name – The name without trailing underscore, the name in the control system

  • signal_type – One of the types SignalR, SignalW, SignalRW or SignalX

  • vector_index – If the child is in a DeviceVector then what index is it

Returns:

The SignalBackend for the filled Signal.

fill_child_device(name: str, device_type: type[Device] = Device, vector_index: int | None = None) DeviceConnectorT[source]#

Mark a Device as filled, and return its connector for filling.

Parameters:
  • name – The name without trailing underscore, the name in the control system

  • device_type – The Device subclass to be created

  • vector_index – If the child is in a DeviceVector then what index is it

Returns:

The DeviceConnector for the filled Device.

class ophyd_async.core.DeviceVector(children: Mapping[int, DeviceT], name: str = '')[source]#

Bases: collections.abc.MutableMapping[int, ophyd_async.core._device.DeviceT], ophyd_async.core._device.Device

Defines a dictionary of Device children with arbitrary integer keys.

See also:

Implementing Devices for examples of how to use this class.

children() Iterator[tuple[str, Device]][source]#

For each attribute that is a Device, yield the name and Device.

Yields:

(attr_name, attr) for each child attribute that is a Device.

ophyd_async.core.init_devices(set_name=True, child_name_separator: str = '-', connect=True, mock=False, timeout: float = 10.0)[source]#

Auto initialize top level Device instances: to be used as a context manager.

Parameters:
  • set_name – If True, call device.set_name(variable_name) on all Devices created within the context manager that have an empty name.

  • child_name_separator – Separator for child names if set_name is True.

  • connect – If True, call device.connect(mock, timeout) in parallel on all Devices created within the context manager.

  • mock – If True, connect Signals in mock mode.

  • timeout – How long to wait for connect before logging an exception.

Raises:

For example, to connect and name 2 motors in parallel:

[async] with init_devices():
    t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
    t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
    # Names and connects devices here
assert t1x.name == "t1x"

class ophyd_async.core.AsyncReadable[source]#

Bases: bluesky.protocols.HasName, typing.Protocol

Async implementation of the sync bluesky.protocols.Readable.

abstractmethod async read() dict[str, Reading][source]#

Return value, timestamp, optional per-point metadata for each field name.

For example:

{
    "channel1": {"value": 5, "timestamp": 1472493713.271991},
    "channel2": {"value": 16, "timestamp": 1472493713.539238},
}

abstractmethod async describe() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read().

For example:

{
    "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
    "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
}

class ophyd_async.core.AsyncConfigurable[source]#

Bases: bluesky.protocols.HasName, typing.Protocol

Async implementation of the sync bluesky.protocols.Configurable.

abstractmethod async read_configuration() dict[str, Reading][source]#

Return value, timestamp, optional per-point metadata for each field name.

Same API as AsyncReadable.read, but for slow-changing fields related to configuration, e.g. exposure time. These will typically be read only once per run.

abstractmethod async describe_configuration() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read_configuration().
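
To make the split between per-point and configuration data concrete, here is a minimal sketch of a class with the four coroutine methods named above. ToyDetector, its field names and its "soft://" sources are all hypothetical, and plain dicts stand in for the Reading and DataKey types.

```python
import asyncio
import time


class ToyDetector:
    """Hypothetical device: per-point data in read(), slow-changing data in read_configuration()."""

    name = "det"

    async def read(self):
        return {"det-counts": {"value": 5, "timestamp": time.time()}}

    async def describe(self):
        return {"det-counts": {"source": "soft://det-counts", "dtype": "integer", "shape": []}}

    async def read_configuration(self):
        return {"det-exposure": {"value": 0.1, "timestamp": time.time()}}

    async def describe_configuration(self):
        return {"det-exposure": {"source": "soft://det-exposure", "dtype": "number", "shape": []}}


async def collect(det):
    # The four calls are independent, so they can be awaited together
    return await asyncio.gather(
        det.read(),
        det.describe(),
        det.read_configuration(),
        det.describe_configuration(),
    )


reading, description, config_reading, config_description = asyncio.run(
    collect(ToyDetector())
)
```

Each describe call returns exactly the keys of the matching read call, which is the relationship the docstrings above rely on.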

class ophyd_async.core.AsyncStageable[source]#

Bases: typing.Protocol

Async implementation of the sync bluesky.protocols.Stageable.

abstractmethod stage() AsyncStatus[source]#

Set up the device for acquisition.

Returns:

An AsyncStatus that is marked done when the device is done staging.

abstractmethod unstage() AsyncStatus[source]#

Clean up the device after acquisition.

Returns:

An AsyncStatus that is marked done when the device is done unstaging.

class ophyd_async.core.Watcher[source]#

Bases: typing.Protocol, typing.Generic[ophyd_async.core._protocol.C]

Protocol for watching changes in values.

class ophyd_async.core.AsyncStatus(awaitable: Coroutine | Task, name: str | None = None)[source]#

Bases: ophyd_async.core._status.AsyncStatusBase

Convert an asyncio awaitable to bluesky Status interface.

Parameters:
  • awaitable – The coroutine or task to await.

  • name – The name of the device, if available.

For example:

status = AsyncStatus(asyncio.sleep(1))
assert not status.done
await status # waits for 1 second
assert status.done

classmethod wrap(f: Callable[P, Coroutine]) Callable[P, AsyncStatus][source]#

Wrap an async function in an AsyncStatus and return it.

Used to make an async function conform to a bluesky protocol.

For example:

class MyDevice(Device):
    @AsyncStatus.wrap
    async def trigger(self):
        await asyncio.sleep(1)

class ophyd_async.core.WatchableAsyncStatus(iterator: AsyncIterator[WatcherUpdate[T]], name: str | None = None)[source]#

Bases: ophyd_async.core._status.AsyncStatusBase, typing.Generic[ophyd_async.core._utils.T]

Convert an asyncio async iterable to bluesky Status and Watcher interface.

Parameters:
  • iterator – The async iterable to await.

  • name – The name of the device, if available.

watch(watcher: Watcher)[source]#

Add a watcher to the status.

It is called:

  • immediately if there has already been an update

  • on every subsequent update
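
The two calling rules above can be modelled in a few lines. ToyWatchableStatus is a hypothetical stand-in that only tracks watchers and the latest update; it is not an asyncio status and is not how the library is implemented.

```python
class ToyWatchableStatus:
    """Hypothetical stand-in that only models watcher bookkeeping."""

    def __init__(self) -> None:
        self._watchers = []
        self._last_update = None

    def watch(self, watcher) -> None:
        self._watchers.append(watcher)
        if self._last_update is not None:
            # There has already been an update, so replay it immediately
            watcher(**self._last_update)

    def update(self, **kwargs) -> None:
        self._last_update = kwargs
        for watcher in self._watchers:
            watcher(**kwargs)


seen_early, seen_late = [], []
status = ToyWatchableStatus()

status.watch(lambda **kw: seen_early.append(kw["current"]))  # no update yet
status.update(initial=0, current=0.5, target=1)
status.watch(lambda **kw: seen_late.append(kw["current"]))  # receives 0.5 straight away
status.update(initial=0, current=1.0, target=1)
```

Both watchers end up having seen 0.5 and then 1.0, even though the second was added after the first update.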

classmethod wrap(f: Callable[P, AsyncIterator[WatcherUpdate[T]]]) Callable[P, WatchableAsyncStatus[T]][source]#

Wrap an AsyncIterator in a WatchableAsyncStatus.

For example:

class MyDevice(Device):
    @WatchableAsyncStatus.wrap
    async def trigger(self):
        # sleep for a second, updating on progress every 0.1 seconds
        for i in range(10):
            yield WatcherUpdate(initial=0, current=i*0.1, target=1)
            await asyncio.sleep(0.1)

class ophyd_async.core.WatcherUpdate[source]#

Bases: typing.Generic[ophyd_async.core._utils.T]

A dataclass such that, when expanded, it provides the kwargs for a watcher.

current: T#

None

The current value, where it currently is.

initial: T#

None

The initial value, where it was when it started.

target: T#

None

The target value, where it will be when it finishes.

name: str | None#

None

An optional name for the device, if available.

unit: str | None#

None

Units of the value, if applicable.

precision: float | None#

None

How many decimal places the value should be displayed to.

fraction: float | None#

None

The fraction of the way between initial and target.

time_elapsed: float | None#

None

The time elapsed since the start of the operation.

time_remaining: float | None#

None

The time remaining until the operation completes.
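
"Expanded" here means unpacking the dataclass fields as keyword arguments. The sketch below shows the idea with a hypothetical ToyUpdate that carries only a subset of the fields listed above, and a hypothetical progress_watcher callback.

```python
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass(frozen=True)
class ToyUpdate:
    """Hypothetical subset of the fields listed above."""

    current: float
    initial: float
    target: float
    name: Optional[str] = None
    unit: Optional[str] = None


def progress_watcher(*, current, initial, target, name=None, unit=None):
    # Fraction of the way between initial and target
    fraction = (current - initial) / (target - initial)
    return f"{name}: {fraction:.0%} ({current} {unit})"


update = ToyUpdate(current=2.5, initial=0.0, target=10.0, name="t1x", unit="mm")

# asdict() turns the dataclass into a dict, and ** expands it into kwargs
message = progress_watcher(**asdict(update))
```

Here message is "t1x: 25% (2.5 mm)".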

async ophyd_async.core.completed_status(exception: Exception | None = None)[source]#

Return a completed AsyncStatus.

Parameters:

exception – If given, then raise this exception when awaited.

class ophyd_async.core.Signal(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#

Bases: ophyd_async.core._device.Device, typing.Generic[ophyd_async.core._signal_backend.SignalDatatypeT]

A Device with the concept of a value, with R, RW, W and X flavours.

Parameters:
  • backend – The backend for providing Signal values.

  • timeout – The default timeout for operations on the Signal.

  • name – The name of the signal.

property source: str#

Returns the source of the signal.

E.g. “ca://PV_PREFIX:SIGNAL”, or “” if not available until connection.

class ophyd_async.core.SignalR(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#

Bases: ophyd_async.core._signal.Signal[ophyd_async.core._signal_backend.SignalDatatypeT], ophyd_async.core._protocol.AsyncReadable, ophyd_async.core._protocol.AsyncStageable, bluesky.protocols.Subscribable

Signal that can be read from and monitored.

async read(cached: bool | None = None) dict[str, Reading][source]#

Return a single item dict with the reading in it.

Parameters:

cached

Whether to use the cached monitored value:

  • If None, use the cache if it exists.

  • If False, do an explicit get.

  • If True, explicitly use the cache and raise an error if it doesn’t exist.

async describe() dict[str, DataKey][source]#

Return a single item dict describing the signal value.

async get_value(cached: bool | None = None) SignalDatatypeT[source]#

Return the current value.

Parameters:

cached

Whether to use the cached monitored value:

  • If None, use the cache if it exists.

  • If False, do an explicit get.

  • If True, explicitly use the cache and raise an error if it doesn’t exist.
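
The three settings of cached can be summarised with a toy model. ToyReadable is hypothetical, its cache is filled only after start_monitoring() is called, and raising RuntimeError for a missing cache is an assumption made for this sketch rather than the library's documented exception type.

```python
import asyncio


class ToyReadable:
    """Hypothetical stand-in; `_cache` is filled only while something is monitoring."""

    def __init__(self, value):
        self._backend_value = value
        self._cache = None
        self.backend_gets = 0

    def start_monitoring(self):
        self._cache = self._backend_value

    async def get_value(self, cached=None):
        if cached is None:
            cached = self._cache is not None  # use the cache if it exists
        if cached:
            if self._cache is None:
                raise RuntimeError("no cached value: nothing is monitoring")
            return self._cache
        self.backend_gets += 1  # explicit get from the backend
        return self._backend_value


async def demo():
    sig = ToyReadable(42)
    await sig.get_value()  # no cache yet, so an explicit get
    try:
        await sig.get_value(cached=True)
        raised = False
    except RuntimeError:
        raised = True
    sig.start_monitoring()
    await sig.get_value()  # cache exists, so no backend get
    await sig.get_value(cached=False)  # forced explicit get
    return sig.backend_gets, raised


backend_gets, raised_without_cache = asyncio.run(demo())
```

Only the first call and the cached=False call reach the toy backend, and cached=True fails while no cache exists.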

subscribe_value(function: Callback[SignalDatatypeT])[source]#

Subscribe to updates in value of a device.

Parameters:

function – The callback function to call when the value changes.

subscribe(function: Callback[dict[str, Reading]]) None[source]#

Subscribe to updates in the reading.

Parameters:

function – The callback function to call when the reading changes.

clear_sub(function: Callback) None[source]#

Remove a subscription passed to subscribe or subscribe_value.

Parameters:

function – The callback function to remove.

async stage() None[source]#

Start caching this signal.

async unstage() None[source]#

Stop caching this signal.

class ophyd_async.core.SignalW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#

Bases: ophyd_async.core._signal.Signal[ophyd_async.core._signal_backend.SignalDatatypeT], bluesky.protocols.Movable

Signal that can be set.

async set(value: SignalDatatypeT, wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT) None[source]#

Set the value and return a status saying when it’s done.

Parameters:
  • value – The value to set.

  • wait – If True, wait for the set to complete.

  • timeout – The timeout for the set.

class ophyd_async.core.SignalRW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#

Bases: ophyd_async.core._signal.SignalR[ophyd_async.core._signal_backend.SignalDatatypeT], ophyd_async.core._signal.SignalW[ophyd_async.core._signal_backend.SignalDatatypeT], bluesky.protocols.Locatable

Signal that can be both read and set.

async locate() Location[source]#

Return the setpoint and readback.

class ophyd_async.core.SignalX(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#

Bases: ophyd_async.core._signal.Signal

Signal that puts the default value.

async trigger(wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT) None[source]#

Trigger the action and return a status saying when it’s done.

Parameters:
  • wait – If True, wait for the trigger to complete.

  • timeout – The timeout for the trigger.

class ophyd_async.core.SignalBackend(datatype: type[SignalDatatypeT] | None)[source]#

Bases: typing.Generic[ophyd_async.core._signal_backend.SignalDatatypeT]

A read/write/monitor backend for a Signal.

abstractmethod source(name: str, read: bool) str[source]#

Return source of signal.

Parameters:
  • name – The name of the signal, which can be used or discarded.

  • read – True if we want the source for reading, False if writing.

abstractmethod async connect(timeout: float)[source]#

Connect to underlying hardware.

abstractmethod async put(value: SignalDatatypeT | None, wait: bool)[source]#

Put a value to the PV; if wait is True, wait for completion.

abstractmethod async get_datakey(source: str) DataKey[source]#

Metadata like source, dtype, shape, precision, units.

abstractmethod async get_reading() Reading[SignalDatatypeT][source]#

Return the current value, timestamp and severity.

abstractmethod async get_value() SignalDatatypeT[source]#

Return the current value.

abstractmethod async get_setpoint() SignalDatatypeT[source]#

Return the point that a signal was requested to move to.

abstractmethod set_callback(callback: Callback[Reading[SignalDatatypeT]] | None) None[source]#

Observe changes to the current value, timestamp and severity.

class ophyd_async.core.SignalConnector(backend: SignalBackend)[source]#

Bases: ophyd_async.core._device.DeviceConnector

Used for connecting signals with a given backend.

async connect_mock(device: Device, mock: LazyMock)[source]#

Use during Device.connect with mock=True.

This is called when there is no cached connect done in mock=True mode. It connects the Device and all its children in mock mode.

async connect_real(device: Device, timeout: float, force_reconnect: bool)[source]#

Use during Device.connect with mock=False.

This is called when there is no cached connect done in mock=False mode. It connects the Device and all its children in real mode in parallel.

ophyd_async.core.SignalDatatype#

None

The supported Signal datatypes:

ophyd_async.core.SignalDatatypeT#

‘TypeVar(…)’

A typevar for a SignalDatatype.

ophyd_async.core.DTypeScalar_co#

‘TypeVar(…)’

A numpy dtype like numpy.float64.

ophyd_async.core.Array1D#

None

A type alias for a 1D numpy array with a specific scalar data type.

E.g. Array1D[np.float64] is a 1D numpy array of 64-bit floats.

class ophyd_async.core.StrictEnum[source]#

Bases: str, enum.Enum

All members should exist in the Backend, and there will be no extras.

class ophyd_async.core.SubsetEnum[source]#

Bases: ophyd_async.core._utils.StrictEnum

All members should exist in the Backend, but there may be extras.
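
The difference between the two rules is an equality check versus a subset check. The sketch below uses a plain (str, Enum) class and two hypothetical helper functions; the backend choices are made up, and this is not how ophyd_async performs its validation.

```python
from enum import Enum


class Colour(str, Enum):
    RED = "Red"
    GREEN = "Green"


def check_strict(enum_cls, backend_choices):
    # Strict: members and backend choices must match exactly
    return {member.value for member in enum_cls} == set(backend_choices)


def check_subset(enum_cls, backend_choices):
    # Subset: every member must exist in the backend, extras are allowed
    return {member.value for member in enum_cls} <= set(backend_choices)


backend_choices = ["Red", "Green", "Blue"]  # hypothetical choices reported by a backend
strict_ok = check_strict(Colour, backend_choices)
subset_ok = check_subset(Colour, backend_choices)
```

The extra "Blue" choice fails the strict check but passes the subset check.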

class ophyd_async.core.Table(**kwargs)[source]#

Bases: pydantic.BaseModel

An abstraction of a Table where each field is a column.

For example:

>>> from ophyd_async.core import Table, Array1D
>>> import numpy as np
>>> from collections.abc import Sequence
>>> class MyTable(Table):
...     a: Array1D[np.int8]
...     b: Sequence[str]
...
>>> t = MyTable(a=[1, 2], b=["x", "y"])
>>> len(t)  # the length is the number of rows
2
>>> t2 = t + t  # adding tables together concatenates them
>>> t2.a
array([1, 2, 1, 2], dtype=int8)
>>> t2.b
['x', 'y', 'x', 'y']
>>> t2[1]  # slice a row
array([(2, b'y')], dtype=[('a', 'i1'), ('b', 'S40')])

model_config#

‘ConfigDict(…)’

numpy_dtype() dtype[source]#

Return a numpy dtype for a single row.

numpy_table(selection: slice | None = None) ndarray[source]#

Return a numpy array of the whole table.

class ophyd_async.core.SignalMetadata[source]#

Bases: typing.TypedDict

Metadata for a signal. No field is required.

limits: Limits#

None

The control, display, warning and alarm limits for a numeric datatype.

choices: list[str]#

None

The choice of possible values for an enum datatype.

precision: int#

None

The number of digits after the decimal place to display for a float datatype.

units: str#

None

The engineering units of the value for a numeric datatype.

class ophyd_async.core.SoftSignalBackend(datatype: type[SignalDatatypeT] | None, initial_value: SignalDatatypeT | None = None, units: str | None = None, precision: int | None = None)[source]#

Bases: ophyd_async.core._signal_backend.SignalBackend[ophyd_async.core._signal_backend.SignalDatatypeT]

A backend for a soft Signal; for test signals see MockSignalBackend.

Parameters:
  • datatype – The datatype of the signal, defaults to float if not given.

  • initial_value – The initial value of the signal, defaults to the “empty”, “zero” or “default” value of the datatype if not given.

  • units – The units for numeric datatypes.

  • precision – The number of digits after the decimal place to display for a float datatype.

set_value(value: SignalDatatypeT)[source]#

Set the current value, alarm and timestamp.

source(name: str, read: bool) str[source]#

Return source of signal.

Parameters:
  • name – The name of the signal, which can be used or discarded.

  • read – True if we want the source for reading, False if writing.

async connect(timeout: float)[source]#

Connect to underlying hardware.

async put(value: SignalDatatypeT | None, wait: bool) None[source]#

Put a value to the PV; if wait is True, wait for completion.

async get_datakey(source: str) DataKey[source]#

Metadata like source, dtype, shape, precision, units.

async get_reading() Reading[SignalDatatypeT][source]#

Return the current value, timestamp and severity.

async get_value() SignalDatatypeT[source]#

Return the current value.

async get_setpoint() SignalDatatypeT[source]#

Return the point that a signal was requested to move to.

set_callback(callback: Callback[Reading[SignalDatatypeT]] | None) None[source]#

Observe changes to the current value, timestamp and severity.

ophyd_async.core.soft_signal_r_and_setter(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]][source]#

Create a read-only Signal with a SoftSignalBackend.

May pass metadata, which are propagated into describe. Use soft_signal_rw if you want a device that is externally modifiable.

Parameters:
  • datatype – The datatype of the signal.

  • initial_value – The initial value of the signal.

  • name – The name of the signal.

  • units – The units of the signal.

  • precision – The precision of the signal.

Returns:

A tuple of the created SignalR and a callable to set its value.

ophyd_async.core.soft_signal_rw(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) SignalRW[SignalDatatypeT][source]#

Create a read-writable Signal with a SoftSignalBackend.

May pass metadata, which are propagated into describe.

Parameters:
  • datatype – The datatype of the signal.

  • initial_value – The initial value of the signal.

  • name – The name of the signal.

  • units – The units of the signal.

  • precision – The precision of the signal.

class ophyd_async.core.LazyMock(name: str = '', parent: LazyMock | None = None)[source]#

A lazily created Mock to be used when connecting in mock mode.

Creating Mocks is reasonably expensive when each Device (and Signal) requires its own, and the tree is only used when Signal.set() is called. This class allows a tree of lazily connected Mocks to be constructed so that when the leaf is created, so are its parents. Any calls to the child are then accessible from the parent mock.

>>> parent = LazyMock()
>>> child = parent.child("child")
>>> child_mock = child()
>>> child_mock()
<Mock name='mock.child()' id='...'>
>>> parent_mock = parent()
>>> parent_mock.mock_calls
[call.child()]

child(name: str) LazyMock[source]#

Return a child of this LazyMock with the given name.

class ophyd_async.core.MockSignalBackend(initial_backend: SignalBackend[SignalDatatypeT], mock: LazyMock)[source]#

Bases: ophyd_async.core._signal_backend.SignalBackend[ophyd_async.core._signal_backend.SignalDatatypeT]

Signal backend for testing, created by Device.connect(mock=True).

put_mock() AsyncMock#

Return the mock that will track calls to put().

set_value(value: SignalDatatypeT)[source]#

Set the value of the signal.

source(name: str, read: bool) str[source]#

Return source of signal.

Parameters:
  • name – The name of the signal, which can be used or discarded.

  • read – True if we want the source for reading, False if writing.

async connect(timeout: float) None[source]#

Connect to underlying hardware.

put_proceeds() Event#

Return an Event that will block put() until set.

The Event is initially set, but can be unset to block put().
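
The blocking behaviour can be reproduced with a plain asyncio.Event. ToyMockBackend below is a hypothetical stand-in that exposes put_proceeds as an ordinary attribute; it only illustrates how clearing the Event holds up put() until the Event is set again.

```python
import asyncio


class ToyMockBackend:
    """Hypothetical stand-in for a mock backend whose put() can be held up."""

    def __init__(self) -> None:
        self.put_proceeds = asyncio.Event()
        self.put_proceeds.set()  # initially set, so put() does not block
        self.value = None

    async def put(self, value) -> None:
        self.value = value
        await self.put_proceeds.wait()  # blocks while the Event is cleared


async def demo():
    backend = ToyMockBackend()
    backend.put_proceeds.clear()  # hold up the next put()
    task = asyncio.create_task(backend.put(5))
    await asyncio.sleep(0.01)
    blocked = not task.done()
    backend.put_proceeds.set()  # release it
    await task
    return blocked, task.done(), backend.value


was_blocked, finished, final_value = asyncio.run(demo())
```

A test can use the window while the Event is cleared to assert on intermediate state before letting the put complete.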

async put(value: SignalDatatypeT | None, wait: bool)[source]#

Put a value to the PV; if wait is True, wait for completion.

async get_reading() Reading[source]#

Return the current value, timestamp and severity.

async get_value() SignalDatatypeT[source]#

Return the current value.

async get_setpoint() SignalDatatypeT[source]#

Return the point that a signal was requested to move to.

async get_datakey(source: str) DataKey[source]#

Metadata like source, dtype, shape, precision, units.

set_callback(callback: Callback[Reading[SignalDatatypeT]] | None) None[source]#

Observe changes to the current value, timestamp and severity.

async ophyd_async.core.observe_value(signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[SignalDatatypeT, None][source]#

Subscribe to the value of a signal so it can be iterated from.

The first value yielded in the iterator will be the current value of the Signal, and subsequent updates from the control system will result in that value being yielded, even if it is the same as the previous value.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end.

  • timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError.

  • done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.

  • done_timeout – If given, the maximum time to watch a signal, in seconds. If the signal is still being watched after this time, raise asyncio.TimeoutError. Use this instead of an asyncio.wait_for timeout.

Due to a rare condition with busy signals, it is not recommended to wrap this function in asyncio.timeout or asyncio.wait_for. Instead, pass the timeout as the done_timeout parameter.

Example:

async for value in observe_value(sig):
    do_something_with(value)
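
The difference between the per-update timeout and the overall done_timeout can be illustrated with a stdlib-only model. This is a sketch under stated assumptions, not the library's implementation: observe_queue is a hypothetical generator that reads from an asyncio.Queue instead of a Signal.

```python
import asyncio
import time
from typing import Optional


async def observe_queue(
    queue: asyncio.Queue,
    timeout: Optional[float] = None,
    done_timeout: Optional[float] = None,
):
    """Hypothetical analogue of observe_value that reads from a queue."""
    # The overall deadline is fixed once, when iteration starts.
    deadline = None if done_timeout is None else time.monotonic() + done_timeout
    while True:
        wait = timeout
        if deadline is not None:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise asyncio.TimeoutError("done_timeout exceeded")
            wait = remaining if wait is None else min(wait, remaining)
        # Raises asyncio.TimeoutError if no update arrives within `wait`.
        yield await asyncio.wait_for(queue.get(), wait)


async def demo() -> list:
    queue = asyncio.Queue()
    queue.put_nowait(1)
    queue.put_nowait(2)
    seen = []
    try:
        async for value in observe_queue(queue, timeout=0.05):
            seen.append(value)
    except asyncio.TimeoutError:
        seen.append("timed out")  # no third update arrived in time
    return seen
```

In this model the per-update timeout restarts on every value, while done_timeout bounds the whole iteration however often values arrive.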
async ophyd_async.core.observe_signals_value(*signals: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None][source]#

Subscribe to a set of signals so they can be iterated from.

The first values yielded in the iterator will be the current values of the Signals, and subsequent updates from the control system will result in that value being yielded, even if it is the same as the previous value.

Parameters:
  • signals – Call subscribe_value on all the signals at the start, and clear_sub on each of them at the end.

  • timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError.

  • done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.

  • done_timeout – If given, the maximum time to watch a signal, in seconds. If the signals are still being watched after this time, raise asyncio.TimeoutError. Use this instead of an asyncio.wait_for timeout.

Example:

async for signal, value in observe_signals_value(sig1, sig2, ...):
    if signal is sig1:
        do_something_with(value)
    elif signal is sig2:
        do_something_else_with(value)
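
One way to picture how several subscriptions become a single stream of (signal, value) pairs is a shared queue fed by one callback per source. The sketch below is a stdlib-only model; Source, observe_sources and demo are hypothetical names, and the real function may be implemented differently.

```python
import asyncio


class Source:
    """Hypothetical stand-in for a subscribable signal."""

    def __init__(self, name, initial):
        self.name = name
        self._value = initial
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)
        callback(self._value)  # deliver the current value immediately

    def clear_sub(self, callback):
        self._callbacks.remove(callback)

    def update(self, value):
        self._value = value
        for callback in list(self._callbacks):
            callback(value)


async def observe_sources(*sources):
    queue = asyncio.Queue()
    callbacks = {}
    for source in sources:
        # Bind `source` as a default argument so each callback tags its own source.
        callbacks[source] = lambda value, source=source: queue.put_nowait(
            (source, value)
        )
        source.subscribe(callbacks[source])
    try:
        while True:
            yield await queue.get()
    finally:
        # Unsubscribe from every source when iteration ends.
        for source, callback in callbacks.items():
            source.clear_sub(callback)


async def demo():
    a, b = Source("a", 1), Source("b", 2)
    seen = []
    gen = observe_sources(a, b)
    async for source, value in gen:
        seen.append((source.name, value))
        if len(seen) == 2:
            b.update(3)  # a later update from one source
        if len(seen) == 3:
            break
    await gen.aclose()  # runs the finally block, clearing subscriptions
    remaining = len(a._callbacks) + len(b._callbacks)
    return seen, remaining
```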
async ophyd_async.core.wait_for_value(signal: SignalR[SignalDatatypeT], match: SignalDatatypeT | Callable[[SignalDatatypeT], bool], timeout: float | None) None[source]#

Wait for a signal to have a matching value.

Parameters:
  • signal – Call subscribe_value on this at the start, and clear_sub on it at the end.

  • match – If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match.

  • timeout – How long to wait for the value to match.

Example:

await wait_for_value(device.acquiring, 1, timeout=1)
# or
await wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
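
The two accepted forms of match can be reduced to a single check. This is a minimal sketch of the rule stated in the parameter description; matches is a hypothetical helper, not a function exported by ophyd_async.

```python
from typing import Any


def matches(value: Any, match: Any) -> bool:
    """Return True if value satisfies match (hypothetical helper)."""
    if callable(match):
        # A callable decides for itself whether the value matches.
        return bool(match(value))
    # Anything else is compared for equality.
    return value == match
```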
async ophyd_async.core.set_and_wait_for_value(signal: SignalRW[SignalDatatypeT], value: SignalDatatypeT, match_value: SignalDatatypeT | Callable[[SignalDatatypeT], bool] | None = None, timeout: float = DEFAULT_TIMEOUT, set_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus[source]#

Set a signal and monitor that same signal until it has the specified value.

This function sets signal to the specified value and waits for that same signal to have the match_value.

Parameters:
  • signal – The signal to set.

  • value – The value to set it to.

  • match_value – The value (or callable that says if the value matches) to wait for.

  • timeout – How long to wait for the signal to have the value.

  • set_timeout – How long to wait for the set to complete.

  • wait_for_set_completion – If False then return as soon as the signal matches match_value. If True then also wait for the set operation to complete before returning.

Seealso:

How to interact with signals while implementing bluesky verbs

Examples:

To set a parameter and wait for its value to change:

await set_and_wait_for_value(device.parameter, 1)

For a busy record, or other Signals that follow this pattern:

  • Set Signal with wait=True and stash the Status

  • Read the same Signal to check the operation has started

  • Return the Status so calling code can wait for operation to complete

status = await set_and_wait_for_value(
    device.acquire, 1, wait_for_set_completion=False
)
# device is now acquiring
await status
# device has finished acquiring
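
The three-step busy record pattern can be modelled with asyncio alone. This is a sketch under stated assumptions: BusyRecord, set_and_wait_sketch and demo are hypothetical, and the sleep stands in for the time the hardware takes to finish.

```python
import asyncio


class BusyRecord:
    """Hypothetical signal whose set() only completes when the work is done."""

    def __init__(self) -> None:
        self.value = 0
        self._changed = asyncio.Event()

    async def set(self, value) -> None:
        self.value = value  # the readback changes straight away
        self._changed.set()
        await asyncio.sleep(0.02)  # stands in for the acquisition itself
        self.value = 0  # the record returns to idle when finished

    async def wait_until_value(self, expected) -> None:
        while self.value != expected:
            self._changed.clear()
            await self._changed.wait()


async def set_and_wait_sketch(record: BusyRecord, value):
    status = asyncio.create_task(record.set(value))  # set and stash the status
    await record.wait_until_value(value)  # check the operation has started
    return status  # the caller decides when to wait for completion


async def demo():
    record = BusyRecord()
    status = await set_and_wait_sketch(record, 1)
    during = record.value  # read while the operation is in progress
    await status
    after = record.value  # read after the operation has completed
    return during, after
```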
async ophyd_async.core.set_and_wait_for_other_value(set_signal: SignalW[SignalDatatypeT], set_value: SignalDatatypeT, match_signal: SignalR[SignalDatatypeV], match_value: SignalDatatypeV | Callable[[SignalDatatypeV], bool], timeout: float = DEFAULT_TIMEOUT, set_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus[source]#

Set a signal and monitor another signal until it has the specified value.

This function sets a set_signal to a specified set_value and waits for a match_signal to have the match_value.

Parameters:
  • set_signal – The signal to set.

  • set_value – The value to set it to.

  • match_signal – The signal to monitor.

  • match_value – The value (or callable that says if the value matches) to wait for.

  • timeout – How long to wait for the signal to have the value.

  • set_timeout – How long to wait for the set to complete.

  • wait_for_set_completion – If False then return as soon as the match_signal matches match_value. If True then also wait for the set operation to complete before returning.

Seealso:

How to interact with signals while implementing bluesky verbs

Example:

To set the setpoint and wait for the readback to match:

await set_and_wait_for_other_value(device.setpoint, 1, device.readback, 1)
ophyd_async.core.walk_rw_signals(device: Device, path_prefix: str = '') dict[str, SignalRW[Any]][source]#

Retrieve all SignalRWs from a device.

Stores retrieved signals with their dotted attribute paths in a dictionary. Used as part of saving and loading a device.

Parameters:
  • device – Device to retrieve read-write signals from.

  • path_prefix – For internal use, leave blank when calling the method.

Returns:

A dictionary matching the string attribute path of a SignalRW with the signal itself.
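
The dotted-path dictionary can be pictured as a recursive walk over named children. The sketch below uses hypothetical Node and Leaf classes in place of Device and SignalRW; it illustrates the shape of the result rather than the library's code.

```python
class Node:
    """Hypothetical device: named children held in a dict."""

    def __init__(self, **children):
        self._children = children

    def children(self):
        return self._children.items()


class Leaf(Node):
    """Hypothetical read-write signal (has no children of its own)."""


def walk_leaves(node: Node, path_prefix: str = "") -> dict:
    found = {}
    for name, child in node.children():
        path = f"{path_prefix}{name}"
        if isinstance(child, Leaf):
            found[path] = child
        # Recurse with the child's path as the new prefix.
        found.update(walk_leaves(child, path_prefix=f"{path}."))
    return found


# A small tree: one top-level leaf and two nested under "stage".
tree = Node(gain=Leaf(), stage=Node(x=Leaf(), y=Leaf()))
paths = sorted(walk_leaves(tree))
```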

class ophyd_async.core.StandardReadable(name: str = '', connector: DeviceConnector | None = None)[source]#

Bases: ophyd_async.core._device.Device, ophyd_async.core._protocol.AsyncReadable, ophyd_async.core._protocol.AsyncConfigurable, ophyd_async.core._protocol.AsyncStageable, bluesky.protocols.HasHints

Device that provides selected child Device values in read().

Provides the ability for children to be registered to:

  • Participate in stage() and unstage()

  • Provide their value in read() and describe()

  • Provide their value in read_configuration() and describe_configuration()

  • Select a value to appear in hints

The behavior is customized with a StandardReadableFormat.

async stage() None[source]#
async unstage() None[source]#
async describe_configuration() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read_configuration().

async read_configuration() dict[str, Reading][source]#

Return value, timestamp, optional per-point metadata for each field name.

Same API as AsyncReadable.read but for slow-changing fields related to configuration, e.g. exposure time. These will typically be read only once per run.

async describe() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read().

For example:

{
    "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
    "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
}
async read() dict[str, Reading][source]#

Return value, timestamp, optional per-point metadata for each field name.

For example:

{
    "channel1": {"value": 5, "timestamp": 1472493713.271991},
    "channel2": {"value": 16, "timestamp": 1472493713.539238},
}
property hints: Hints#

A dictionary of suggestions for best-effort visualization and processing.

This does not affect what data is read or saved; it is only a suggestion to enable automated tools to provide helpful information with minimal guidance from the user. See the bluesky documentation on hints.

add_children_as_readables(format: StandardReadableFormat = StandardReadableFormat.CHILD) Generator[None, None, None][source]#

Context manager that calls add_readables on child Devices added within.

Scans self.children() on entry and exit to context manager, and calls add_readables() on any that are added with the provided StandardReadableFormat.
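
The entry and exit scan amounts to taking a snapshot of the children, then registering whatever is new. A minimal sketch, assuming children are tracked by name; Parent, add_child and registered are hypothetical and only mimic the described behaviour.

```python
from contextlib import contextmanager


class Parent:
    """Hypothetical device that tracks its children in a dict."""

    def __init__(self) -> None:
        self._children = {}
        self.registered = []  # (child name, format) pairs

    def add_child(self, name, child) -> None:
        self._children[name] = child

    @contextmanager
    def add_children_as_readables(self, fmt="CHILD"):
        before = set(self._children)  # snapshot on entry
        yield
        # Anything present now but not in the snapshot was added inside the block.
        added = [name for name in self._children if name not in before]
        self.registered.extend((name, fmt) for name in added)


parent = Parent()
parent.add_child("scratch", object())  # added outside: not registered
with parent.add_children_as_readables("HINTED_SIGNAL"):
    parent.add_child("x", object())
    parent.add_child("y", object())
```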

add_readables(devices: Sequence[Device], format: StandardReadableFormat = StandardReadableFormat.CHILD) None[source]#

Add devices to contribute to various bluesky verbs.

Use output from the given devices to contribute to the verbs of the following interfaces:

Parameters:
  • devices – The devices to be added

  • format – Determines which of the devices' functions are added to which verb, as per the StandardReadableFormat documentation

class ophyd_async.core.StandardReadableFormat(*args, **kwds)[source]#

Bases: enum.Enum

Declare how a Device should contribute to the StandardReadable verbs.

CHILD#

‘CHILD’

Detect which verbs the child supports and contribute to:

CONFIG_SIGNAL#

‘CONFIG_SIGNAL’

Contribute the Signal value to read_configuration() and describe_configuration()

HINTED_SIGNAL#

‘HINTED_SIGNAL’

Contribute the monitored Signal value to read() and describe() and put the signal name in hints

UNCACHED_SIGNAL#

‘UNCACHED_SIGNAL’

Contribute the uncached Signal value to read() and describe()

HINTED_UNCACHED_SIGNAL#

‘HINTED_UNCACHED_SIGNAL’

Contribute the uncached Signal value to read() and describe() and put the signal name in hints

class ophyd_async.core.StandardDetector(controller: DetectorControllerT, writer: DetectorWriterT, config_sigs: Sequence[SignalR] = (), name: str = '', connector: DeviceConnector | None = None)[source]#

Bases: ophyd_async.core._device.Device, bluesky.protocols.Stageable, ophyd_async.core._protocol.AsyncConfigurable, ophyd_async.core._protocol.AsyncReadable, bluesky.protocols.Triggerable, bluesky.protocols.Preparable, bluesky.protocols.Flyable, bluesky.protocols.Collectable, bluesky.protocols.WritesStreamAssets, typing.Generic[ophyd_async.core._detector.DetectorControllerT, ophyd_async.core._detector.DetectorWriterT]

Detector base class for step and fly scanning detectors.

Aggregates controller and writer logic together.

Parameters:
  • controller – Logic for arming and disarming the detector

  • writer – Logic for making the detector write persistent data

  • config_sigs – Signals to read when describe and read configuration are called

  • name – Device name

async stage() None[source]#

Make sure the detector is idle and ready to be used.

async unstage() None[source]#

Disarm the detector and stop file writing.

async read_configuration() dict[str, Reading][source]#

Return value, timestamp, optional per-point metadata for each field name.

Same API as AsyncReadable.read but for slow-changing fields related to configuration, e.g. exposure time. These will typically be read only once per run.

async describe_configuration() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read_configuration().

async read() dict[str, Reading][source]#

There is no data to be placed in events, so this is empty.

async describe() dict[str, DataKey][source]#

Return per-scan metadata for each field name in read().

For example:

{
    "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []},
    "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []},
}
async trigger() None[source]#
async prepare(value: TriggerInfo) None[source]#

Arm detector.

Prepare the detector with trigger information. This is determined at and passed in from the plan level.

Parameters:

value – TriggerInfo describing how to trigger the detector

async kickoff()[source]#
async complete()[source]#
async describe_collect() dict[str, DataKey][source]#
async collect_asset_docs(index: int | None = None) AsyncIterator[StreamAsset][source]#
async get_index() int[source]#
property hints: Hints#
class ophyd_async.core.TriggerInfo(/, **data: ~typing.Any)[source]#

Bases: pydantic.BaseModel

Minimal set of information required to setup triggering on a detector.

number_of_triggers: NonNegativeInt | list[NonNegativeInt]#

None

Number of triggers that will be sent (0 means infinite).

Can be:

  • A single integer or

  • A list of integers for multiple triggers

Example for tomography: TriggerInfo(number_of_triggers=[2, 3, 100, 3]). This would trigger:

  • 2 times for dark field images

  • 3 times for initial flat field images

  • 100 times for projections

  • 3 times for final flat field images

trigger: DetectorTrigger#

‘Field(…)’

Sort of triggers that will be sent

deadtime: float#

‘Field(…)’

What is the minimum deadtime between triggers

livetime: float | None#

‘Field(…)’

What is the maximum high time of the triggers

frame_timeout: float | None#

‘Field(…)’

What is the maximum timeout on waiting for a frame

multiplier: int#

1

How many triggers make up a single StreamDatum index. This allows multiple frames from a faster detector to be zipped with a single frame from a slower detector. For example, if number_of_triggers=10 and multiplier=5 then the detector will take 10 frames but publish 2 indices, and describe() will show a shape of (5, h, w).

total_number_of_triggers() int#
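
total_number_of_triggers has no description here. The sketch below assumes it is the sum of a list, or the integer itself, and that published indices are frames divided by multiplier; total_triggers and indices_published are hypothetical helpers that only reproduce the arithmetic from the field descriptions above.

```python
def total_triggers(number_of_triggers) -> int:
    """Assumed behaviour: sum a list, pass a single integer through."""
    if isinstance(number_of_triggers, list):
        return sum(number_of_triggers)
    return number_of_triggers


def indices_published(number_of_triggers, multiplier: int = 1) -> int:
    """Assumed relation: each index is made up of `multiplier` triggers."""
    return total_triggers(number_of_triggers) // multiplier


tomography_total = total_triggers([2, 3, 100, 3])  # darks + flats + projections + flats
zipped_indices = indices_published(10, multiplier=5)  # 10 frames, 5 per index
```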
class ophyd_async.core.DetectorTrigger(*args, **kwds)[source]#

Bases: enum.Enum

Type of mechanism for triggering a detector to take frames.

INTERNAL#

‘INTERNAL’

Detector generates internal trigger for given rate

EDGE_TRIGGER#

‘EDGE_TRIGGER’

Expect a series of arbitrary length trigger signals

CONSTANT_GATE#

‘CONSTANT_GATE’

Expect a series of constant width external gate signals

VARIABLE_GATE#

‘VARIABLE_GATE’

Expect a series of variable width external gate signals

class ophyd_async.core.DetectorController[source]#

Bases: abc.ABC

Detector logic for arming and disarming the detector.

abstractmethod get_deadtime(exposure: float | None) float[source]#

For a given exposure, how long should the time between exposures be.

abstractmethod async prepare(trigger_info: TriggerInfo) None[source]#

Do all necessary steps to prepare the detector for triggers.

Parameters:

trigger_info – The sort of triggers to expect.

abstractmethod async arm() None[source]#

Arm the detector.

abstractmethod async wait_for_idle()[source]#

Wait on the internal _arm_status and wait for it to get disarmed/idle.

abstractmethod async disarm()[source]#

Disarm the detector, return detector to an idle state.

class ophyd_async.core.DetectorWriter[source]#

Bases: abc.ABC

Logic for making detector write data to somewhere persistent (e.g. HDF5 file).

abstractmethod async open(multiplier: int = 1) dict[str, DataKey][source]#

Open writer and wait for it to be ready for data.

Parameters:

multiplier – Each StreamDatum index corresponds to this many written exposures

Returns:

Output for describe()

abstractmethod observe_indices_written(timeout=DEFAULT_TIMEOUT) AsyncGenerator[int, None][source]#

Yield the index of each frame (or equivalent data point) as it is written.

abstractmethod async get_indices_written() int[source]#

Get the number of indices written.

abstractmethod collect_stream_docs(indices_written: int) AsyncIterator[StreamAsset][source]#

Create Stream docs up to given number written.

abstractmethod async close() None[source]#

Close writer, blocks until I/O is complete.

property hints: Hints#

The hints to be used for the detector.

class ophyd_async.core.PathInfo[source]#

Information about where and how to write a file.

Parameters:
  • directory_path – Directory into which files should be written

  • filename – Base filename to use, generated by FilenameProvider, without extension

  • create_dir_depth – Optional depth of directories to create if they do not exist

directory_path: Path#

None

filename: str#

None

create_dir_depth: int#

0

class ophyd_async.core.PathProvider[source]#

Bases: typing.Protocol

Abstract class that tells a detector where to write its data.

class ophyd_async.core.StaticPathProvider(filename_provider: FilenameProvider, directory_path: Path | str, create_dir_depth: int = 0)[source]#

Bases: ophyd_async.core._providers.PathProvider

All files will be within a static directory.

class ophyd_async.core.AutoIncrementingPathProvider(filename_provider: FilenameProvider, base_directory_path: Path, create_dir_depth: int = 0, max_digits: int = 5, starting_value: int = 0, num_calls_per_inc: int = 1, increment: int = 1, inc_delimeter: str = '_', base_name: str | None = None)[source]#

Bases: ophyd_async.core._providers.PathProvider

Provides a new numerically incremented path on each call.

class ophyd_async.core.YMDPathProvider(filename_provider: FilenameProvider, base_directory_path: Path, create_dir_depth: int = -3, device_name_as_base_dir: bool = False)[source]#

Bases: ophyd_async.core._providers.PathProvider

Provides a path with the date included in the directory name.

class ophyd_async.core.FilenameProvider[source]#

Bases: typing.Protocol

Base class for callable classes providing filenames.

class ophyd_async.core.StaticFilenameProvider(filename: str)[source]#

Bases: ophyd_async.core._providers.FilenameProvider

Provides a constant filename on every call.

class ophyd_async.core.AutoIncrementFilenameProvider(base_filename: str = '', max_digits: int = 5, starting_value: int = 0, increment: int = 1, inc_delimeter: str = '_')[source]#

Bases: ophyd_async.core._providers.FilenameProvider

Provides a new numerically incremented filename on each call.
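
How the constructor parameters could combine into a filename is sketched below. The exact format (base, delimiter, zero-padded counter) and the error on overflow are assumptions for illustration; IncrementingName is a hypothetical class, not the library's.

```python
class IncrementingName:
    """Hypothetical callable that returns a new padded name on each call."""

    def __init__(
        self,
        base_filename: str = "",
        max_digits: int = 5,
        starting_value: int = 0,
        increment: int = 1,
        inc_delimeter: str = "_",
    ) -> None:
        self._base = base_filename
        self._max_digits = max_digits
        self._current = starting_value
        self._increment = increment
        self._delimeter = inc_delimeter

    def __call__(self) -> str:
        # Assumed behaviour: refuse to produce a counter wider than max_digits.
        if len(str(self._current)) > self._max_digits:
            raise ValueError("counter no longer fits in max_digits")
        name = f"{self._base}{self._delimeter}{self._current:0{self._max_digits}d}"
        self._current += self._increment
        return name


provider = IncrementingName("scan", max_digits=3)
first, second = provider(), provider()
```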

class ophyd_async.core.UUIDFilenameProvider(uuid_call_func: Callable = uuid.uuid4, uuid_call_args: list | None = None)[source]#

Bases: ophyd_async.core._providers.FilenameProvider

Files will have a UUID as a filename.

class ophyd_async.core.NameProvider[source]#

Bases: typing.Protocol

Base class for callable classes providing data keys.

class ophyd_async.core.DatasetDescriber[source]#

Bases: typing.Protocol

For describing datasets in file writing.

abstractmethod async np_datatype() str[source]#

Return the numpy datatype for this dataset.

abstractmethod async shape() tuple[int, ...][source]#

Get the shape of the data collection.

class ophyd_async.core.HDFDataset[source]#

TODO.

data_key: str#

None

dataset: str#

None

shape: Sequence[int]#

‘field(…)’

dtype_numpy: str = <Multiline-String>#
multiplier: int#

1

swmr: bool#

False

chunk_shape: tuple[int, ...]#

()

class ophyd_async.core.HDFFile(full_file_name: Path, datasets: list[HDFDataset], hostname: str = 'localhost')[source]#

TODO.

Parameters:
  • full_file_name – Absolute path to the file to be written

  • datasets – Datasets to write into the file

stream_resources() Iterator[StreamResource][source]#
stream_data(indices_written: int) Iterator[StreamDatum][source]#
class ophyd_async.core.StandardFlyer(trigger_logic: FlyerController[T], name: str = '')[source]#

Bases: ophyd_async.core._device.Device, bluesky.protocols.Stageable, bluesky.protocols.Preparable, bluesky.protocols.Flyable, typing.Generic[ophyd_async.core._utils.T]

Base class for ‘flyable’ devices.

See bluesky.protocols.Flyable.

property trigger_logic: FlyerController[T]#
async stage() None[source]#
async unstage() None[source]#
prepare(value: T) AsyncStatus[source]#

Prepare a device for scanning.

This method provides similar functionality to Stageable.stage and Movable.set, with key differences:

Stageable.stage

Staging a device translates to, “I’m going to use this in a scan, but I’m not sure how”. Preparing it translates to, “I’m about to do a step or a fly scan with these parameters”. Staging should be universal across many different types of scans, however prepare is specific to an input value passed in.

Movable.set

For some devices, preparation for a scan could involve multiple soft or hardware signals being configured and/or set. prepare therefore allows these to be bundled together, along with other logic.

For example, a Flyable device should have the following methods called on it to perform a fly-scan:

prepare(flyscan_params)
kickoff()
complete()

If the device is a detector, collect_asset_docs can be called repeatedly while complete is not done to publish frames. Alternatively, to step-scan a detector:

prepare(frame_params) to setup N software triggered frames
trigger() to take N frames
collect_asset_docs() to publish N frames

Returns a Status that is marked done when the device is ready for a scan.
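
The fly-scan call order can be written out as a small driver. In practice these calls are issued from a bluesky plan; the sketch below only records the order, and RecordingFlyer and fly_scan are hypothetical names.

```python
import asyncio


class RecordingFlyer:
    """Hypothetical flyable device that records the calls made on it."""

    def __init__(self) -> None:
        self.calls = []

    async def prepare(self, value) -> None:
        self.calls.append(("prepare", value))

    async def kickoff(self) -> None:
        self.calls.append(("kickoff",))

    async def complete(self) -> None:
        self.calls.append(("complete",))


async def fly_scan(flyer: RecordingFlyer, flyscan_params) -> list:
    await flyer.prepare(flyscan_params)  # configure for this particular scan
    await flyer.kickoff()  # start the scan
    await flyer.complete()  # wait until the scan has finished
    return flyer.calls
```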

async kickoff() None[source]#
async complete() None[source]#
class ophyd_async.core.FlyerController[source]#

Bases: abc.ABC, typing.Generic[ophyd_async.core._utils.T]

Base class for controlling ‘flyable’ devices.

See bluesky.protocols.Flyable.

abstractmethod async prepare(value: T) Any[source]#

Move to the start of the flyscan.

abstractmethod async kickoff()[source]#

Start the flyscan.

abstractmethod async complete()[source]#

Block until the flyscan is done.

abstractmethod async stop()[source]#

Stop flying and wait for everything to be stopped.

class ophyd_async.core.Settings(device: DeviceT, settings: MutableMapping[SignalRW, Any] | None = None)[source]#

Bases: collections.abc.MutableMapping[ophyd_async.core._signal.SignalRW[typing.Any], typing.Any], typing.Generic[ophyd_async.core._device.DeviceT]

Used for supplying settings to signals.

Parameters:
  • device – The device that the settings are for.

  • settings – A dictionary of settings to start with.

Example:

# Settings are created from a dict of signals to values
settings1 = Settings(device, {device.sig1: 1, device.sig2: 2})
settings2 = Settings(device, {device.sig1: 10, device.sig3: 3})
# They act like dictionaries
assert settings1[device.sig1] == 1
# Including the ability to "or" two settings together
settings = settings1 | settings2
assert dict(settings) == {
    device.sig1: 10,
    device.sig2: 2,
    device.sig3: 3,
}
partition(predicate: Callable[[SignalRW], bool]) tuple[Settings[DeviceT], Settings[DeviceT]][source]#

Partition into two Settings based on a predicate.

Parameters:

predicate – Callable that takes each signal, and returns a boolean to say if it should be in the first returned Settings

Returns:

(where_true, where_false) where each is a Settings object. The first contains the signals for which the predicate returned True, and the second contains the signals for which the predicate returned False.

Example:

settings = Settings(device, {device.special: 1, device.sig: 2})
specials, others = settings.partition(lambda sig: "special" in sig.name)
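
The same split can be shown on a plain dict, which makes the (where_true, where_false) ordering easy to check. This is a stdlib-only sketch; partition_dict is a hypothetical helper and strings stand in for signals.

```python
from typing import Any, Callable


def partition_dict(
    settings: dict, predicate: Callable[[Any], bool]
) -> tuple:
    """Split a mapping in two according to a predicate on its keys."""
    where_true, where_false = {}, {}
    for key, value in settings.items():
        target = where_true if predicate(key) else where_false
        target[key] = value
    return where_true, where_false


specials, others = partition_dict(
    {"special_gain": 1, "offset": 2, "special_mode": 3},
    lambda name: "special" in name,
)
```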
class ophyd_async.core.SettingsProvider[source]#

Base class for providing settings.

abstractmethod async store(name: str, data: dict[str, Any])[source]#

Store the data, associating it with the given name.

abstractmethod async retrieve(name: str) dict[str, Any][source]#

Retrieve the data associated with the given name.

class ophyd_async.core.YamlSettingsProvider(directory: Path | str)[source]#

Bases: ophyd_async.core._settings.SettingsProvider

For providing settings from yaml to signals.

async store(name: str, data: dict[str, Any])[source]#

Store the data, associating it with the given name.

async retrieve(name: str) dict[str, Any][source]#

Retrieve the data associated with the given name.

ophyd_async.core.config_ophyd_async_logging(file=sys.stdout, fmt=DEFAULT_FORMAT, datefmt=DEFAULT_DATE_FORMAT, color=True, level='WARNING')[source]#

Set a new handler on the logging.getLogger('ophyd_async') logger.

If this is called more than once, the handler from the previous invocation is removed (if still present) and replaced.

Parameters:
  • file – object with write method or filename string. Default is sys.stdout.

  • fmt – str Overall logging format

  • datefmt – str Date format. Default is '%H:%M:%S'.

  • color – bool Use ANSI color codes. True by default.

  • level – str or int Python logging level, given as string or corresponding integer. Default is ‘WARNING’.

Returns:

The handler, which has already been added to the ‘ophyd_async’ logger.

Examples:

Log to a file.

config_ophyd_async_logging(file='/tmp/what_is_happening.txt')

Include the date along with the time. (The log messages will always include microseconds, which are configured separately, not as part of ‘datefmt’.)

config_ophyd_async_logging(datefmt="%Y-%m-%d %H:%M:%S")

Turn off ANSI color codes.

config_ophyd_async_logging(color=False)

Increase verbosity: show level DEBUG or higher.

config_ophyd_async_logging(level='DEBUG')
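
The replace-on-repeat behaviour can be reproduced with the standard logging module. This is a sketch under stated assumptions and not the library's implementation: configure and the "sketch_logger" name are hypothetical, and formatting and color are left out.

```python
import io
import logging

_handlers = {}  # logger name -> handler installed by configure()


def configure(name: str, stream, level="WARNING") -> logging.Handler:
    logger = logging.getLogger(name)
    previous = _handlers.pop(name, None)
    if previous is not None and previous in logger.handlers:
        logger.removeHandler(previous)  # replace rather than stack handlers
    handler = logging.StreamHandler(stream)
    handler.setLevel(level)
    logger.addHandler(handler)
    # Lower the logger's own threshold if it would filter out this level.
    if logger.getEffectiveLevel() > handler.level:
        logger.setLevel(handler.level)
    _handlers[name] = handler
    return handler


first_buffer, second_buffer = io.StringIO(), io.StringIO()
first_handler = configure("sketch_logger", first_buffer)
second_handler = configure("sketch_logger", second_buffer, level="DEBUG")
logging.getLogger("sketch_logger").debug("hello")
```

After the second call only the second handler is attached, so the debug message reaches the second buffer alone.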
ophyd_async.core.CALCULATE_TIMEOUT#

‘CALCULATE_TIMEOUT’

Sentinel used to implement myfunc(timeout=CALCULATE_TIMEOUT).

This signifies that the function should calculate a suitable non-zero timeout itself.

ophyd_async.core.CalculatableTimeout#

None

ophyd_async.core.DEFAULT_TIMEOUT#

10.0

ophyd_async.core.Callback#

None

exception ophyd_async.core.NotConnected(errors: str | Mapping[str, Exception])[source]#

Bases: Exception

Exception to be raised if a Device.connect is cancelled.

Parameters:

errors – Mapping of device name to Exception or another NotConnected. Alternatively a string with the signal error text.

property sub_errors: Mapping[str, Exception]#
format_error_string(indent='') str[source]#
classmethod with_other_exceptions_logged(exceptions: Mapping[str, Exception]) NotConnected[source]#
class ophyd_async.core.Reference(obj: T)[source]#

Bases: typing.Generic[ophyd_async.core._utils.T]

Hide an object behind a reference.

Used to opt out of the naming/parent-child relationship of Device.

Example:

class DeviceWithRefToSignal(Device):
    def __init__(self, signal: SignalRW[int]):
        self.signal_ref = Reference(signal)
        super().__init__()

    def set(self, value) -> AsyncStatus:
        return self.signal_ref().set(value + 1)
async ophyd_async.core.gather_dict(coros: dict[T, Awaitable[V]]) dict[T, V][source]#

Take named coros and return a dict of their name to their return value.
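
A minimal sketch of the idea using asyncio.gather, which returns results in the order the awaitables were passed. gather_dict_sketch and fetch are hypothetical names; the library function may differ in detail.

```python
import asyncio


async def gather_dict_sketch(coros: dict) -> dict:
    # gather() preserves argument order, so results line up with the keys.
    values = await asyncio.gather(*coros.values())
    return dict(zip(coros, values))


async def fetch(value):
    await asyncio.sleep(0)  # stands in for real I/O
    return value


async def demo() -> dict:
    return await gather_dict_sketch({"x": fetch(1), "y": fetch(2)})
```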

ophyd_async.core.get_dtype(datatype: type) dtype[source]#

Get the runtime dtype from a numpy ndarray type annotation.

>>> from ophyd_async.core import Array1D
>>> import numpy as np
>>> get_dtype(Array1D[np.int8])
dtype('int8')

ophyd_async.core.get_enum_cls(datatype: type | None) type[StrictEnum] | None[source]#

Get the enum class from a datatype.

Raises:

TypeError – if type is not a StrictEnum or SubsetEnum subclass

>>> from ophyd_async.core import StrictEnum
>>> from collections.abc import Sequence
>>> class MyEnum(StrictEnum):
...     A = "A value"
>>> get_enum_cls(str)
>>> get_enum_cls(MyEnum)
<enum 'MyEnum'>
>>> get_enum_cls(Sequence[MyEnum])
<enum 'MyEnum'>

ophyd_async.core.get_unique(values: dict[str, T], types: str) T[source]#

If all values are the same, return that value, otherwise raise TypeError.

>>> get_unique({"a": 1, "b": 1}, "integers")
1
>>> get_unique({"a": 1, "b": 2}, "integers")
Traceback (most recent call last):
 ...
TypeError: Differing integers: a has 1, b has 2

ophyd_async.core.in_micros(t: float) int[source]#

Convert a time from seconds to microseconds.

Parameters:

t – A time in seconds

Returns:

A time in microseconds, rounded up to the nearest whole microsecond

Raises:

ValueError – if t < 0
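
The documented contract (round up, reject negatives) fits in a few lines. This is a sketch that assumes a ceiling of t multiplied by one million; in_micros_sketch is a hypothetical name and the real function may handle floating point rounding differently.

```python
import math


def in_micros_sketch(t: float) -> int:
    if t < 0:
        raise ValueError(f"Expected a non-negative time, got {t}")
    # Round up so that a partial microsecond is never lost.
    return math.ceil(t * 1_000_000)
```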

ophyd_async.core.make_datakey(datatype: type[SignalDatatypeT], value: SignalDatatypeT, source: str, metadata: SignalMetadata) DataKey[source]#

Make a DataKey for a given datatype.

async ophyd_async.core.wait_for_connection(**coros: Awaitable[None])[source]#

Call many underlying signals, accumulating exceptions and returning them.

Expected kwargs should be a mapping of names to coroutine tasks to execute.
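
One way to accumulate failures by name is asyncio.gather with return_exceptions=True. The sketch below raises a single hypothetical ConnectionErrors exception carrying the per-name errors; the text above does not settle whether the library raises or returns them, and wait_for_all is likewise a hypothetical name.

```python
import asyncio


class ConnectionErrors(Exception):
    """Hypothetical exception holding one error per failed name."""

    def __init__(self, errors: dict) -> None:
        super().__init__(", ".join(sorted(errors)))
        self.errors = errors


async def wait_for_all(**coros) -> None:
    # Run everything to completion, collecting exceptions instead of
    # stopping at the first failure.
    results = await asyncio.gather(*coros.values(), return_exceptions=True)
    errors = {
        name: result
        for name, result in zip(coros, results)
        if isinstance(result, Exception)
    }
    if errors:
        raise ConnectionErrors(errors)


async def ok() -> None:
    return None


async def bad() -> None:
    raise RuntimeError("no PV")


async def demo() -> list:
    try:
        await wait_for_all(good=ok(), broken=bad())
    except ConnectionErrors as exc:
        return sorted(exc.errors)
    return []
```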

ophyd_async.core.ConfigSignal#

‘_compat_format(…)’

ophyd_async.core.HintedSignal: Any#

‘_compat_format(…)’