import asyncio
from datetime import datetime, timedelta
from abc import ABCMeta, abstractmethod, abstractproperty
import operator
import threading
from functools import partial
from warnings import warn
class SuspenderBase(metaclass=ABCMeta):
"""An ABC to manage the callbacks between asyincio and pyepics.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator or generator function, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator or generator function, optional
a generator, list, or similar containing `Msg` objects
tripped_message : str, optional
Message to include in the trip notification
"""
def __init__(self, signal, *, sleep=0, pre_plan=None, post_plan=None,
tripped_message=''):
"""
"""
self.RE = None
self._ev = None
self._tripped = False
self._tripped_message = tripped_message
self._sleep = sleep
self._lock = threading.Lock()
self._sig = signal
self._pre_plan = pre_plan
self._post_plan = post_plan
def __repr__(self):
return (
"{}({!r}, sleep={}, pre_plan={}, post_plan={},"
"tripped_message={})".format(
type(self).__name__,
self._sig,
self._sleep,
self._pre_plan,
self._post_plan,
self._tripped_message,
)
)
def install(self, RE, *, event_type=None):
"""Install callback on signal
This (re)installs the required callbacks at the pyepics level
Parameters
----------
RE : RunEngine
The run engine instance this should work on
event_type : str, optional
The event type (subscription type) to watch
"""
with self._lock:
self.RE = RE
self._sig.subscribe(self, event_type=event_type, run=True)
def remove(self):
"""Disable the suspender
Removes the callback at the pyepics level
"""
self._sig.clear_sub(self)
with self._lock:
if self.RE is not None:
self.__set_event(self.RE._loop)
self.RE = None
self._tripped = False
@abstractmethod
def _should_suspend(self, value):
"""
Determine if the current value of the signal is such
that we need to tell the scan to suspend
Parameters
----------
value : object
The value to evaluate to determine if we should
suspend
Returns
-------
suspend : bool
True means suspend
"""
raise NotImplementedError()
@abstractmethod
def _should_resume(self, value):
"""
Determine if the scan is ready to automatically
restart.
Parameters
----------
value : object
The value to evaluate to determine if we should
resume
Returns
-------
suspend : bool
True means resume
"""
raise NotImplementedError()
def __call__(self, value, **kwargs):
"""Make the class callable so that we can
pass it off to the ophyd callback stack.
This expects the massive blob that comes from ophyd
"""
with self._lock:
if self.RE is None:
return
loop = self.RE._loop
if self._should_suspend(value):
self._tripped = True
# this does dirty things with internal state
if self._ev is None and self.RE is not None:
self.__make_event()
if self._ev is None:
raise RuntimeError("Could not create the ")
cb = partial(
self.RE.request_suspend,
self._ev.wait,
pre_plan=self._pre_plan,
post_plan=self._post_plan,
justification=self._get_justification(),
)
if self.RE.state.is_running:
loop.call_soon_threadsafe(cb)
elif self._should_resume(value):
self.__set_event(loop)
self._tripped = False
def __make_event(self):
"""Make or return the asyncio.Event to use as a bridge."""
assert self._lock.locked()
if self._ev is None and self.RE is not None:
th_ev = threading.Event()
def really_make_the_event():
self._ev = asyncio.Event()
th_ev.set()
h = self.RE._loop.call_soon_threadsafe(really_make_the_event)
if not th_ev.wait(0.1):
h.cancel()
return self._ev
def __set_event(self, loop):
"""Notify the event that it can resume"""
assert self._lock.locked()
if self._ev:
ev = self._ev
sleep = self._sleep
def local():
ts = (datetime.now() + timedelta(seconds=sleep)).strftime(
"%Y-%m-%d %H:%M:%S"
)
print(
"Suspender {!r} reports a return to nominal "
"conditions. Will sleep for {} seconds and then "
"release suspension at {}.".format(self, sleep, ts)
)
# we can use call_later here because this function
# is scheduled to be run in the event loop thread
# by the `call_soon_threadsafe` call just below.
loop.call_later(sleep, ev.set)
loop.call_soon_threadsafe(local)
# clear that we have an event
self._ev = None
def get_futures(self):
"""Return a list of futures to wait on.
This will only work correctly if this suspender is 'installed'
and watching a signal
Returns
-------
futs : list
List of futures to wait on
justification : str
String explaining why the suspender is tripped
"""
if not self.tripped:
return [], ""
with self._lock:
return [self.__make_event().wait], self._get_justification()
@property
def tripped(self):
return self._tripped
def _get_justification(self):
if not self.tripped:
return ''
template = 'Suspender of type {} stopped by signal {!r}'
just = template.format(self.__class__.__name__, self._sig)
return ': '.join(s for s in (just, self._tripped_message)
if s)
[docs]class SuspendBoolHigh(SuspenderBase):
"""
Suspend when a boolean signal goes high; resume when it goes low.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def _should_suspend(self, value):
return bool(value)
def _should_resume(self, value):
return not bool(value)
def _get_justification(self):
if not self.tripped:
return ''
just = 'Signal {} is high'.format(self._sig.name)
return ': '.join(s for s in (just, self._tripped_message)
if s)
[docs]class SuspendBoolLow(SuspenderBase):
"""
Suspend when a boolean signal goes low; resume when it goes high.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def _should_suspend(self, value):
return not bool(value)
def _should_resume(self, value):
return bool(value)
def _get_justification(self):
if not self.tripped:
return ''
just = 'Signal {} is low'.format(self._sig.name)
return ': '.join(s for s in (just, self._tripped_message)
if s)
class _Threshold(SuspenderBase):
"""
Private base class for suspenders that watch when a scalar
signal fall above or below a threshold. Allow for a possibly different
threshold to resume.
"""
def __init__(self, signal, suspend_thresh, *,
resume_thresh=None, **kwargs):
super().__init__(signal, **kwargs)
self._suspend_thresh = suspend_thresh
if resume_thresh is None:
resume_thresh = suspend_thresh
self._resume_thresh = resume_thresh
self._validate()
def _should_suspend(self, value):
return self._op(value, self._suspend_thresh)
def _should_resume(self, value):
return not self._op(value, self._resume_thresh)
@abstractproperty
def _op(self):
pass
@abstractmethod
def _validate(self):
pass
[docs]class SuspendFloor(_Threshold):
"""
Suspend when a scalar falls below a threshold.
Optionally, the threshold to resume can be set to be greater than the
threshold to suspend.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
suspend_thresh : float
Suspend if the signal value falls below this value
resume_thresh : float, optional
Resume when the signal value rises above this value. If not
given set to `suspend_thresh`. Must be greater than `suspend_thresh`.
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def _validate(self):
if self._resume_thresh < self._suspend_thresh:
raise ValueError("Resume threshold must be equal or greater "
"than suspend threshold, you passed: "
"suspend: {} resume: {}".format(
self._suspend_thresh,
self._resume_thresh))
@property
def _op(self):
return operator.lt
def _get_justification(self):
if not self.tripped:
return ''
just = (
f'Signal {self._sig.name} = {self._sig.get()!r} ' +
f'fell below {self._suspend_thresh} ' +
f'and has not yet crossed above {self._resume_thresh}.'
)
return ': '.join(
s for s in (just, self._tripped_message) if s
)
[docs]class SuspendCeil(_Threshold):
"""
Suspend when a scalar rises above a threshold.
Optionally, the threshold to resume can be set to be less than the
threshold to suspend.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
suspend_thresh : float
Suspend if the signal value falls below this value
resume_thresh : float, optional
Resume when the signal value rises above this value. If not
given set to `suspend_thresh`. Must be greater than `suspend_thresh`.
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def _validate(self):
if self._resume_thresh > self._suspend_thresh:
raise ValueError("Resume threshold must be equal or less "
"than suspend threshold, you passed: "
"suspend: {} resume: {}".format(
self._suspend_thresh,
self._resume_thresh))
@property
def _op(self):
return operator.gt
def _get_justification(self):
if not self.tripped:
return ''
just = (
f'Signal {self._sig.name} = {self._sig.get()!r} ' +
f'went above {self._suspend_thresh} ' +
f'and has not yet crossed below {self._resume_thresh}.'
)
return ': '.join(
s for s in (just, self._tripped_message) if s
)
class _SuspendBandBase(SuspenderBase):
"""
Private base-class for suspenders based on keeping a scalar inside
or outside of a band
"""
def __init__(self, signal, band_bottom, band_top, **kwargs):
super().__init__(signal, **kwargs)
if not band_bottom < band_top:
raise ValueError("The bottom of the band must be strictly "
"less than the top of the band.\n"
"bottom: {}\ttop: {}".format(
band_bottom, band_top)
)
self._bot = band_bottom
self._top = band_top
[docs]class SuspendWhenOutsideBand(_SuspendBandBase):
"""
Suspend when a scalar signal leaves a given band of values.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
band_bottom, band_top : float
The top and bottom of the band. `band_top` must be
strictly greater than `band_bottom`.
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def _should_resume(self, value):
return self._bot < value < self._top
def _should_suspend(self, value):
return not (self._bot < value < self._top)
def _get_justification(self):
if not self.tripped:
return ''
just = ('Signal {} = {!r} is outside of the range ({}, {})'
''.format(self._sig.name, self._sig.get(),
self._bot, self._top)
)
return ': '.join(s for s in (just, self._tripped_message)
if s)
class SuspendInBand(SuspendWhenOutsideBand):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
warn("SuspendInBand has been renamed SuspendWhenOutsideBand to make "
"its meaning more clear. Its behavior has not changed.")
class SuspendOutBand(_SuspendBandBase):
"""
Suspend when a scalar signal enters a given band of values.
This is mostly here because it is the opposite of `SuspenderInBand`.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
band_bottom, band_top : float
The top and bottom of the band. `band_top` must be
strictly greater than `band_bottom`.
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to 0
pre_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
post_plan : iterable or iterator, optional
a generator, list, or similar containing `Msg` objects
"""
def __init__(self, *args, **kwargs):
warn("bluesky.suspenders.SuspendOutBand is deprecated.")
super().__init__(*args, **kwargs)
def _should_resume(self, value):
return not (self._bot < value < self._top)
def _should_suspend(self, value):
return (self._bot < value < self._top)
def _get_justification(self):
if not self.tripped:
return ''
just = ('Signal {} = {!r} is inside of the range ({}, {})'
''.format(self._sig.name, self._sig.get(),
self._bot, self._top)
)
return ': '.join(s for s in (just, self._tripped_message)
if s)
[docs]class SuspendWhenChanged(SuspenderBase):
"""
Suspend when the monitored value deviates from the expected.
Only resume if allowed AND when monitored equals expected.
Notes
-----
This suspender is designed to require bluesky restart if value changes.
USE CASE:
:class:`~SuspendWhenChanged()` is useful when ``signal`` is an EPICS enumeration
(`"mbbo" <https://wiki-ext.aps.anl.gov/epics/index.php/RRM_3-14_Multi-Bit_Binary_Output>`_)
used with a multi-instrument facility.
Choices predefined in the mbbo record are the
names of instruments allowed to control any shared hardware.
* The ``signal``, set by instrument staff outside of bluesky,
names which instrument is allowed to control the hardware.
* Other instruments not matching ``signal`` are expected **not** to
control the hardware (they could use simulators instead or not operate
the shared hardware).
Since a decision of hardware *vs.* simulators is made at the
time a bluesky session starts and ophyd objects are first created, the
session needs to be aware immediately if the ``signal`` is changed.
The default value of ``allow_resume=False`` defends this decision.
If there is a mechanism engineered to toggle ophyd signals between
hardware and simulators, one might consider ``allow_resume=True``.
Parameters
----------
signal : `ophyd.Signal`
The signal to watch for changes to determine if the
scan should be suspended
expected_value : str, float, or int
RunEngine operations will be suspended when signal deviates
from this value. If `None` (default), set to value of
``signal`` when object is created.
allow_resume : bool
Should RunEngine be allowed to resume once ``signal.value == expected``
again? Default value of ``False`` is expected for intended use case.
sleep : float, optional
How long to wait in seconds after the resume condition is met
before marking the event as done. Defaults to ``0``.
pre_plan : iterable or callable, optional
Plan to execute just before suspending. If callable, must
take no arguments.
post_plan : iterable or callable, optional
Plan to execute just before resuming. If callable, must
take no arguments.
tripped_message : str, optional
Message to include in the trip notification
Examples
--------
.. code-block:: python
# pause if this value changes in our session
# note: this suspender is designed to require Bluesky restart if value changes
suspend_instrument_in_use = SuspendWhenChanged(instrument_in_use)
RE.install_suspender(suspend_instrument_in_use)
Example EPICS database for APS 2-BM-A and 2-BM-B:
.. code-block:: text
record(mbbo, "2bm:instrument_in_use") {
# instrument team sets this
# For additional field names, see
# https://epics.anl.gov/EpicsDocumentation/AppDevManuals/RecordRef/Recordref-25.html#HEADING25-15
field(DESC, "instrument using beam now")
field(ZRST, "none")
field(ONST, "2-BM-A")
field(TWST, "2-BM-B")
# THST
# FRST
# FVST
# ...
}
NOTE: **Always** make the zero choice (``ZRST``) in the mbbo record to be 'none'.
This allows the instrument staff to designate that *no* instrument is allowed
to control the shared hardware. Start the names of the allowed instruments
with ``ONST``.
It is convenient for the multi-instrument facility to make this definition
in EPICS rather than in a specific bluesky session. The EPICS value could be
useful in other contexts of instrument control beyond the realm of bluesky.
"""
[docs] def __init__(self, signal, *,
expected_value=None,
allow_resume=False,
sleep=0, pre_plan=None, post_plan=None, tripped_message='',
**kwargs):
self.expected_value = expected_value or signal.value
self.allow_resume = allow_resume
super().__init__(signal,
sleep=sleep,
pre_plan=pre_plan,
post_plan=post_plan,
tripped_message=tripped_message,
**kwargs)
def _should_suspend(self, value):
return value != self.expected_value
def _should_resume(self, value):
return self.allow_resume and value == self.expected_value
def _get_justification(self):
if not self.tripped:
return ''
just = (
f'Signal {self._sig.name}'
f', got "{self._sig.get()}"'
f', expected "{self.expected_value}"'
)
if not self.allow_resume:
just += '. "RE.abort()" and then restart session to use new configuration.'
return ': '.join(
s
for s in (just, self._tripped_message)
if s)