Note
Ophyd async is included on a provisional basis until the v1.0 release and may change API on minor release numbers before then
Write Tests for Devices#
Testing ophyd-async devices using tools like mocking, patching, and fixtures can become complicated very quickly. The library provides several utilities to make it easier.
Async Tests#
pytest-asyncio is required for async tests. It is should be included as a dev dependency of your project. Tests can either be decorated with @pytest.mark.asyncio
or the project can be automatically configured to detect async tests.
# pyproject.toml
[tool.pytest.ini_options]
...
asyncio_mode = "auto"
Mock Backend#
Ophyd devices initialized with a mock backend behave in a similar way to mocks, without requiring you to mock out all the dependencies and internals. The init_devices
can initialize any number of devices, and their signals and sub-devices (recursively), with a mock backend.
@pytest.fixture
async def mock_sensor() -> sim.Sensor:
async with init_devices(mock=True):
mock_sensor = sim.Sensor("MOCK:SENSOR:")
# Signals connected here
assert mock_sensor.name == "mock_sensor"
return mock_sensor
Mock Utility Functions#
Mock signals behave as simply as possible, holding a sensible default value when initialized and retaining any value (in memory) to which they are set. This model breaks down in the case of read-only signals, which cannot be set because there is an expectation of some external device setting them in the real world. There is a utility function, set_mock_value
, to mock-set values for mock signals, including read-only ones.
In addition this example also utilizes helper functions like assert_reading
and assert_value
to ensure the validity of device readings and values. For more information see: API.core
async def test_sensor_reading_shows_value(mock_sensor: sim.Sensor):
# Check default value
await assert_value(mock_sensor.value, pytest.approx(0.0))
assert (await mock_sensor.value.get_value()) == pytest.approx(0.0)
await assert_reading(
mock_sensor,
{
"mock_sensor-value": {
"value": 0.0,
"alarm_severity": 0,
"timestamp": ANY,
}
},
)
# Check different value
set_mock_value(mock_sensor.value, 5.0)
await assert_reading(
mock_sensor,
{
"mock_sensor-value": {
"value": 5.0,
"timestamp": ANY,
"alarm_severity": 0,
}
},
)
Given that the mock signal holds a unittest.mock.Mock
object you can retrieve this object and assert that the device has been set correctly using get_mock_put
. You are also free to use any other behaviour that unittest.mock.Mock
provides, such as in this example which sets the parent of the mock to allow ordering across signals to be asserted:
async def test_retrieve_mock_and_assert(mock_mover: sim.Mover):
mover_setpoint_mock = get_mock_put(mock_mover.setpoint)
await mock_mover.setpoint.set(10)
mover_setpoint_mock.assert_called_once_with(10, wait=ANY)
# Assert that velocity is set before move
mover_velocity_mock = get_mock_put(mock_mover.velocity)
parent_mock = Mock()
parent_mock.attach_mock(mover_setpoint_mock, "setpoint")
parent_mock.attach_mock(mover_velocity_mock, "velocity")
await mock_mover.velocity.set(100)
await mock_mover.setpoint.set(67)
assert parent_mock.mock_calls == [
call.velocity(100, wait=True),
call.setpoint(67, wait=True),
]
There are several other test utility functions:
Use callback_on_mock_put
, for hooking in logic when a mock value changes (e.g. because someone puts to it). This can be called directly, or used as a context, with the callbacks ending after exit.
async def test_mover_stopped(mock_mover: sim.Mover):
callbacks = []
callback_on_mock_put(
mock_mover.stop_, lambda v, *args, **kwargs: callbacks.append(v)
)
await mock_mover.stop()
assert callbacks == [None]
Testing a Device in a Plan with the RunEngine#
async def test_sensor_in_plan(RE: RunEngine, mock_sensor: sim.Sensor):
"""Tests mock sensor behavior within a RunEngine plan.
This test verifies that the sensor emits the expected documents
when used in plan(count).
"""
docs = defaultdict(list)
RE.subscribe(lambda name, doc: docs[name].append(doc))
RE(bp.count([mock_sensor], num=2))
assert_emitted(docs, start=1, descriptor=1, event=2, stop=1)
This test verifies that the sim_sensor behaves as expected within a plan. The plan we use here is a count
, which takes a specified number of readings from the sim_sensor
. Since we set the repeat
to two in this test, the sensor should emit two “event” documents along with “start”, “stop” and “descriptor” documents. Finally, we use the helper function assert_emitted
to confirm that the emitted documents match our expectations.