Source code for ophyd_async.testing._wait_for_pending
import asyncio
[docs]
async def wait_for_pending_wakeups(max_yields=20, raise_if_exceeded=True):
"""Allow any ready asyncio tasks to be woken up.
Used in:
- Tests to allow tasks like ``set()`` to start so that signal
puts can be tested
- `observe_value` to allow it to be wrapped in `asyncio.wait_for`
with a timeout
"""
loop = asyncio.get_event_loop()
# If anything has called loop.call_soon or is scheduled a wakeup
# then let it run
for _ in range(max_yields):
await asyncio.sleep(0)
if not loop._ready: # type: ignore # noqa: SLF001
return
if raise_if_exceeded:
raise RuntimeError(f"Tasks still scheduling wakeups after {max_yields} yields")