Source code for ophyd_async.epics.demo

"""Demo EPICS Devices for the tutorial"""

import asyncio
import atexit
import random
import string
import subprocess
import sys
import time
from enum import Enum
from pathlib import Path
from typing import Callable, List, Optional

import numpy as np
from bluesky.protocols import Movable, Stoppable

from ophyd_async.core import AsyncStatus, Device, StandardReadable, observe_value

from ..signal.signal import epics_signal_r, epics_signal_rw, epics_signal_x


[docs] class EnergyMode(str, Enum): """Energy mode for `Sensor`""" #: Low energy mode low = "Low Energy" #: High energy mode high = "High Energy"
[docs] class Sensor(StandardReadable): """A demo sensor that produces a scalar value based on X and Y Movers""" def __init__(self, prefix: str, name="") -> None: # Define some signals self.value = epics_signal_r(float, prefix + "Value") self.mode = epics_signal_rw(EnergyMode, prefix + "Mode") # Set name and signals for read() and read_configuration() self.set_readable_signals( read=[self.value], config=[self.mode], ) super().__init__(name=name)
[docs] class Mover(StandardReadable, Movable, Stoppable): """A demo movable that moves based on velocity""" def __init__(self, prefix: str, name="") -> None: # Define some signals self.setpoint = epics_signal_rw(float, prefix + "Setpoint") self.readback = epics_signal_r(float, prefix + "Readback") self.velocity = epics_signal_rw(float, prefix + "Velocity") self.units = epics_signal_r(str, prefix + "Readback.EGU") self.precision = epics_signal_r(int, prefix + "Readback.PREC") # Signals that collide with standard methods should have a trailing underscore self.stop_ = epics_signal_x(prefix + "Stop.PROC") # Whether set() should complete successfully or not self._set_success = True # Set name and signals for read() and read_configuration() self.set_readable_signals( read=[self.readback], config=[self.velocity, self.units], ) super().__init__(name=name) def set_name(self, name: str): super().set_name(name) # Readback should be named the same as its parent in read() self.readback.set_name(name) async def _move(self, new_position: float, watchers: List[Callable] = []): self._set_success = True # time.monotonic won't go backwards in case of NTP corrections start = time.monotonic() old_position, units, precision = await asyncio.gather( self.setpoint.get_value(), self.units.get_value(), self.precision.get_value(), ) # Wait for the value to set, but don't wait for put completion callback await self.setpoint.set(new_position, wait=False) async for current_position in observe_value(self.readback): for watcher in watchers: watcher( name=self.name, current=current_position, initial=old_position, target=new_position, unit=units, precision=precision, time_elapsed=time.monotonic() - start, ) if np.isclose(current_position, new_position): break if not self._set_success: raise RuntimeError("Motor was stopped")
[docs] def move(self, new_position: float, timeout: Optional[float] = None): """Commandline only synchronous move of a Motor""" from bluesky.run_engine import call_in_bluesky_event_loop, in_bluesky_event_loop if in_bluesky_event_loop(): raise RuntimeError("Will deadlock run engine if run in a plan") call_in_bluesky_event_loop(self._move(new_position), timeout) # type: ignore
# TODO: this fails if we call from the cli, but works if we "ipython await" it def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus: watchers: List[Callable] = [] coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout) return AsyncStatus(coro, watchers) async def stop(self, success=True): self._set_success = success status = self.stop_.trigger() await status
[docs] class SampleStage(Device): """A demo sample stage with X and Y movables""" def __init__(self, prefix: str, name="") -> None: # Define some child Devices self.x = Mover(prefix + "X:") self.y = Mover(prefix + "Y:") # Set name of device and child devices super().__init__(name=name)
[docs] def start_ioc_subprocess() -> str: """Start an IOC subprocess with EPICS database for sample stage and sensor with the same pv prefix """ pv_prefix = "".join(random.choice(string.ascii_uppercase) for _ in range(12)) + ":" here = Path(__file__).absolute().parent args = [sys.executable, "-m", "epicscorelibs.ioc"] args += ["-m", f"P={pv_prefix}"] args += ["-d", str(here / "sensor.db")] for suff in "XY": args += ["-m", f"P={pv_prefix}{suff}:"] args += ["-d", str(here / "mover.db")] process = subprocess.Popen( args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, ) atexit.register(process.communicate, "exit") return pv_prefix