ophyd_async.core
#
The building blocks for making devices.
Package Contents#
Classes#
Common base class for all Ophyd Async Devices. |
|
Defines how a |
|
For filling signals on introspected devices. |
|
Defines a dictionary of Device children with arbitrary integer keys. |
|
Async implementations of the sync |
|
Async implementation of the sync |
|
Async implementation of the sync |
|
Protocol for watching changes in values. |
|
Convert an asyncio awaitable to bluesky Status interface. |
|
Convert an asyncio async iterable to bluesky Status and Watcher interface. |
|
A dataclass such that, when expanded, it provides the kwargs for a watcher. |
|
A Device with the concept of a value, with R, RW, W and X flavours. |
|
Signal that can be read from and monitored. |
|
Signal that can be set. |
|
Signal that can be both read and set. |
|
Signal that puts the default value. |
|
A read/write/monitor backend for a Signals. |
|
Used for connecting signals with a given backend. |
|
All members should exist in the Backend, and there will be no extras. |
|
All members should exist in the Backend, but there may be extras. |
|
An abstraction of a Table where each field is a column. |
|
Metadata for a signal. No field is required. |
|
An backend to a soft Signal, for test signals see |
|
A lazily created Mock to be used when connecting in mock mode. |
|
Signal backend for testing, created by |
|
Device that provides selected child Device values in |
|
Declare how a |
|
Detector base class for step and fly scanning detectors. |
|
Minimal set of information required to setup triggering on a detector. |
|
Type of mechanism for triggering a detector to take frames. |
|
Detector logic for arming and disarming the detector. |
|
Logic for making detector write data to somewhere persistent (e.g. HDF5 file). |
|
Information about where and how to write a file. |
|
Abstract class that tells a detector where to write its data. |
|
All files will be within a static directory. |
|
Provides a new numerically incremented path on each call. |
|
Provides a path with the date included in the directory name. |
|
Base class for callable classes providing filenames. |
|
Provides a constant filename on every call. |
|
Provides a new numerically incremented filename on each call. |
|
Files will have a UUID as a filename. |
|
Base class for callable classes providing data keys. |
|
For describing datasets in file writing. |
|
TODO. |
|
TODO. |
|
Base class for ‘flyable’ devices. |
|
Base class for controlling ‘flyable’ devices. |
|
Used for supplying settings to signals. |
|
Base class for providing settings. |
|
For providing settings from yaml to signals. |
|
Hide an object behind a reference. |
Functions#
Auto initialize top level Device instances: to be used as a context manager. |
|
Return a completed AsyncStatus. |
|
Create a read-only Signal with a |
|
Create a read-writable Signal with a |
|
Subscribe to the value of a signal so it can be iterated from. |
|
Subscribe to a set of signals so they can be iterated from. |
|
Wait for a signal to have a matching value. |
|
Set a signal and monitor that same signal until it has the specified value. |
|
Set a signal and monitor another signal until it has the specified value. |
|
Retrieve all SignalRWs from a device. |
|
Set a new handler on the |
|
Take named coros and return a dict of their name to their return value. |
|
Get the runtime dtype from a numpy ndarray type annotation. |
|
Get the enum class from a datatype. |
|
If all values are the same, return that value, otherwise raise TypeError. |
|
Convert between a seconds and microseconds. |
|
Make a DataKey for a given datatype. |
|
Call many underlying signals, accumulating exceptions and returning them. |
Data#
The supported |
|
A typevar for a |
|
A numpy dtype like |
|
A type alias for a 1D numpy array with a specific scalar data type. |
|
Sentinel used to implement |
|
API#
- class ophyd_async.core.Device(name: str = '', connector: DeviceConnector | None = None)[source]#
Bases:
bluesky.protocols.HasName
Common base class for all Ophyd Async Devices.
- Parameters:
name – Optional name of the Device
connector – Optional DeviceConnector instance to use at connect()
- children() Iterator[tuple[str, Device]] [source]#
For each attribute that is a Device, yield the name and Device.
- Yields:
(attr_name, attr)
for each child attribute that is a Device.
- log() LoggerAdapter #
Return a logger configured with the device name.
- set_name(name: str, *, child_name_separator: str | None = None) None [source]#
Set
self.name=name
and eachself.child.name=name+"-child"
.- Parameters:
name – New name to set.
child_name_separator – Use this as a separator instead of “-”. Use “_” instead to make the same names as the equivalent ophyd sync device.
- async connect(mock: bool | LazyMock = False, timeout: float = DEFAULT_TIMEOUT, force_reconnect: bool = False) None [source]#
Connect the device and all child devices.
Successful connects will be cached so subsequent calls will return immediately. Contains a timeout that gets propagated to child.connect methods.
- Parameters:
mock – If True then use
MockSignalBackend
for all Signals. If passed aLazyMock
then pass this down for use within the Signals, otherwise create one.timeout – Time to wait before failing with a TimeoutError.
force_reconnect – If True, force a reconnect even if the last connect succeeded.
- class ophyd_async.core.DeviceConnector[source]#
Defines how a
Device
should be connected and type hints processed.- create_children_from_annotations(device: Device)[source]#
Use when children can be created from introspecting the hardware.
Some control systems allow introspection of a device to determine what children it has. To allow this to work nicely with typing we add these hints to the Device like so::
my_signal: SignalRW[int] my_device: MyDevice
This method will be run during
Device.__init__
, and is responsible for turning all of those type hints into real Signal and Device instances.Subsequent runs of this function should do nothing, to allow it to be called early in Devices that need to pass references to their children during
__init__
.
- async connect_mock(device: Device, mock: LazyMock)[source]#
Use during
Device.connect
withmock=True
.This is called when there is no cached connect done in
mock=True
mode. It connects the Device and all its children in mock mode.
- class ophyd_async.core.DeviceFiller(device: Device, signal_backend_factory: Callable[[type[SignalDatatype] | None], SignalBackendT], device_connector_factory: Callable[[], DeviceConnectorT])[source]#
Bases:
typing.Generic
[ophyd_async.core._device_filler.SignalBackendT
,ophyd_async.core._device_filler.DeviceConnectorT
]For filling signals on introspected devices.
- Parameters:
device – The device to fill.
signal_backend_factory – A callable that returns a SignalBackend.
device_connector_factory – A callable that returns a DeviceConnector.
- create_signals_from_annotations(filled=True) Iterator[tuple[SignalBackendT, list[Any]]] [source]#
Create all Signals from annotations.
- Parameters:
filled – If True then the Signals created should be considered already filled with connection data. If False then
fill_child_signal
needs calling at device connection time before the signal can be connected.- Yields:
(backend, extras)
TheSignalBackend
that has been created for this Signal, and the list of extra annotations that could be used to customize it. For example anEpicsDeviceConnector
consumesPvSuffix
extras to set the write_pv of the backend. Any unhandled extras should be left on the list so this class can handle them, e.g.StandardReadableFormat
instances.
- create_devices_from_annotations(filled=True) Iterator[tuple[DeviceConnectorT, list[Any]]] [source]#
Create all Signals from annotations.
- Parameters:
filled – If True then the Devices created should be considered already filled with connection data. If False then
fill_child_device
needs calling at parent device connection time before the child Device can be connected.- Yields:
(connector, extras)
TheDeviceConnector
that has been created for this Signal, and the list of extra annotations that could be used to customize it.
- create_device_vector_entries_to_mock(num: int)[source]#
Create num entries for each
DeviceVector
.This is used when the Device is being connected in mock mode.
- check_filled(source: str)[source]#
Check that all the created Signals and Devices are filled.
- Parameters:
source – The source of the data that should have done the filling, for reporting as an error message
- fill_child_signal(name: str, signal_type: type[Signal], vector_index: int | None = None) SignalBackendT [source]#
Mark a Signal as filled, and return its backend for filling.
- Parameters:
name – The name without trailing underscore, the name in the control system
signal_type – One of the types
SignalR
,SignalW
,SignalRW
orSignalX
vector_index – If the child is in a
DeviceVector
then what index is it
- Returns:
The SignalBackend for the filled Signal.
- fill_child_device(name: str, device_type: type[Device] = Device, vector_index: int | None = None) DeviceConnectorT [source]#
Mark a Device as filled, and return its connector for filling.
- Parameters:
name – The name without trailing underscore, the name in the control system
device_type – The
Device
subclass to be createdvector_index – If the child is in a
DeviceVector
then what index is it
- Returns:
The DeviceConnector for the filled Device.
- class ophyd_async.core.DeviceVector(children: Mapping[int, DeviceT], name: str = '')[source]#
Bases:
collections.abc.MutableMapping
[int
,ophyd_async.core._device.DeviceT
],ophyd_async.core._device.Device
Defines a dictionary of Device children with arbitrary integer keys.
- See-also:
Implementing Devices for examples of how to use this class.
- ophyd_async.core.init_devices(set_name=True, child_name_separator: str = '-', connect=True, mock=False, timeout: float = 10.0)[source]#
Auto initialize top level Device instances: to be used as a context manager.
- Parameters:
set_name – If True, call
device.set_name(variable_name)
on all Devices created within the context manager that have an emptyname
.child_name_separator – Separator for child names if
set_name
is True.connect – If True, call
device.connect(mock, timeout)
in parallel on all Devices created within the context manager.mock – If True, connect Signals in mock mode.
timeout – How long to wait for connect before logging an exception.
- Raises:
RuntimeError – If used inside a plan, use
ensure_connected
instead.NotConnected – If devices could not be connected.
For example, to connect and name 2 motors in parallel:
[async] with init_devices(): t1x = motor.Motor("BLxxI-MO-TABLE-01:X") t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y") # Names and connects devices here assert t1x.name == "t1x"
- class ophyd_async.core.AsyncReadable[source]#
Bases:
bluesky.protocols.HasName
,typing.Protocol
Async implementations of the sync
bluesky.protocols.Readable
.
- class ophyd_async.core.AsyncConfigurable[source]#
Bases:
bluesky.protocols.HasName
,typing.Protocol
Async implementation of the sync
bluesky.protocols.Configurable
.
- class ophyd_async.core.AsyncStageable[source]#
Bases:
typing.Protocol
Async implementation of the sync
bluesky.protocols.Stageable
.- abstractmethod stage() AsyncStatus [source]#
Set up the device for acquisition.
- Returns:
An
AsyncStatus
that is marked done when the device is done staging.
- abstractmethod unstage() AsyncStatus [source]#
Clean up the device after acquisition.
- Returns:
An
AsyncStatus
that is marked done when the device is done unstaging.
- class ophyd_async.core.Watcher[source]#
Bases:
typing.Protocol
,typing.Generic
[ophyd_async.core._protocol.C
]Protocol for watching changes in values.
- class ophyd_async.core.AsyncStatus(awaitable: Coroutine | Task, name: str | None = None)[source]#
Bases:
ophyd_async.core._status.AsyncStatusBase
Convert an asyncio awaitable to bluesky Status interface.
- Parameters:
awaitable – The coroutine or task to await.
name – The name of the device, if available.
For example:
status = AsyncStatus(asyncio.sleep(1)) assert not status.done await status # waits for 1 second assert status.done
- class ophyd_async.core.WatchableAsyncStatus(iterator: AsyncIterator[WatcherUpdate[T]], name: str | None = None)[source]#
Bases:
ophyd_async.core._status.AsyncStatusBase
,typing.Generic
[ophyd_async.core._utils.T
]Convert an asyncio async iterable to bluesky Status and Watcher interface.
- Parameters:
iterator – The async iterable to await.
name – The name of the device, if available.
- watch(watcher: Watcher)[source]#
Add a watcher to the status.
It is called:
immediately if there has already been an update
on every subsequent update
- classmethod wrap(f: Callable[P, AsyncIterator[WatcherUpdate[T]]]) Callable[P, WatchableAsyncStatus[T]] [source]#
Wrap an AsyncIterator in a WatchableAsyncStatus.
For example:
class MyDevice(Device): @WatchableAsyncStatus.wrap async def trigger(self): # sleep for a second, updating on progress every 0.1 seconds for i in range(10): yield WatcherUpdate(initial=0, current=i*0.1, target=1) await asyncio.sleep(0.1)
- class ophyd_async.core.WatcherUpdate[source]#
Bases:
typing.Generic
[ophyd_async.core._utils.T
]A dataclass such that, when expanded, it provides the kwargs for a watcher.
- current: T#
None
The current value, where it currently is.
- initial: T#
None
The initial value, where it was when it started.
- target: T#
None
The target value, where it will be when it finishes.
- async ophyd_async.core.completed_status(exception: Exception | None = None)[source]#
Return a completed AsyncStatus.
- Parameters:
exception – If given, then raise this exception when awaited.
- class ophyd_async.core.Signal(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#
Bases:
ophyd_async.core._device.Device
,typing.Generic
[ophyd_async.core._signal_backend.SignalDatatypeT
]A Device with the concept of a value, with R, RW, W and X flavours.
- Parameters:
backend – The backend for providing Signal values.
timeout – The default timeout for operations on the Signal.
name – The name of the signal.
- class ophyd_async.core.SignalR(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#
Bases:
ophyd_async.core._signal.Signal
[ophyd_async.core._signal_backend.SignalDatatypeT
],ophyd_async.core._protocol.AsyncReadable
,ophyd_async.core._protocol.AsyncStageable
,bluesky.protocols.Subscribable
Signal that can be read from and monitored.
- async read(cached: bool | None = None) dict[str, Reading] [source]#
Return a single item dict with the reading in it.
- Parameters:
cached –
Whether to use the cached monitored value:
If None, use the cache if it exists.
If False, do an explicit get.
If True, explicitly use the cache and raise an error if it doesn’t exist.
- async describe() dict[str, DataKey] [source]#
Return a single item dict describing the signal value.
- async get_value(cached: bool | None = None) SignalDatatypeT [source]#
Return the current value.
- Parameters:
cached –
Whether to use the cached monitored value:
If None, use the cache if it exists.
If False, do an explicit get.
If True, explicitly use the cache and raise an error if it doesn’t exist.
- subscribe_value(function: Callback[SignalDatatypeT])[source]#
Subscribe to updates in value of a device.
- Parameters:
function – The callback function to call when the value changes.
- subscribe(function: Callback[dict[str, Reading]]) None [source]#
Subscribe to updates in the reading.
- Parameters:
function – The callback function to call when the reading changes.
- class ophyd_async.core.SignalW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#
Bases:
ophyd_async.core._signal.Signal
[ophyd_async.core._signal_backend.SignalDatatypeT
],bluesky.protocols.Movable
Signal that can be set.
- async set(value: SignalDatatypeT, wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT) None [source]#
Set the value and return a status saying when it’s done.
- Parameters:
value – The value to set.
wait – If True, wait for the set to complete.
timeout – The timeout for the set.
- class ophyd_async.core.SignalRW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#
Bases:
ophyd_async.core._signal.SignalR
[ophyd_async.core._signal_backend.SignalDatatypeT
],ophyd_async.core._signal.SignalW
[ophyd_async.core._signal_backend.SignalDatatypeT
],bluesky.protocols.Locatable
Signal that can be both read and set.
- class ophyd_async.core.SignalX(backend: SignalBackend[SignalDatatypeT], timeout: float | None = DEFAULT_TIMEOUT, name: str = '')[source]#
Bases:
ophyd_async.core._signal.Signal
Signal that puts the default value.
- async trigger(wait=True, timeout: CalculatableTimeout = CALCULATE_TIMEOUT) None [source]#
Trigger the action and return a status saying when it’s done.
- Parameters:
wait – If True, wait for the trigger to complete.
timeout – The timeout for the trigger.
- class ophyd_async.core.SignalBackend(datatype: type[SignalDatatypeT] | None)[source]#
Bases:
typing.Generic
[ophyd_async.core._signal_backend.SignalDatatypeT
]A read/write/monitor backend for a Signals.
- abstractmethod source(name: str, read: bool) str [source]#
Return source of signal.
- Parameters:
name – The name of the signal, which can be used or discarded.
read – True if we want the source for reading, False if writing.
- abstractmethod async put(value: SignalDatatypeT | None, wait: bool)[source]#
Put a value to the PV, if wait then wait for completion.
- abstractmethod async get_datakey(source: str) DataKey [source]#
Metadata like source, dtype, shape, precision, units.
- abstractmethod async get_reading() Reading[SignalDatatypeT] [source]#
Return the current value, timestamp and severity.
- abstractmethod async get_value() SignalDatatypeT [source]#
Return the current value.
- abstractmethod async get_setpoint() SignalDatatypeT [source]#
Return the point that a signal was requested to move to.
- class ophyd_async.core.SignalConnector(backend: SignalBackend)[source]#
Bases:
ophyd_async.core._device.DeviceConnector
Used for connecting signals with a given backend.
- async connect_mock(device: Device, mock: LazyMock)[source]#
Use during
Device.connect
withmock=True
.This is called when there is no cached connect done in
mock=True
mode. It connects the Device and all its children in mock mode.
- ophyd_async.core.SignalDatatype#
None
The supported
Signal
datatypes:A
StrictEnum
orSubsetEnum
subclassA fixed datatype
Array1D
of numpy bool, signed and unsigned integers or floatA
numpy.ndarray
which can change dimensions and datatype at runtimeA sequence of
str
A sequence of
StrictEnum
orSubsetEnum
subclassA
Table
subclass
- ophyd_async.core.SignalDatatypeT#
‘TypeVar(…)’
A typevar for a
SignalDatatype
.
- ophyd_async.core.DTypeScalar_co#
‘TypeVar(…)’
A numpy dtype like
numpy.float64
.
- ophyd_async.core.Array1D#
None
A type alias for a 1D numpy array with a specific scalar data type.
E.g.
Array1D[np.float64]
is a 1D numpy array of 64-bit floats.
- class ophyd_async.core.StrictEnum[source]#
-
All members should exist in the Backend, and there will be no extras.
- class ophyd_async.core.SubsetEnum[source]#
Bases:
ophyd_async.core._utils.StrictEnum
All members should exist in the Backend, but there may be extras.
- class ophyd_async.core.Table(**kwargs)[source]#
Bases:
pydantic.BaseModel
An abstraction of a Table where each field is a column.
For example:
>>> from ophyd_async.core import Table, Array1D >>> import numpy as np >>> from collections.abc import Sequence >>> class MyTable(Table): ... a: Array1D[np.int8] ... b: Sequence[str] ... >>> t = MyTable(a=[1, 2], b=["x", "y"]) >>> len(t) # the length is the number of rows 2 >>> t2 = t + t # adding tables together concatenates them >>> t2.a array([1, 2, 1, 2], dtype=int8) >>> t2.b ['x', 'y', 'x', 'y'] >>> t2[1] # slice a row array([(2, b'y')], dtype=[('a', 'i1'), ('b', 'S40')])
- model_config#
‘ConfigDict(…)’
- class ophyd_async.core.SignalMetadata[source]#
Bases:
typing.TypedDict
Metadata for a signal. No field is required.
- class ophyd_async.core.SoftSignalBackend(datatype: type[SignalDatatypeT] | None, initial_value: SignalDatatypeT | None = None, units: str | None = None, precision: int | None = None)[source]#
Bases:
ophyd_async.core._signal_backend.SignalBackend
[ophyd_async.core._signal_backend.SignalDatatypeT
]An backend to a soft Signal, for test signals see
MockSignalBackend
.- Parameters:
datatype – The datatype of the signal, defaults to float if not given.
initial_value – The initial value of the signal, defaults to the “empty”, “zero” or “default” value of the datatype if not given.
units – The units for numeric datatypes.
precision – The number of digits after the decimal place to display for a float datatype.
- set_value(value: SignalDatatypeT)[source]#
Set the current value, alarm and timestamp.
- source(name: str, read: bool) str [source]#
Return source of signal.
- Parameters:
name – The name of the signal, which can be used or discarded.
read – True if we want the source for reading, False if writing.
- async put(value: SignalDatatypeT | None, wait: bool) None [source]#
Put a value to the PV, if wait then wait for completion.
- async get_datakey(source: str) DataKey [source]#
Metadata like source, dtype, shape, precision, units.
- async get_reading() Reading[SignalDatatypeT] [source]#
Return the current value, timestamp and severity.
- async get_value() SignalDatatypeT [source]#
Return the current value.
- async get_setpoint() SignalDatatypeT [source]#
Return the point that a signal was requested to move to.
- ophyd_async.core.soft_signal_r_and_setter(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]] [source]#
Create a read-only Signal with a
SoftSignalBackend
.May pass metadata, which are propagated into describe. Use soft_signal_rw if you want a device that is externally modifiable.
- Parameters:
datatype – The datatype of the signal.
initial_value – The initial value of the signal.
name – The name of the signal.
units – The units of the signal.
precision – The precision of the signal.
- Returns:
A tuple of the created SignalR and a callable to set its value.
- ophyd_async.core.soft_signal_rw(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) SignalRW[SignalDatatypeT] [source]#
Create a read-writable Signal with a
SoftSignalBackend
.May pass metadata, which are propagated into describe.
- Parameters:
datatype – The datatype of the signal.
initial_value – The initial value of the signal.
name – The name of the signal.
units – The units of the signal.
precision – The precision of the signal.
- class ophyd_async.core.LazyMock(name: str = '', parent: LazyMock | None = None)[source]#
A lazily created Mock to be used when connecting in mock mode.
Creating Mocks is reasonably expensive when each Device (and Signal) requires its own, and the tree is only used when
Signal.set()
is called. This class allows a tree of lazily connected Mocks to be constructed so that when the leaf is created, so are its parents. Any calls to the child are then accessible from the parent mock.>>> parent = LazyMock() >>> child = parent.child("child") >>> child_mock = child() >>> child_mock() <Mock name='mock.child()' id='...'> >>> parent_mock = parent() >>> parent_mock.mock_calls [call.child()]
- class ophyd_async.core.MockSignalBackend(initial_backend: SignalBackend[SignalDatatypeT], mock: LazyMock)[source]#
Bases:
ophyd_async.core._signal_backend.SignalBackend
[ophyd_async.core._signal_backend.SignalDatatypeT
]Signal backend for testing, created by
Device.connect(mock=True)
.- set_value(value: SignalDatatypeT)[source]#
Set the value of the signal.
- source(name: str, read: bool) str [source]#
Return source of signal.
- Parameters:
name – The name of the signal, which can be used or discarded.
read – True if we want the source for reading, False if writing.
- put_proceeds() Event #
Return an Event that will block
put()
until set.The Event is initially set, but can be unset to block
put()
.
- async put(value: SignalDatatypeT | None, wait: bool)[source]#
Put a value to the PV, if wait then wait for completion.
- async get_value() SignalDatatypeT [source]#
Return the current value.
- async get_setpoint() SignalDatatypeT [source]#
Return the point that a signal was requested to move to.
- async ophyd_async.core.observe_value(signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[SignalDatatypeT, None] [source]#
Subscribe to the value of a signal so it can be iterated from.
The first value yielded in the iterator will be the current value of the Signal, and subsequent updates from the control system will result in that value being yielded, even if it is the same as the previous value.
- Parameters:
signal – Call subscribe_value on this at the start, and clear_sub on it at the end.
timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError.
done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.
done_timeout – If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an ‘asyncio.wait_for’ timeout.
Due to a rare condition with busy signals, it is not recommended to use this function with asyncio.timeout, including in an
asyncio.wait_for
loop. Instead, this timeout should be given to the done_timeout parameter.- Example:
async for value in observe_value(sig): do_something_with(value)
- async ophyd_async.core.observe_signals_value(*signals: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None, done_timeout: float | None = None) AsyncGenerator[tuple[SignalR[SignalDatatypeT], SignalDatatypeT], None] [source]#
Subscribe to a set of signals so they can be iterated from.
The first values yielded in the iterator will be the current values of the Signals, and subsequent updates from the control system will result in that value being yielded, even if it is the same as the previous value.
- Parameters:
signals – Call subscribe_value on all the signals at the start, and clear_sub on it at the end.
timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError.
done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.
done_timeout – If given, the maximum time to watch a signal, in seconds. If the loop is still being watched after this length, raise asyncio.TimeoutError. This should be used instead of on an
asyncio.wait_for
timeout.
- Example:
async for signal, value in observe_signals_values(sig1, sig2, ..): if signal is sig1: do_something_with(value) elif signal is sig2: do_something_else_with(value)
- async ophyd_async.core.wait_for_value(signal: SignalR[SignalDatatypeT], match: SignalDatatypeT | Callable[[SignalDatatypeT], bool], timeout: float | None) None [source]#
Wait for a signal to have a matching value.
- Parameters:
signal – Call subscribe_value on this at the start, and clear_sub on it at the end.
match – If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match.
timeout – How long to wait for the value to match.
- Example:
await wait_for_value(device.acquiring, 1, timeout=1) # or await wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
- async ophyd_async.core.set_and_wait_for_value(signal: SignalRW[SignalDatatypeT], value: SignalDatatypeT, match_value: SignalDatatypeT | Callable[[SignalDatatypeT], bool] | None = None, timeout: float = DEFAULT_TIMEOUT, set_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus [source]#
Set a signal and monitor that same signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for a match_signal to have the match_value.
- Parameters:
signal – The signal to set.
value – The value to set it to.
match_value – The value (or callable that says if the value matches) to wait for.
timeout – How long to wait for the signal to have the value.
set_timeout – How long to wait for the set to complete.
wait_for_set_completion – If False then return as soon as the match_signal matches match_value. If True then also wait for the set operation to complete before returning.
- Seealso:
How to interact with signals while implementing bluesky verbs
- Examples:
To set a parameter and wait for it’s value to change:
await set_and_wait_for_value(device.parameter, 1)
For busy record, or other Signals with pattern:
Set Signal with
wait=True
and stash the StatusRead the same Signal to check the operation has started
Return the Status so calling code can wait for operation to complete
status = await set_and_wait_for_value( device.acquire, 1, wait_for_set_completion=False ) # device is now acquiring await status # device has finished acquiring
- async ophyd_async.core.set_and_wait_for_other_value(set_signal: SignalW[SignalDatatypeT], set_value: SignalDatatypeT, match_signal: SignalR[SignalDatatypeV], match_value: SignalDatatypeV | Callable[[SignalDatatypeV], bool], timeout: float = DEFAULT_TIMEOUT, set_timeout: float | None = None, wait_for_set_completion: bool = True) AsyncStatus [source]#
Set a signal and monitor another signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for a match_signal to have the match_value.
- Parameters:
set_signal – The signal to set.
set_value – The value to set it to.
match_signal – The signal to monitor.
match_value – The value (or callable that says if the value matches) to wait for.
timeout – How long to wait for the signal to have the value.
set_timeout – How long to wait for the set to complete.
wait_for_set_completion – If False then return as soon as the match_signal matches match_value. If True then also wait for the set operation to complete before returning.
- Seealso:
How to interact with signals while implementing bluesky verbs
- Example:
To set the setpoint and wait for the readback to match:
await set_and_wait_for_value(device.setpoint, 1, device.readback, 1)
- ophyd_async.core.walk_rw_signals(device: Device, path_prefix: str = '') dict[str, SignalRW[Any]] [source]#
Retrieve all SignalRWs from a device.
Stores retrieved signals with their dotted attribute paths in a dictionary. Used as part of saving and loading a device.
- Parameters:
device – Device to retrieve read-write signals from.
path_prefix – For internal use, leave blank when calling the method.
- Returns:
A dictionary matching the string attribute path of a SignalRW with the signal itself.
- class ophyd_async.core.StandardReadable(name: str = '', connector: DeviceConnector | None = None)[source]#
Bases:
ophyd_async.core._device.Device
,ophyd_async.core._protocol.AsyncReadable
,ophyd_async.core._protocol.AsyncConfigurable
,ophyd_async.core._protocol.AsyncStageable
,bluesky.protocols.HasHints
Device that provides selected child Device values in
read()
.Provides the ability for children to be registered to:
Participate in
stage()
andunstage()
Provide their value in
read()
and `describe()Provide their value in
read_configuration()
and `describe_configuration()Select a value to appear in
hints
The behavior is customized with a
StandardReadableFormat
- async describe_configuration() dict[str, DataKey] [source]#
Return per-scan metadata for each field name in
read_configuration()
.
- async read_configuration() dict[str, Reading] [source]#
Return value, timestamp, optional per-point metadata for each field name.
Same API as
AsyncReadable.read
but for slow-changing fields related to configuration. e.g., exposure time. These will typically be read only once per run.
- async describe() dict[str, DataKey] [source]#
Return per-scan metadata for each field name in
read()
.For example:
{ "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []}, "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []}, }
- async read() dict[str, Reading] [source]#
Return value, timestamp, optional per-point metadata for each field name.
For example:
{ "channel1": {"value": 5, "timestamp": 1472493713.271991}, "channel2": {"value": 16, "timestamp": 1472493713.539238}, }
- property hints: Hints#
A dictionary of suggestions for best-effort visualization and processing.
This does not affect what data is read or saved; it is only a suggestion to enable automated tools to provide helpful information with minimal guidance from the user. See :ref:
hints
.
- add_children_as_readables(format: StandardReadableFormat = StandardReadableFormat.CHILD) Generator[None, None, None] [source]#
Context manager that calls
add_readables
on child Devices added within.Scans
self.children()
on entry and exit to context manager, and callsadd_readables()
on any that are added with the providedStandardReadableFormat
.
- add_readables(devices: Sequence[Device], format: StandardReadableFormat = StandardReadableFormat.CHILD) None [source]#
Add devices to contribute to various bluesky verbs.
Use output from the given devices to contribute to the verbs of the following interfaces:
- Parameters:
devices – The devices to be added
format – Determines which of the devices functions are added to which verb as per the
StandardReadableFormat
documentation
- class ophyd_async.core.StandardReadableFormat(*args, **kwds)[source]#
Bases:
enum.Enum
Declare how a
Device
should contribute to theStandardReadable
verbs.- CHILD#
‘CHILD’
Detect which verbs the child supports and contribute to:
read()
,describe()
if it isbluesky.protocols.Readable
read_configuration()
,describe_configuration()
if it isbluesky.protocols.Configurable
stage()
,unstage()
if it isbluesky.protocols.Stageable
hints
if itbluesky.protocols.HasHints
- CONFIG_SIGNAL#
‘CONFIG_SIGNAL’
Contribute the
Signal
value toread_configuration()
anddescribe_configuration()
- HINTED_SIGNAL#
‘HINTED_SIGNAL’
Contribute the monitored
Signal
value toread()
anddescribe()
and put the signal name inhints
- UNCACHED_SIGNAL#
‘UNCACHED_SIGNAL’
Contribute the uncached
Signal
value toread()
anddescribe()
- HINTED_UNCACHED_SIGNAL#
‘HINTED_UNCACHED_SIGNAL’
Contribute the uncached
Signal
value toread()
anddescribe()
and put the signal name inhints
- class ophyd_async.core.StandardDetector(controller: DetectorControllerT, writer: DetectorWriterT, config_sigs: Sequence[SignalR] = (), name: str = '', connector: DeviceConnector | None = None)[source]#
Bases:
ophyd_async.core._device.Device
,bluesky.protocols.Stageable
,ophyd_async.core._protocol.AsyncConfigurable
,ophyd_async.core._protocol.AsyncReadable
,bluesky.protocols.Triggerable
,bluesky.protocols.Preparable
,bluesky.protocols.Flyable
,bluesky.protocols.Collectable
,bluesky.protocols.WritesStreamAssets
,typing.Generic
[ophyd_async.core._detector.DetectorControllerT
,ophyd_async.core._detector.DetectorWriterT
]Detector base class for step and fly scanning detectors.
Aggregates controller and writer logic together.
- Parameters:
controller – Logic for arming and disarming the detector
writer – Logic for making the detector write persistent data
config_sigs – Signals to read when describe and read configuration are called
name – Device name
- async read_configuration() dict[str, Reading] [source]#
Return value, timestamp, optional per-point metadata for each field name.
Same API as
AsyncReadable.read
but for slow-changing fields related to configuration. e.g., exposure time. These will typically be read only once per run.
- async describe_configuration() dict[str, DataKey] [source]#
Return per-scan metadata for each field name in
read_configuration()
.
- async read() dict[str, Reading] [source]#
There is no data to be placed in events, so this is empty.
- async describe() dict[str, DataKey] [source]#
Return per-scan metadata for each field name in
read()
.For example:
{ "channel1": {"source": "SOME_PV1", "dtype": "number", "shape": []}, "channel2": {"source": "SOME_PV2", "dtype": "number", "shape": []}, }
- async prepare(value: TriggerInfo) None [source]#
Arm detector.
Prepare the detector with trigger information. This is determined at and passed in from the plan level.
- Parameters:
value – TriggerInfo describing how to trigger the detector
- async collect_asset_docs(index: int | None = None) AsyncIterator[StreamAsset] [source]#
- class ophyd_async.core.TriggerInfo(/, **data: ~typing.Any)[source]#
Bases:
pydantic.BaseModel
Minimal set of information required to setup triggering on a detector.
- number_of_triggers: NonNegativeInt | list[NonNegativeInt]#
None
Number of triggers that will be sent, (0 means infinite).
Can be:
A single integer or
A list of integers for multiple triggers
Example for tomography:
TriggerInfo(number=[2,3,100,3])
. This would trigger:2 times for dark field images
3 times for initial flat field images
100 times for projections
3 times for final flat field images
- trigger: DetectorTrigger#
‘Field(…)’
Sort of triggers that will be sent
- multiplier: int#
1
How many triggers make up a single StreamDatum index, to allow multiple frames from a faster detector to be zipped with a single frame from a slow detector e.g. if num=10 and multiplier=5 then the detector will take 10 frames, but publish 2 indices, and describe() will show a shape of (5, h, w)
- class ophyd_async.core.DetectorTrigger(*args, **kwds)[source]#
Bases:
enum.Enum
Type of mechanism for triggering a detector to take frames.
- INTERNAL#
‘INTERNAL’
Detector generates internal trigger for given rate
- EDGE_TRIGGER#
‘EDGE_TRIGGER’
Expect a series of arbitrary length trigger signals
- CONSTANT_GATE#
‘CONSTANT_GATE’
Expect a series of constant width external gate signals
- VARIABLE_GATE#
‘VARIABLE_GATE’
Expect a series of variable width external gate signals
- class ophyd_async.core.DetectorController[source]#
Bases:
abc.ABC
Detector logic for arming and disarming the detector.
- abstractmethod get_deadtime(exposure: float | None) float [source]#
For a given exposure, how long should the time between exposures be.
- abstractmethod async prepare(trigger_info: TriggerInfo) None [source]#
Do all necessary steps to prepare the detector for triggers.
- Parameters:
trigger_info – The sort of triggers to expect.
- class ophyd_async.core.DetectorWriter[source]#
Bases:
abc.ABC
Logic for making detector write data to somewhere persistent (e.g. HDF5 file).
- abstractmethod async open(multiplier: int = 1) dict[str, DataKey] [source]#
Open writer and wait for it to be ready for data.
- Parameters:
multiplier – Each StreamDatum index corresponds to this many written exposures
- Returns:
Output for
describe()
- abstractmethod observe_indices_written(timeout=DEFAULT_TIMEOUT) AsyncGenerator[int, None] [source]#
Yield the index of each frame (or equivalent data point) as it is written.
- abstractmethod collect_stream_docs(indices_written: int) AsyncIterator[StreamAsset] [source]#
Create Stream docs up to given number written.
- class ophyd_async.core.PathInfo[source]#
Information about where and how to write a file.
- Parameters:
directory_path – Directory into which files should be written
filename – Base filename to use generated by FilenameProvider, w/o extension
create_dir_depth – Optional depth of directories to create if they do not exist
- class ophyd_async.core.PathProvider[source]#
Bases:
typing.Protocol
Abstract class that tells a detector where to write its data.
- class ophyd_async.core.StaticPathProvider(filename_provider: FilenameProvider, directory_path: Path | str, create_dir_depth: int = 0)[source]#
Bases:
ophyd_async.core._providers.PathProvider
All files will be within a static directory.
- class ophyd_async.core.AutoIncrementingPathProvider(filename_provider: FilenameProvider, base_directory_path: Path, create_dir_depth: int = 0, max_digits: int = 5, starting_value: int = 0, num_calls_per_inc: int = 1, increment: int = 1, inc_delimeter: str = '_', base_name: str | None = None)[source]#
Bases:
ophyd_async.core._providers.PathProvider
Provides a new numerically incremented path on each call.
- class ophyd_async.core.YMDPathProvider(filename_provider: FilenameProvider, base_directory_path: Path, create_dir_depth: int = -3, device_name_as_base_dir: bool = False)[source]#
Bases:
ophyd_async.core._providers.PathProvider
Provides a path with the date included in the directory name.
- class ophyd_async.core.FilenameProvider[source]#
Bases:
typing.Protocol
Base class for callable classes providing filenames.
- class ophyd_async.core.StaticFilenameProvider(filename: str)[source]#
Bases:
ophyd_async.core._providers.FilenameProvider
Provides a constant filename on every call.
- class ophyd_async.core.AutoIncrementFilenameProvider(base_filename: str = '', max_digits: int = 5, starting_value: int = 0, increment: int = 1, inc_delimeter: str = '_')[source]#
Bases:
ophyd_async.core._providers.FilenameProvider
Provides a new numerically incremented filename on each call.
- class ophyd_async.core.UUIDFilenameProvider(uuid_call_func: Callable = uuid.uuid4, uuid_call_args: list | None = None)[source]#
Bases:
ophyd_async.core._providers.FilenameProvider
Files will have a UUID as a filename.
- class ophyd_async.core.NameProvider[source]#
Bases:
typing.Protocol
Base class for callable classes providing data keys.
- class ophyd_async.core.DatasetDescriber[source]#
Bases:
typing.Protocol
For describing datasets in file writing.
- class ophyd_async.core.HDFFile(full_file_name: Path, datasets: list[HDFDataset], hostname: str = 'localhost')[source]#
TODO.
- Parameters:
full_file_name – Absolute path to the file to be written
datasets – Datasets to write into the file
- stream_resources() Iterator[StreamResource] [source]#
- stream_data(indices_written: int) Iterator[StreamDatum] [source]#
- class ophyd_async.core.StandardFlyer(trigger_logic: FlyerController[T], name: str = '')[source]#
Bases:
ophyd_async.core._device.Device
,bluesky.protocols.Stageable
,bluesky.protocols.Preparable
,bluesky.protocols.Flyable
,typing.Generic
[ophyd_async.core._utils.T
]Base class for ‘flyable’ devices.
- property trigger_logic: FlyerController[T]#
- prepare(value: T) AsyncStatus [source]#
Prepare a device for scanning.
This method provides similar functionality to
Stageable.stage
andMovable.set
, with key differences:Stageable.stage
^^^^^^^^^^^^^^^^^^^ Staging a device translates to, “I’m going to use this in a scan, but I’m not sure how”. Preparing it translates to, “I’m about to do a step or a fly scan with these parameters”. Staging should be universal across many different types of scans, however prepare is specific to an input value passed in.Movable.set
^^^^^^^^^^^^^^^ For some devices, preparation for a scan could involve multiple soft or hardware signals being configured and/or set.prepare
therefore allows these to be bundled together, along with other logic.For example, a Flyable device should have the following methods called on it to perform a fly-scan:
prepare(flyscan_params) kickoff() complete()
If the device is a detector,
collect_asset_docs
can be called repeatedly whilecomplete
is not done to publish frames. Alternatively, to step-scan a detector,prepare(frame_params) to setup N software triggered frames trigger() to take N frames collect_asset_docs() to publish N frames
Returns a Status that is marked done when the device is ready for a scan.
- class ophyd_async.core.FlyerController[source]#
Bases:
abc.ABC
,typing.Generic
[ophyd_async.core._utils.T
]Base class for controlling ‘flyable’ devices.
- class ophyd_async.core.Settings(device: DeviceT, settings: MutableMapping[SignalRW, Any] | None = None)[source]#
Bases:
collections.abc.MutableMapping
[ophyd_async.core._signal.SignalRW
[typing.Any
],typing.Any
],typing.Generic
[ophyd_async.core._device.DeviceT
]Used for supplying settings to signals.
- Parameters:
device – The device that the settings are for.
settings – A dictionary of settings to start with.
- Example:
# Settings are created from a dict of signals to values settings1 = Settings(device, {device.sig1: 1, device.sig2: 2}) settings2 = Settings(device, {device.sig1: 10, device.sig3: 3}) # They act like a dictionaries assert settings1[device.sig1] == 1 # Including the ability to "or" two settings together settings = settings1 | settings2 assert dict(settings) == { device.sig1: 10, device.sig2: 2, device.sig3: 3, }
- partition(predicate: Callable[[SignalRW], bool]) tuple[Settings[DeviceT], Settings[DeviceT]] [source]#
Partition into two Settings based on a predicate.
- Parameters:
predicate – Callable that takes each signal, and returns a boolean to say if it should be in the first returned Settings
- Returns:
(where_true, where_false)
where each is a Settings object. The first contains the signals for which the predicate returned True, and the second contains the signals for which the predicate returned False.- Example:
settings = Settings(device, {device.special: 1, device.sig: 2}) specials, others = settings.partition(lambda sig: "special" in sig.name)
- class ophyd_async.core.SettingsProvider[source]#
Base class for providing settings.
- class ophyd_async.core.YamlSettingsProvider(directory: Path | str)[source]#
Bases:
ophyd_async.core._settings.SettingsProvider
For providing settings from yaml to signals.
- ophyd_async.core.config_ophyd_async_logging(file=sys.stdout, fmt=DEFAULT_FORMAT, datefmt=DEFAULT_DATE_FORMAT, color=True, level='WARNING')[source]#
Set a new handler on the
logging.getLogger('ophyd_async')
logger.If this is called more than once, the handler from the previous invocation is removed (if still present) and replaced.
- Parameters:
file – object with
write
method or filename string. Default issys.stdout
.fmt – str Overall logging format
datefmt – str Date format. Default is
'%H:%M:%S'
.color – bool Use ANSI color codes. True by default.
level – str or int Python logging level, given as string or corresponding integer. Default is ‘WARNING’.
- Returns:
The handler, which has already been added to the ‘ophyd_async’ logger.
- Examples:
Log to a file.
config_ophyd_async_logging(file='/tmp/what_is_happening.txt')
Include the date along with the time. (The log messages will always include microseconds, which are configured separately, not as part of ‘datefmt’.)
config_ophyd_async_logging(datefmt="%Y-%m-%d %H:%M:%S")
Turn off ANSI color codes.
config_ophyd_async_logging(color=False)
Increase verbosity: show level DEBUG or higher.
config_ophyd_async_logging(level='DEBUG')
- ophyd_async.core.CALCULATE_TIMEOUT#
‘CALCULATE_TIMEOUT’
Sentinel used to implement
myfunc(timeout=CalculateTimeout)
This signifies that the function should calculate a suitable non-zero timeout itself
- ophyd_async.core.CalculatableTimeout#
None
- ophyd_async.core.DEFAULT_TIMEOUT#
10.0
- ophyd_async.core.Callback#
None
- exception ophyd_async.core.NotConnected(errors: str | Mapping[str, Exception])[source]#
Bases:
Exception
Exception to be raised if a
Device.connect
is cancelled.- Parameters:
errors – Mapping of device name to Exception or another NotConnected. Alternatively a string with the signal error text.
- class ophyd_async.core.Reference(obj: T)[source]#
Bases:
typing.Generic
[ophyd_async.core._utils.T
]Hide an object behind a reference.
Used to opt out of the naming/parent-child relationship of
Device
.- Example:
class DeviceWithRefToSignal(Device): def __init__(self, signal: SignalRW[int]): self.signal_ref = Reference(signal) super().__init__() def set(self, value) -> AsyncStatus: return self.signal_ref().set(value + 1)
- async ophyd_async.core.gather_dict(coros: dict[T, Awaitable[V]]) dict[T, V] [source]#
Take named coros and return a dict of their name to their return value.
- ophyd_async.core.get_dtype(datatype: type) dtype [source]#
Get the runtime dtype from a numpy ndarray type annotation.
>>> from ophyd_async.core import Array1D >>> import numpy as np >>> get_dtype(Array1D[np.int8]) dtype('int8')
- ophyd_async.core.get_enum_cls(datatype: type | None) type[StrictEnum] | None [source]#
Get the enum class from a datatype.
- Raises:
TypeError – if type is not a
StrictEnum
orSubsetEnum
subclass
>>> from ophyd_async.core import StrictEnum >>> from collections.abc import Sequence >>> class MyEnum(StrictEnum): ... A = "A value" >>> get_enum_cls(str) >>> get_enum_cls(MyEnum) <enum 'MyEnum'> >>> get_enum_cls(Sequence[MyEnum]) <enum 'MyEnum'>
- ophyd_async.core.get_unique(values: dict[str, T], types: str) T [source]#
If all values are the same, return that value, otherwise raise TypeError.
>>> get_unique({"a": 1, "b": 1}, "integers") 1 >>> get_unique({"a": 1, "b": 2}, "integers") Traceback (most recent call last): ... TypeError: Differing integers: a has 1, b has 2
- ophyd_async.core.in_micros(t: float) int [source]#
Convert between a seconds and microseconds.
- Parameters:
t – A time in seconds
- Returns:
A time in microseconds, rounded up to the nearest whole microsecond
- Raises:
ValueError – if t < 0
- ophyd_async.core.make_datakey(datatype: type[SignalDatatypeT], value: SignalDatatypeT, source: str, metadata: SignalMetadata) DataKey [source]#
Make a DataKey for a given datatype.
- async ophyd_async.core.wait_for_connection(**coros: Awaitable[None])[source]#
Call many underlying signals, accumulating exceptions and returning them.
Expected kwargs should be a mapping of names to coroutine tasks to execute.
- ophyd_async.core.ConfigSignal#
‘_compat_format(…)’