Note
Ophyd async is considered experimental until the v1.0 release, and its API may change between minor releases before then.
ophyd_async.core
Members
DetectorController | Classes implementing this interface should hold the logic for arming and disarming a detector.
DetectorTrigger | Type of mechanism for triggering a detector to take frames.
DetectorWriter | Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file).
StandardDetector | Useful detector base class for step and fly scanning detectors.
Device | Common base class for all Ophyd Async Devices.
DeviceConnector | Defines how a Device should be connected and type hints processed.
DeviceCollector | Collector of top level Device instances to be used as a context manager.
DeviceVector | Defines device components with indices.
all_at_once | Sort all the values into a single phase so they are set all at once.
get_signal_values | Get signal values in bulk.
load_device | Plan which loads PVs from a yaml file into a device.
load_from_yaml | Plan that returns a list of dicts with saved signal values from a yaml file.
save_device | Plan that saves the state of all PVs on a device using a sorter.
save_to_yaml | Plan which serialises a phase or set of phases of SignalRWs to a yaml file.
set_signal_values | Maps signals from a yaml file into device signals.
walk_rw_signals | Retrieve all SignalRWs from a device.
config_ophyd_async_logging | Set a new handler on the logging.getLogger('ophyd_async') logger.
MockSignalBackend | Signal backend for testing, created by Device.connect(mock=True).
callback_on_mock_put | For setting a callback when a backend is put to.
get_mock_put | Get the mock associated with the put call on the signal.
set_mock_put_proceeds | Allow or block a put with wait=True from proceeding.
set_mock_value | Set the value of a signal that is in mock mode.
set_mock_values | Iterator to set a signal to a sequence of values, optionally repeating the sequence.
PathInfo | Information about where and how to write a file.
StandardReadable | Device that owns its children and provides useful default behavior.
StandardReadableFormat | Declare how a Device should contribute to the StandardReadable verbs.
Signal | A Device with the concept of a value, with R, RW, W and X flavours.
SignalR | Signal that can be read from and monitored.
SignalRW | Signal that can be both read and set.
SignalW | Signal that can be set.
SignalX | Signal that puts the default value.
assert_configuration | Assert readings from Configurable.
assert_emitted | Assert emitted document generated by running a Bluesky plan.
assert_reading | Assert readings from readable.
assert_value | Assert a signal's value and compare it with an expected value.
observe_value | Subscribe to the value of a signal so it can be iterated from.
set_and_wait_for_value | Set a signal and monitor it until it has that value.
set_and_wait_for_other_value | Set a signal and monitor another signal until it has the specified value.
soft_signal_r_and_setter | Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device.
soft_signal_rw | Creates a read-writable Signal with a SoftSignalBackend.
wait_for_value | Wait for a signal to have a matching value.
SignalBackend | A read/write/monitor backend for a Signal.
StrictEnum | All members should exist in the Backend, and there will be no extras.
SubsetEnum | All members should exist in the Backend, but there may be extras.
SoftSignalBackend | A backend to a soft Signal, for test signals see MockSignalBackend.
AsyncStatus | Convert asyncio awaitable to bluesky Status interface.
WatchableAsyncStatus | Convert AsyncIterator of WatcherUpdates to bluesky Status interface.
LazyMock | A lazily created Mock to be used when connecting in mock mode.
NotConnected | Exception to be raised if a Device.connect is cancelled.
Reference | Hide an object behind a reference.
WatcherUpdate | A dataclass such that, when expanded, it provides the kwargs for a watcher.
get_dtype | Get the runtime dtype from a numpy ndarray type annotation.
get_enum_cls | Get the runtime enum class from a datatype annotation, or None if the datatype is not an enum.
get_unique | If all values are the same, return that value, otherwise raise TypeError.
| Converts between a positive number of seconds and an equivalent number of microseconds.
| Call many underlying signals, accumulating exceptions and returning them.
- class ophyd_async.core.DetectorController
Classes implementing this interface should hold the logic for arming and disarming a detector
- abstract get_deadtime(exposure: float | None) -> float
For a given exposure, how long should the time between exposures be
- abstract async prepare(trigger_info: TriggerInfo)
Do all necessary steps to prepare the detector for triggers.
- Args:
- trigger_info: This is a Pydantic model which contains:
number: Expected number of frames.
trigger: Type of trigger for which to prepare the detector. Defaults to DetectorTrigger.internal.
livetime: Livetime / Exposure time with which to set up the detector. Defaults to None if not applicable or the detector is expected to use its previously-set exposure time.
deadtime: Defaults to None. This is the minimum deadtime between triggers.
multiplier: The number of triggers grouped into a single StreamDatum index.
- class ophyd_async.core.DetectorTrigger(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Type of mechanism for triggering a detector to take frames
- internal = 'internal'
Detector generates internal trigger for given rate
- edge_trigger = 'edge_trigger'
Expect a series of arbitrary length trigger signals
- constant_gate = 'constant_gate'
Expect a series of constant width external gate signals
- variable_gate = 'variable_gate'
Expect a series of variable width external gate signals
- class ophyd_async.core.DetectorWriter
Logic for making a detector write data to somewhere persistent (e.g. an HDF5 file)
- abstract async open(multiplier: int = 1) -> dict[str, DataKey]
Open writer and wait for it to be ready for data.
- Args:
- multiplier: Each StreamDatum index corresponds to this many written exposures
- Returns:
Output for describe()
- abstract observe_indices_written(timeout=10.0) -> AsyncGenerator[int, None]
Yield the index of each frame (or equivalent data point) as it is written
- abstract collect_stream_docs(indices_written: int) -> AsyncIterator[Tuple[Literal['stream_resource'], StreamResource] | Tuple[Literal['stream_datum'], StreamDatum]]
Create Stream docs up to given number written
- class ophyd_async.core.StandardDetector(controller: DetectorController, writer: DetectorWriter, config_sigs: Sequence[SignalR] = (), name: str = '', connector: DeviceConnector | None = None)
Useful detector base class for step and fly scanning detectors. Aggregates controller and writer logic together.
- prepare(value: TriggerInfo) -> None
Arm detector.
Prepare the detector with trigger information. This is determined at and passed in from the plan level.
This currently only prepares detectors for fly scans; step scans just use the trigger information determined in trigger.
To do: Unify prepare to be used for both fly and step scans.
- Args:
value: TriggerInfo describing how to trigger the detector
- pydantic model ophyd_async.core.TriggerInfo
Minimal set of information required to setup triggering on a detector
Show JSON schema
{ "title": "TriggerInfo", "description": "Minimal set of information required to setup triggering on a detector", "type": "object", "properties": { "number_of_triggers": { "anyOf": [ { "minimum": 0, "type": "integer" }, { "items": { "minimum": 0, "type": "integer" }, "type": "array" } ], "title": "Number Of Triggers" }, "trigger": { "$ref": "#/$defs/DetectorTrigger", "default": "internal" }, "deadtime": { "anyOf": [ { "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "title": "Deadtime" }, "livetime": { "anyOf": [ { "minimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "title": "Livetime" }, "frame_timeout": { "anyOf": [ { "exclusiveMinimum": 0.0, "type": "number" }, { "type": "null" } ], "default": null, "title": "Frame Timeout" }, "multiplier": { "default": 1, "title": "Multiplier", "type": "integer" } }, "$defs": { "DetectorTrigger": { "description": "Type of mechanism for triggering a detector to take frames", "enum": [ "internal", "edge_trigger", "constant_gate", "variable_gate" ], "title": "DetectorTrigger", "type": "string" } }, "required": [ "number_of_triggers" ] }
- Fields:
number_of_triggers (int | list[int])
trigger (ophyd_async.core._detector.DetectorTrigger)
deadtime (float | None)
livetime (float | None)
frame_timeout (float | None)
multiplier (int)
- field deadtime: float | None = None
What is the minimum deadtime between triggers
- Constraints:
ge = 0
- field frame_timeout: float | None = None
What is the maximum timeout on waiting for a frame
- Constraints:
gt = 0
- field livetime: float | None = None
What is the maximum high time of the triggers
- Constraints:
ge = 0
- field multiplier: int = 1
How many triggers make up a single StreamDatum index, to allow multiple frames from a faster detector to be zipped with a single frame from a slow detector. For example, if num=10 and multiplier=5 then the detector will take 10 frames, but publish 2 indices, and describe() will show a shape of (5, h, w).
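The arithmetic behind the multiplier can be sketched in plain Python. This is a stand-alone illustration that does not import ophyd_async; the function names are hypothetical, and the requirement that the frame count is a whole multiple of the multiplier is an assumption made for the sketch.

```python
def published_indices(frames: int, multiplier: int) -> int:
    """Number of StreamDatum indices published for a number of frames."""
    if multiplier < 1:
        raise ValueError("multiplier must be at least 1")
    # Assumption for this sketch: frames divide evenly into indices.
    if frames % multiplier:
        raise ValueError("frames must be a whole multiple of multiplier")
    return frames // multiplier


def described_shape(multiplier: int, height: int, width: int) -> tuple:
    """Shape reported for one index: `multiplier` frames stacked together."""
    return (multiplier, height, width)


# The case from the field description: 10 frames, multiplier of 5.
print(published_indices(10, 5))      # 2
print(described_shape(5, 480, 640))  # (5, 480, 640)
```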
- field number_of_triggers: int | list[int] [Required]
For example, a list of [2, 3, 100, 3] would trigger:
- 2 times for dark field images
- 3 times for initial flat field images
- 100 times for projections
- 3 times for final flat field images
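As a rough illustration of the two accepted forms (a single int or a list of ints), the helper below totals the triggers in either case. It is a hypothetical stand-alone function, not part of ophyd_async, and the list values are the ones from the tomography-style breakdown above.

```python
from typing import List, Union


def total_triggers(number_of_triggers: Union[int, List[int]]) -> int:
    """Total trigger count for either accepted form of the field."""
    if isinstance(number_of_triggers, int):
        counts = [number_of_triggers]
    else:
        counts = list(number_of_triggers)
    # The schema gives each entry a minimum of 0.
    if any(count < 0 for count in counts):
        raise ValueError("trigger counts must not be negative")
    return sum(counts)


# Dark fields, initial flats, projections, final flats.
print(total_triggers([2, 3, 100, 3]))  # 108
print(total_triggers(10))              # 10
```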
- field trigger: DetectorTrigger = DetectorTrigger.internal
Sort of triggers that will be sent
- class ophyd_async.core.Device(name: str = '', connector: DeviceConnector | None = None)
Common base class for all Ophyd Async Devices.
- set_name(name: str)
Set self.name=name and each self.child.name=name+"-child".
- Parameters:
name – New name to set
- async connect(mock: bool | LazyMock = False, timeout: float = 10.0, force_reconnect: bool = False) -> None
Connect self and all child Devices.
Contains a timeout that gets propagated to child.connect methods.
- Parameters:
mock – If True then use MockSignalBackend for all Signals
timeout – Time to wait before failing with a TimeoutError.
- class ophyd_async.core.DeviceConnector
Defines how a Device should be connected and type hints processed.
- create_children_from_annotations(device: Device)
Used when children can be created from introspecting the hardware.
Some control systems allow introspection of a device to determine what children it has. To allow this to work nicely with typing we add these hints to the Device like so:
    my_signal: SignalRW[int]
    my_device: MyDevice
This method will be run during Device.__init__, and is responsible for turning all of those type hints into real Signal and Device instances.
Subsequent runs of this function should do nothing, to allow it to be called early in Devices that need to pass references to their children during __init__.
- class ophyd_async.core.DeviceCollector(set_name=True, connect=True, mock=False, timeout: float = 10.0)
Collector of top level Device instances to be used as a context manager
- Parameters:
set_name – If True, call device.set_name(variable_name) on all collected Devices
connect – If True, call device.connect(mock) in parallel on all collected Devices
mock – If True, connect Signals in simulation mode
timeout – How long to wait for connect before logging an exception
Notes
Example usage:
    [async] with DeviceCollector():
        t1x = motor.Motor("BLxxI-MO-TABLE-01:X")
        t1y = motor.Motor("pva://BLxxI-MO-TABLE-01:Y")
    # Names and connects devices here
    assert t1x.comm.velocity.source
    assert t1x.name == "t1x"
- class ophyd_async.core.DeviceVector(children: Mapping[int, DeviceT], name: str = '')
Defines device components with indices.
For a DeviceVector attribute named foos on a parent device, foos becomes a dictionary on the parent at runtime, so parent.foos[2] returns a FooDevice. For example usage see DynamicSensorGroup.
- ophyd_async.core.all_at_once(values: dict[str, Any]) -> Sequence[dict[str, Any]]
Sort all the values into a single phase so they are set all at once
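A sorter with this behaviour can be sketched in a couple of lines. The function below is a stand-alone stand-in written for illustration, assuming only the signature and one-line description above; it is not copied from the library.

```python
from typing import Any, Dict, Sequence


def all_at_once_sketch(values: Dict[str, Any]) -> Sequence[Dict[str, Any]]:
    """Stand-in sorter: put every value into one single phase."""
    return [values]


# Hypothetical signal names and values, used only for the demonstration.
phases = all_at_once_sketch({"gain": 2, "exposure": 0.1})
print(len(phases))  # 1
```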
- ophyd_async.core.get_signal_values(signals: dict[str, SignalRW[Any]], ignore: list[str] | None = None) -> Generator[Msg, Sequence[Location[Any]], dict[str, Any]]
Get signal values in bulk.
Used as part of saving the signals of a device to a yaml file.
- Parameters:
signals (Dict[str, SignalRW]) – Dictionary with pv names and matching SignalRW values. Often the direct result of walk_rw_signals().
ignore (Optional[List[str]]) – Optional list of PVs that should be ignored.
- Returns:
A dictionary containing pv names and their associated values. Ignored pvs are set to None.
- Return type:
Dict[str, Any]
- ophyd_async.core.load_device(device: Device, path: str)
Plan which loads PVs from a yaml file into a device.
- ophyd_async.core.load_from_yaml(save_path: str) -> Sequence[dict[str, Any]]
Plan that returns a list of dicts with saved signal values from a yaml file.
- Parameters:
save_path (str) – Path of the yaml file to load from
- ophyd_async.core.save_device(device: Device, path: str, sorter: Callable[[dict[str, Any]], Sequence[dict[str, Any]]] = all_at_once, ignore: list[str] | None = None)
Plan that saves the state of all PVs on a device using a sorter.
The default sorter assumes all saved PVs can be loaded at once, and therefore can be saved at one time, i.e. all PVs will appear on one list in the resulting yaml file.
This can be a problem, because when the yaml is ingested with ophyd_async.core.load_device(), it will set all of those PVs at once. However, some PVs need to be set before others, and which ones is device specific.
Therefore, users should consider the order of device loading and write their own sorter algorithms accordingly.
See ophyd_async.fastcs.panda.phase_sorter() for a valid implementation of the sorter.
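A custom sorter only needs to split one dictionary of saved values into an ordered sequence of dictionaries. The sketch below is a hypothetical two-phase sorter written for illustration: the signal names in SET_FIRST are invented, and it is not the phase_sorter mentioned above.

```python
from typing import Any, Dict, Sequence

# Hypothetical names of signals that must be set before the others.
SET_FIRST = {"units", "mode"}


def units_first(values: Dict[str, Any]) -> Sequence[Dict[str, Any]]:
    """Sort values into two phases: SET_FIRST entries, then the rest."""
    first = {k: v for k, v in values.items() if k in SET_FIRST}
    rest = {k: v for k, v in values.items() if k not in SET_FIRST}
    # Drop an empty phase so the saved file has no empty list entries.
    return [phase for phase in (first, rest) if phase]


saved = {"offset": 1.5, "units": "mm", "gain": 2}
print(units_first(saved))  # [{'units': 'mm'}, {'offset': 1.5, 'gain': 2}]
```

A function of this shape matches the sorter signature shown for save_device, so under that assumption it could be passed as the sorter argument.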
- ophyd_async.core.save_to_yaml(phases: Sequence[dict[str, Any]], save_path: str | Path) -> None
Plan which serialises a phase or set of phases of SignalRWs to a yaml file.
- Parameters:
phases (dict or list of dicts) – The values to save. Each item in the list is a separate phase used when loading a device. In general this variable will be the return value of get_signal_values.
save_path (str) – Path of the yaml file to write to
- ophyd_async.core.set_signal_values(signals: dict[str, SignalRW[Any]], values: Sequence[dict[str, Any]]) -> Generator[Msg, None, None]
Maps signals from a yaml file into device signals.
values contains signal values in phases, which are loaded sequentially into the provided signals, to ensure signals are set in the correct order.
- Parameters:
signals (Dict[str, SignalRW[Any]]) – Dictionary of named signals to be updated if value found in values argument. Can be the output of walk_rw_signals() for a device.
values (Sequence[Dict[str, Any]]) – List of dictionaries of signal name and value pairs. If a signal matches the name of one in the signals argument, sets the signal to that value. The groups of signals are loaded in their list order. Can be the output of load_from_yaml() for a yaml file.
- ophyd_async.core.walk_rw_signals(device: Device, path_prefix: str | None = '') -> dict[str, SignalRW[Any]]
Retrieve all SignalRWs from a device.
Stores retrieved signals with their dotted attribute paths in a dictionary. Used as part of saving and loading a device.
- Returns:
SignalRWs (dict) – A dictionary matching the string attribute path of a SignalRW with the signal itself.
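The idea of collecting signals under dotted attribute paths can be sketched with plain Python objects. FakeDevice and FakeSignalRW below are hypothetical stand-ins, and the traversal via vars() is an assumption of the sketch rather than how the library walks its children.

```python
from typing import Dict


class FakeSignalRW:
    """Hypothetical stand-in for a read-write signal."""


class FakeDevice:
    """Hypothetical stand-in for a device holding children as attributes."""


def walk_sketch(device: FakeDevice, path_prefix: str = "") -> Dict[str, FakeSignalRW]:
    """Collect every FakeSignalRW under `device`, keyed by dotted path."""
    signals: Dict[str, FakeSignalRW] = {}
    for name, child in vars(device).items():
        path = f"{path_prefix}{name}"
        if isinstance(child, FakeSignalRW):
            signals[path] = child
        elif isinstance(child, FakeDevice):
            # Recurse, extending the dotted prefix with this attribute.
            signals.update(walk_sketch(child, path_prefix=f"{path}."))
    return signals


motor = FakeDevice()
motor.velocity = FakeSignalRW()
stage = FakeDevice()
stage.x = motor
stage.gain = FakeSignalRW()
stage.label = "not a signal"  # ignored: neither a signal nor a device

print(sorted(walk_sketch(stage)))  # ['gain', 'x.velocity']
```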
- class ophyd_async.core.HDFDataset(data_key: str, dataset: str, shape: collections.abc.Sequence[int] = <factory>, dtype_numpy: str = '', multiplier: int = 1, swmr: bool = False, chunk_shape: tuple[int, ...] = ())
- class ophyd_async.core.HDFFile(full_file_name: Path, datasets: list[HDFDataset], hostname: str = 'localhost')
- Parameters:
full_file_name – Absolute path to the file to be written
datasets – Datasets to write into the file
- ophyd_async.core.config_ophyd_async_logging(file=sys.stdout, fmt='%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs)03d %(module)s:%(lineno)d] %(message)s', datefmt='%y%m%d %H:%M:%S', color=True, level='WARNING')
Set a new handler on the logging.getLogger('ophyd_async') logger. If this is called more than once, the handler from the previous invocation is removed (if still present) and replaced.
- Parameters:
file (object with write method or filename string) – Default is sys.stdout.
fmt (Overall logging format)
datefmt (string) – Date format. Default is '%y%m%d %H:%M:%S'.
color (boolean) – Use ANSI color codes. True by default.
level (str or int) – Python logging level, given as string or corresponding integer. Default is 'WARNING'.
- Returns:
handler – The handler, which has already been added to the 'ophyd_async' logger.
Examples
Log to a file.
    config_ophyd_async_logging(file='/tmp/what_is_happening.txt')
Include the date along with the time. (The log messages will always include milliseconds, which are configured separately, not as part of 'datefmt'.)
    config_ophyd_async_logging(datefmt="%Y-%m-%d %H:%M:%S")
Turn off ANSI color codes.
    config_ophyd_async_logging(color=False)
Increase verbosity: show level DEBUG or higher.
    config_ophyd_async_logging(level='DEBUG')
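The replace-the-previous-handler behaviour described above can be approximated with the standard logging module alone. This is a minimal sketch under stated assumptions: set_handler_sketch is a hypothetical name, colour support is left out, and the format string is the documented default without the %(log_color)s field.

```python
import io
import logging

_current_handler = None  # handler installed by the previous call, if any


def set_handler_sketch(stream, level="WARNING"):
    """Install one stream handler on the 'ophyd_async' logger, replacing ours."""
    global _current_handler
    logger = logging.getLogger("ophyd_async")
    # Remove the handler from the previous invocation if still present.
    if _current_handler is not None and _current_handler in logger.handlers:
        logger.removeHandler(_current_handler)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter(
            "[%(levelname)1.1s %(asctime)s.%(msecs)03d "
            "%(module)s:%(lineno)d] %(message)s",
            datefmt="%y%m%d %H:%M:%S",
        )
    )
    handler.setLevel(level)
    logger.addHandler(handler)
    _current_handler = handler
    return handler


first = set_handler_sketch(io.StringIO())
buffer = io.StringIO()
second = set_handler_sketch(buffer)  # replaces `first`
logging.getLogger("ophyd_async").warning("detector not armed")
print(buffer.getvalue().strip())
```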
- class ophyd_async.core.MockSignalBackend(initial_backend: SignalBackend[SignalDatatypeT], mock: LazyMock)
Signal backend for testing, created by Device.connect(mock=True).
- ophyd_async.core.callback_on_mock_put(signal: Signal[SignalDatatypeT], callback: Callable[[SignalDatatypeT, bool], None] | Callable[[SignalDatatypeT, bool], Awaitable[None]])
For setting a callback when a backend is put to.
Can either be used in a context, with the callback being unset on exit, or as an ordinary function.
- Parameters:
signal – A signal with a MockSignalBackend backend.
callback – The callback to call when the backend is put to during the context.
- ophyd_async.core.get_mock_put(signal: Signal) -> AsyncMock
Get the mock associated with the put call on the signal.
- ophyd_async.core.set_mock_put_proceeds(signal: Signal, proceeds: bool)
Allow or block a put with wait=True from proceeding
- ophyd_async.core.set_mock_value(signal: Signal[SignalDatatypeT], value: SignalDatatypeT)
Set the value of a signal that is in mock mode.
- ophyd_async.core.set_mock_values(signal: SignalR[SignalDatatypeT], values: Iterable[SignalDatatypeT], require_all_consumed: bool = False) -> _SetValuesIterator
Iterator to set a signal to a sequence of values, optionally repeating the sequence.
- Parameters:
signal – A signal with a MockSignalBackend backend.
values – An iterable of the values to set the signal to, on each iteration the value will be set.
require_all_consumed – If True, an AssertionError will be raised if the iterator is deleted before all values have been consumed.
Notes
Example usage:
    for value_set in set_mock_values(signal, [1, 2, 3]):
        # do something

    cm = set_mock_values(signal, [1, 2, 3], require_all_consumed=True)
    next(cm)
    # do something
- class ophyd_async.core.PathInfo(directory_path: Path, filename: str, create_dir_depth: int = 0)
Information about where and how to write a file.
- Parameters:
directory_path – Directory into which files should be written
filename – Base filename to use generated by FilenameProvider, without extension
create_dir_depth – Optional depth of directories to create if they do not exist
- class ophyd_async.core.StandardReadable(name: str = '', connector: DeviceConnector | None = None)
Device that owns its children and provides useful default behavior.
When its name is set it renames child Devices
Signals can be registered for read() and read_configuration()
These signals will be subscribed for read() between stage() and unstage()
- add_children_as_readables(format: StandardReadableFormat = StandardReadableFormat.CHILD) -> Generator[None, None, None]
Context manager that calls add_readables on child Devices added within.
Scans self.children() on entry and exit to context manager, and calls add_readables on any that are added with the provided StandardReadableFormat.
- add_readables(devices: Sequence[Device], format: StandardReadableFormat = StandardReadableFormat.CHILD) -> None
Add devices to contribute to various bluesky verbs.
Use output from the given devices to contribute to the verbs of the following interfaces:
- Parameters:
devices – The devices to be added
format – Determines which of the device's functions are added to which verb as per the StandardReadableFormat documentation
- class ophyd_async.core.StandardReadableFormat(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
Declare how a Device should contribute to the StandardReadable verbs.
- CHILD = 'CHILD'
Detect which verbs the child supports and contribute to:
read(), describe() if it is bluesky.protocols.Readable
read_configuration(), describe_configuration() if it is bluesky.protocols.Configurable
stage(), unstage() if it is bluesky.protocols.Stageable
hints if it is bluesky.protocols.HasHints
- CONFIG_SIGNAL = 'CONFIG_SIGNAL'
Contribute the Signal value to read_configuration() and describe_configuration()
- class ophyd_async.core.Signal(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')
A Device with the concept of a value, with R, RW, W and X flavours
- class ophyd_async.core.SignalR(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')
Signal that can be read from and monitored
- async read(cached: bool | None = None) -> dict[str, Reading]
Return a single item dict with the reading in it
- subscribe_value(function: Callable[[SignalDatatypeT], None])
Subscribe to updates in value of a device
- class ophyd_async.core.SignalRW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')
Signal that can be both read and set
- class ophyd_async.core.SignalW(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')
Signal that can be set
- class ophyd_async.core.SignalX(backend: SignalBackend[SignalDatatypeT], timeout: float | None = 10.0, name: str = '')
Signal that puts the default value
- async ophyd_async.core.assert_configuration(configurable: AsyncConfigurable, configuration: Mapping[str, Reading]) -> None
Assert readings from Configurable.
- Parameters:
configurable – Configurable with Configurable.read function that generates readings.
configuration – The expected readings from configurable.
Notes
Example usage:
    await assert_configuration(configurable, configuration)
- ophyd_async.core.assert_emitted(docs: Mapping[str, list[dict]], **numbers: int)
Assert emitted document generated by running a Bluesky plan
- Parameters:
docs – A dictionary mapping each document name to the list of emitted documents
numbers – Expected number of emitted documents for each name, given as keyword arguments
Notes
Example usage:
    assert_emitted(docs, start=1, descriptor=1, resource=1, datum=1, event=1, stop=1)
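The kind of check this performs can be sketched without Bluesky installed. In the stand-alone example below, capture and check_emitted_sketch are hypothetical names, and the (name, doc) callback shape is an assumption about how the docs mapping is typically collected.

```python
from collections import defaultdict
from typing import Dict, List

docs: Dict[str, List[dict]] = defaultdict(list)


def capture(name: str, doc: dict) -> None:
    """Hypothetical subscriber: file each emitted document under its name."""
    docs[name].append(doc)


def check_emitted_sketch(docs: Dict[str, List[dict]], **numbers: int) -> None:
    """Assert that each document name was emitted the expected number of times."""
    actual = {name: len(items) for name, items in docs.items()}
    assert actual == numbers, f"expected {numbers}, got {actual}"


# Simulate a short run emitting four kinds of document.
for name in ("start", "descriptor", "event", "event", "stop"):
    capture(name, {})

check_emitted_sketch(docs, start=1, descriptor=1, event=2, stop=1)
print("document counts match")
```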
- async ophyd_async.core.assert_reading(readable: AsyncReadable, expected_reading: Mapping[str, Reading]) -> None
Assert readings from readable.
- Parameters:
readable – Readable with a readable.read function that generates readings.
expected_reading – The expected readings from the readable.
Notes
Example usage:
    await assert_reading(readable, reading)
- async ophyd_async.core.assert_value(signal: SignalR[SignalDatatypeT], value: Any) -> None
Assert a signal's value and compare it with an expected value.
- Parameters:
signal – signal with get_value.
value – The expected value from the signal.
Notes
Example usage:
    await assert_value(signal, value)
- async ophyd_async.core.observe_value(signal: SignalR[SignalDatatypeT], timeout: float | None = None, done_status: Status | None = None) -> AsyncGenerator[SignalDatatypeT, None]
Subscribe to the value of a signal so it can be iterated from.
- Parameters:
signal – Call subscribe_value on this at the start, and clear_sub on it at the end
timeout – If given, how long to wait for each updated value in seconds. If an update is not produced in this time then raise asyncio.TimeoutError
done_status – If this status is complete, stop observing and make the iterator return. If it raises an exception then this exception will be raised by the iterator.
Notes
Example usage:
    async for value in observe_value(sig):
        do_something_with(value)
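The subscribe, iterate, unsubscribe pattern described in the parameters can be sketched with asyncio alone. FakeSignal and observe_sketch below are hypothetical stand-ins; in particular, delivering the current value on subscription is an assumption of the sketch, and done_status handling is left out.

```python
import asyncio


class FakeSignal:
    """Hypothetical stand-in exposing subscribe_value and clear_sub."""

    def __init__(self, value):
        self._value = value
        self.callbacks = []

    def subscribe_value(self, function):
        self.callbacks.append(function)
        function(self._value)  # assumption: current value is delivered at once

    def clear_sub(self, function):
        self.callbacks.remove(function)

    def set(self, value):
        self._value = value
        for callback in list(self.callbacks):
            callback(value)


async def observe_sketch(signal, timeout=None):
    """Yield each value of `signal`, raising asyncio.TimeoutError if it stalls."""
    queue = asyncio.Queue()
    callback = queue.put_nowait
    signal.subscribe_value(callback)
    try:
        while True:
            yield await asyncio.wait_for(queue.get(), timeout)
    finally:
        signal.clear_sub(callback)


async def main():
    signal = FakeSignal(0)
    seen = []
    values = observe_sketch(signal, timeout=1.0)
    try:
        async for value in values:
            seen.append(value)
            if value == 2:
                break
            signal.set(value + 1)
    finally:
        await values.aclose()  # runs the generator's cleanup straight away
    return seen, signal.callbacks


seen, remaining_callbacks = asyncio.run(main())
print(seen)  # [0, 1, 2]
```

Closing the generator explicitly makes the unsubscribe step happen at a known point rather than whenever the generator is garbage collected.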
- async ophyd_async.core.set_and_wait_for_value(signal: SignalRW[SignalDatatypeT], value: SignalDatatypeT, timeout: float = 10.0, status_timeout: float | None = None) -> AsyncStatus
Set a signal and monitor it until it has that value.
Useful for busy record, or other Signals with pattern:
- Set Signal with wait=True and stash the Status
- Read the same Signal to check the operation has started
- Return the Status so calling code can wait for operation to complete
- Parameters:
signal – The signal to set
value – The value to set it to
timeout – How long to wait for the signal to have the value
status_timeout – How long the returned Status will wait for the set to complete
Notes
Example usage:
    await set_and_wait_for_value(device.acquire, 1)
- async ophyd_async.core.set_and_wait_for_other_value(set_signal: SignalW[SignalDatatypeT], set_value: SignalDatatypeT, read_signal: SignalR[SignalDatatypeV], read_value: SignalDatatypeV, timeout: float = 10.0, set_timeout: float | None = None) -> AsyncStatus
Set a signal and monitor another signal until it has the specified value.
This function sets a set_signal to a specified set_value and waits for a read_signal to have the read_value.
- Parameters:
set_signal – The signal to set
set_value – The value to set it to
read_signal – The signal to monitor
read_value – The value to wait for
timeout – How long to wait for the signal to have the value
set_timeout – How long to wait for the set to complete
Notes
Example usage:
    await set_and_wait_for_other_value(device.acquire, 1, device.acquire_rbv, 1)
- ophyd_async.core.soft_signal_r_and_setter(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) -> tuple[SignalR[SignalDatatypeT], Callable[[SignalDatatypeT], None]]
Returns a tuple of a read-only Signal and a callable through which the signal can be internally modified within the device. May pass metadata, which are propagated into describe. Use soft_signal_rw if you want a device that is externally modifiable.
- ophyd_async.core.soft_signal_rw(datatype: type[SignalDatatypeT], initial_value: SignalDatatypeT | None = None, name: str = '', units: str | None = None, precision: int | None = None) -> SignalRW[SignalDatatypeT]
Creates a read-writable Signal with a SoftSignalBackend. May pass metadata, which are propagated into describe.
- async ophyd_async.core.wait_for_value(signal: SignalR[SignalDatatypeT], match: SignalDatatypeT | Callable[[SignalDatatypeT], bool], timeout: float | None)
Wait for a signal to have a matching value.
- Parameters:
signal – Call subscribe_value on this at the start, and clear_sub on it at the end
match – If a callable, it should return True if the value matches. If not callable then value will be checked for equality with match.
timeout – How long to wait for the value to match
Notes
Example usage:
    await wait_for_value(device.acquiring, 1, timeout=1)
Or:
    await wait_for_value(device.num_captured, lambda v: v > 45, timeout=1)
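The two forms of match (a plain value or a predicate) can be illustrated with a small asyncio sketch. Everything here is a hypothetical stand-in: counter plays the role of a changing signal, and returning the matched value is a choice made for the demonstration, not a statement about the library's return value.

```python
import asyncio


def is_match(value, match) -> bool:
    """Call `match` if it is callable, otherwise compare for equality."""
    return match(value) if callable(match) else value == match


async def wait_until_match(values, match, timeout):
    """Consume an async iterator until a value matches, within `timeout`."""

    async def consume():
        async for value in values:
            if is_match(value, match):
                return value
        raise RuntimeError("value stream ended without a match")

    return await asyncio.wait_for(consume(), timeout)


async def counter(limit: int):
    """Hypothetical value source: yields 0, 1, 2, ... up to limit - 1."""
    for number in range(limit):
        await asyncio.sleep(0)
        yield number


print(asyncio.run(wait_until_match(counter(100), 1, timeout=1)))                # 1
print(asyncio.run(wait_until_match(counter(100), lambda v: v > 45, timeout=1)))  # 46
```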
- class ophyd_async.core.SignalBackend(datatype: type[SignalDatatypeT] | None)
A read/write/monitor backend for a Signal
- abstract source(name: str, read: bool) -> str
Return source of signal.
Signals may pass a name to the backend, which can be used or discarded.
- abstract async put(value: SignalDatatypeT | None, wait: bool)
Put a value to the PV, if wait then wait for completion
- abstract async get_datakey(source: str) -> DataKey
Metadata like source, dtype, shape, precision, units
- abstract async get_reading() -> Reading[SignalDatatypeT]
The current value, timestamp and severity
- class ophyd_async.core.StrictEnum(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
All members should exist in the Backend, and there will be no extras
- class ophyd_async.core.SubsetEnum(value, *args, **kwargs)
All members should exist in the Backend, but there may be extras
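The difference between the two rules comes down to set equality versus subset. The sketch below uses only the standard enum module; Mode, check_strict and check_subset are hypothetical names, and raising TypeError on a mismatch is an assumption of the sketch.

```python
from enum import Enum
from typing import Iterable, Type


class Mode(str, Enum):
    """Hypothetical enum of choices the Python side knows about."""

    LOW = "Low"
    HIGH = "High"


def check_strict(enum_cls: Type[Enum], backend_choices: Iterable[str]) -> None:
    """Strict rule: members and backend choices must be exactly the same."""
    ours = {member.value for member in enum_cls}
    theirs = set(backend_choices)
    if ours != theirs:
        raise TypeError(f"{sorted(ours)} does not equal {sorted(theirs)}")


def check_subset(enum_cls: Type[Enum], backend_choices: Iterable[str]) -> None:
    """Subset rule: every member must exist in the backend; extras are fine."""
    missing = {member.value for member in enum_cls} - set(backend_choices)
    if missing:
        raise TypeError(f"{sorted(missing)} missing from backend choices")


check_strict(Mode, ["Low", "High"])          # passes: exact match
check_subset(Mode, ["Low", "High", "Auto"])  # passes: backend has an extra
print("both checks passed")
```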
- class ophyd_async.core.SoftSignalBackend(datatype: type[SignalDatatypeT] | None, initial_value: SignalDatatypeT | None = None, units: str | None = None, precision: int | None = None)
A backend to a soft Signal, for test signals see MockSignalBackend.
- class ophyd_async.core.AsyncStatus(awaitable: Coroutine | Task, name: str | None = None)
Convert asyncio awaitable to bluesky Status interface
- class ophyd_async.core.WatchableAsyncStatus(iterator: AsyncIterator[WatcherUpdate[T]], name: str | None = None)
Convert AsyncIterator of WatcherUpdates to bluesky Status interface.
- classmethod wrap(f: Callable[[P], AsyncIterator[WatcherUpdate[T]]]) -> Callable[[P], WAS]
Wrap an AsyncIterator in a WatchableAsyncStatus.
- class ophyd_async.core.LazyMock(name: str = '', parent: LazyMock | None = None)
A lazily created Mock to be used when connecting in mock mode.
Creating Mocks is reasonably expensive when each Device (and Signal) requires its own, and the tree is only used when Signal.set() is called. This class allows a tree of lazily connected Mocks to be constructed so that when the leaf is created, so are its parents. Any calls to the child are then accessible from the parent mock.
    >>> parent = LazyMock()
    >>> child = parent.child("child")
    >>> child_mock = child()
    >>> child_mock()
    <Mock name='mock.child()' id='...'>
    >>> parent_mock = parent()
    >>> parent_mock.mock_calls
    [call.child()]
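The lazy-tree idea can be approximated with unittest.mock from the standard library. LazyTree below is a hypothetical stand-in written for illustration, not the LazyMock implementation; it relies on Mock.attach_mock so that calls on a child show up in the parent's mock_calls.

```python
from unittest.mock import Mock, call


class LazyTree:
    """Hypothetical stand-in: create Mocks only when a node is first called."""

    def __init__(self, name: str = "", parent: "LazyTree | None" = None):
        self.name = name
        self.parent = parent
        self._mock = None  # nothing is created until it is needed

    def child(self, name: str) -> "LazyTree":
        return LazyTree(name, self)

    def __call__(self) -> Mock:
        if self._mock is None:
            self._mock = Mock()
            if self.parent is not None:
                # Creating the leaf also creates and links its parents.
                self.parent().attach_mock(self._mock, self.name)
        return self._mock


parent = LazyTree()
child = parent.child("child")
child_mock = child()
child_mock()
print(parent().mock_calls)  # [call.child()]
```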
- exception ophyd_async.core.NotConnected(errors: str | Mapping[str, Exception])
Exception to be raised if a Device.connect is cancelled
- class ophyd_async.core.Reference(obj: T)
Hide an object behind a reference.
Used to opt out of the naming/parent-child relationship of Device.
For example:
    class DeviceWithRefToSignal(Device):
        def __init__(self, signal: SignalRW[int]):
            self.signal_ref = Reference(signal)
            super().__init__()

        def set(self, value) -> AsyncStatus:
            return self.signal_ref().set(value + 1)
- pydantic model ophyd_async.core.Table
An abstraction of a Table of str to numpy array.
Show JSON schema
{ "title": "Table", "description": "An abstraction of a Table of str to numpy array.", "type": "object", "properties": {}, "additionalProperties": true }
- Validators:
validate_array_dtypes » all fields
validate_lengths » all fields
- class ophyd_async.core.WatcherUpdate(current: T, initial: T, target: T, name: str | None = None, unit: str | None = None, precision: float | None = None, fraction: float | None = None, time_elapsed: float | None = None, time_remaining: float | None = None)
A dataclass such that, when expanded, it provides the kwargs for a watcher
- ophyd_async.core.get_dtype(datatype: type) -> dtype
Get the runtime dtype from a numpy ndarray type annotation
    >>> from ophyd_async.core import Array1D
    >>> import numpy as np
    >>> get_dtype(Array1D[np.int8])
    dtype('int8')
- ophyd_async.core.get_enum_cls(datatype: type | None) -> type[StrictEnum] | None
Get the runtime enum class from a datatype annotation, or None if the datatype is not an enum. The example in the docstring exercises get_dtype:
    >>> import numpy.typing as npt
    >>> import numpy as np
    >>> get_dtype(npt.NDArray[np.int8])
    dtype('int8')
- ophyd_async.core.get_unique(values: dict[str, T], types: str) -> T
If all values are the same, return that value, otherwise raise TypeError
    >>> get_unique({"a": 1, "b": 1}, "integers")
    1
    >>> get_unique({"a": 1, "b": 2}, "integers")
    Traceback (most recent call last):
    ...
    TypeError: Differing integers: a has 1, b has 2
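A function with the behaviour shown in the doctest can be sketched in a few lines. This is a stand-alone reimplementation for illustration, assuming hashable values; the name get_unique_sketch is hypothetical and the body is not copied from the library.

```python
from typing import Dict, TypeVar

T = TypeVar("T")


def get_unique_sketch(values: Dict[str, T], types: str) -> T:
    """Return the single shared value, or raise TypeError listing the differences."""
    distinct = set(values.values())
    if len(distinct) != 1:
        differences = ", ".join(f"{key} has {value}" for key, value in values.items())
        raise TypeError(f"Differing {types}: {differences}")
    return distinct.pop()


print(get_unique_sketch({"a": 1, "b": 1}, "integers"))  # 1
try:
    get_unique_sketch({"a": 1, "b": 2}, "integers")
except TypeError as error:
    print(error)  # Differing integers: a has 1, b has 2
```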