"""
diffract
--------
Common Support for diffractometers
.. autosummary::
~Diffractometer
"""
import logging
import pint
import pyRestTable
from ophyd import Component as Cpt
from ophyd import PositionerBase
from ophyd import PseudoPositioner
from ophyd import Signal
from ophyd.pseudopos import pseudo_position_argument
from ophyd.pseudopos import real_position_argument
from ophyd.signal import ArrayAttributeSignal
from ophyd.signal import AttributeSignal
from . import __version__
from . import calc
from .util import Constraint
__all__ = """
Diffractometer
""".split()
logger = logging.getLogger(__name__)
[docs]
class Diffractometer(PseudoPositioner):
"""Diffractometer pseudopositioner
PUBLIC API
.. autosummary::
~calc
~engine
~forward
~inverse
~forward_solutions_table
~apply_constraints
~get_axis_constraints
~reset_constraints
~show_constraints
~undo_last_constraints
~pa
~wh
~geometry_table
PRIVATE API
.. autosummary::
~_calc_energy_update_permitted
~_constraints_dict
~_constraints_for_databroker
~_energy_changed
~_energy_offset_changed
~_energy_units_changed
~_push_current_constraints
~_set_constraints
~_update_calc_energy
A Diffractometer has a corresponding calculation engine from **hklpy** that does
forward and inverse calculations.
If instantiating a specific diffractometer class such as `E4C`, `E6C`,
neither the `calc_inst` or the `calc_kw` parameters are required.
However, there is the option to either pass in a calculation
instance (with `calc_inst`) or keywords for the default calculation
class (using `calc_kw`) to instantiate a new one.
Parameters
----------
prefix : str
PV prefix for all components
calc_kw : dict, optional
Initializer arguments for the calc class
decision_fcn : callable, optional
The decision function to use when multiple solutions exist for a given
forward calculation. Defaults to arbitrarily picking the first
solution.
engine : str, optional
Calculation engine name. Default: ``hkl``
read_attrs : list, optional
Read attributes default to all pseudo and real positioners
configuration_attrs : list, optional
Defaults to the UB matrix and energy
parent : Device, optional
Parent device
name : str, optional
Device name
Attributes
----------
calc_class : sub-class of CalcRecip
Reciprocal calculation class used with this diffractometer. If ``None``
(as used in `hkl.diffract.Diffractometer`), `calc_inst` must be
specified upon initialization.
max_forward_iterations : ``ophyd.Signal`` (default value: 100)
Maximum allowed number of iterations in the ``forward()``
method before failing to find a forward() solution.
See Also
--------
:class:`~hkl.geometries.ApsPolar
:class:`~hkl.geometries.E4CH`
:class:`~hkl.geometries.E4CV`
:class:`~hkl.geometries.E6C`
:class:`~hkl.geometries.K4CV`
:class:`~hkl.geometries.K6C`
:class:`~hkl.geometries.Petra3_p09_eh2`
:class:`~hkl.geometries.Petra3_p23_4c`
:class:`~hkl.geometries.Petra3_p23_6c`
:class:`~hkl.geometries.SoleilMars`
:class:`~hkl.geometries.SoleilNanoscopiumRobot`
:class:`~hkl.geometries.SoleilSiriusKappa`
:class:`~hkl.geometries.SoleilSiriusTurret`
:class:`~hkl.geometries.SoleilSixsMed1p2`
:class:`~hkl.geometries.SoleilSixsMed2p2`
:class:`~hkl.geometries.SoleilSixsMed2p3`
:class:`~hkl.geometries.SoleilSixsMed2p3v2`
:class:`~hkl.geometries.Zaxis`
"""
calc_class = None
# see: Documentation has examples to use an EPICS PV for energy.
energy = Cpt(Signal, value=8.0, doc="Energy (in keV)")
energy_units = Cpt(Signal, value="keV")
energy_offset = Cpt(Signal, value=0)
energy_update_calc_flag = Cpt(Signal, value=True)
geometry_name = Cpt(
# fmt: off
AttributeSignal,
attr="calc.geometry_name",
doc="Diffractometer Geometry name",
write_access=False,
# fmt: on
)
class_name = Cpt(
# fmt: off
AttributeSignal,
attr="__class__.__name__",
doc="Diffractometer class name",
write_access=False,
# fmt: on
)
sample_name = Cpt(AttributeSignal, attr="calc.sample_name", doc="Sample name")
lattice = Cpt(
# fmt: off
ArrayAttributeSignal,
attr="calc.sample.lattice",
doc="Sample lattice",
# fmt: on
)
lattice_reciprocal = Cpt(
# fmt: off
AttributeSignal,
attr="calc._cfg_reciprocal",
doc="Reciprocal lattice",
# fmt: on
)
U = Cpt(AttributeSignal, attr="calc.sample.U", doc="U matrix")
UB = Cpt(AttributeSignal, attr="calc.sample.UB", doc="UB matrix")
# fmt: off
reflections = Cpt(
AttributeSignal,
attr="_reflections",
doc="Reflections",
)
reflections_details = Cpt(
AttributeSignal,
attr="calc.sample.reflections_details",
doc="Details of reflections",
)
ux = Cpt(
AttributeSignal,
attr="calc.sample.ux.value",
doc="ux portion of the U matrix",
)
uy = Cpt(
AttributeSignal,
attr="calc.sample.uy.value",
doc="uy portion of the U matrix",
)
uz = Cpt(
AttributeSignal,
attr="calc.sample.uz.value",
doc="uz portion of the U matrix",
)
diffractometer_name = Cpt(
AttributeSignal,
attr="name",
doc="Diffractometer name",
write_access=False,
)
_hklpy_version_ = __version__
_hklpy_version = Cpt(
AttributeSignal,
attr="_hklpy_version_",
doc="hklpy version",
write_access=False,
)
_pseudos = Cpt(
AttributeSignal,
attr="PseudoPosition._fields",
doc="Pseudo Positioners",
write_access=False,
)
_reals = Cpt(
AttributeSignal,
attr="RealPosition._fields",
doc="Real Positioners",
write_access=False,
)
_constraints = Cpt(
ArrayAttributeSignal,
attr="_constraints_for_databroker",
doc="Constraints",
write_access=False,
)
_mode = Cpt(
AttributeSignal,
attr="calc.engine.mode",
doc="Mode of Operation",
write_access=False,
)
orientation_attrs = Cpt(
AttributeSignal,
attr="_orientation_attrs",
doc="Orientation Attributes",
write_access=False,
)
_orientation_attrs = """
orientation_attrs
geometry_name class_name
UB U ux uy uz
energy energy_units energy_offset
sample_name lattice lattice_reciprocal reflections_details
_pseudos _reals
_constraints _mode
diffractometer_name _hklpy_version
""".split()
max_forward_iterations = Cpt(Signal, value=100, kind="config")
# fmt: on
# fmt: off
def __init__(
self,
prefix,
calc_kw=None,
decision_fcn=None,
calc_inst=None,
engine="hkl",
*,
configuration_attrs=None,
read_attrs=None,
**kwargs,
):
# fmt: on
if calc_inst is not None:
if not isinstance(calc_inst, self.calc_class):
# fmt: off
raise ValueError(
"Calculation instance must be derived"
f" from the class {self.calc_class}"
)
# fmt: on
self._calc = calc_inst
else:
if calc_kw is None:
calc_kw = {}
self._calc = self.calc_class(engine=engine, lock_engine=True, **calc_kw)
if not self.calc.engine_locked:
# Reason for this is that the engine determines the pseudomotor
# names, so if the engine is switched from underneath, the
# pseudomotor will no longer function properly
# fmt: off
raise ValueError(
"Calculation engine must be locked (CalcDiff.lock_engine set)"
)
# fmt: on
if configuration_attrs is None:
configuration_attrs = """
UB energy reflections_details geometry_name class_name
""".split()
if decision_fcn is None:
# the default decision function is to just grab solution #1:
decision_fcn = calc.default_decision_function
self._decision_fcn = decision_fcn
super().__init__(
# fmt: off
prefix,
read_attrs=read_attrs,
configuration_attrs=configuration_attrs, # write xtal info to descriptor
**kwargs,
# fmt: on
)
for attr in self.orientation_attrs.get():
getattr(self, attr).kind = "config"
self.energy_update_calc_flag.kind = "config"
self.orientation_attrs.kind = "config" # orientation written as descriptors
self._constraints_stack = []
self.energy.subscribe(self._energy_changed, event_type=Signal.SUB_VALUE)
self.energy_offset.subscribe(self._energy_offset_changed, event_type=Signal.SUB_VALUE)
self.energy_units.subscribe(self._energy_units_changed, event_type=Signal.SUB_VALUE)
@property
def _calc_energy_update_permitted(self):
"""return boolean `True` if permitted"""
acceptable_values = (1, "Yes", "locked", "OK", True, "On")
return self.energy_update_calc_flag.get() in acceptable_values
[docs]
def _energy_changed(self, value=None, **kwargs):
"""
Callback indicating that the energy signal was updated
.. note::
The `energy` signal is subscribed to this method
in the :meth:`Diffractometer.__init__()` method.
"""
if not self.connected:
logger.warning(
# fmt: off
"%s not fully connected, %s.calc.energy not updated",
self.name,
self.name,
# fmt: on
)
return
if self._calc_energy_update_permitted:
self._update_calc_energy()
[docs]
def _energy_offset_changed(self, value=None, **kwargs):
"""
Callback indicating that the energy offset signal was updated.
.. note::
The ``energy_offset`` signal is subscribed to this method
in the :meth:`Diffractometer.__init__()` method.
"""
if not self.connected:
logger.warning(
# fmt: off
"%s not fully connected, %s.calc.energy not updated",
self.name,
self.name,
# fmt: on
)
return
if self._calc_energy_update_permitted:
self._update_calc_energy()
[docs]
def _energy_units_changed(self, value=None, **kwargs):
"""
Callback indicating that the energy units signal was updated.
.. note::
The ``energy_units`` signal is subscribed to this method
in the :meth:`Diffractometer.__init__()` method.
"""
if not self.connected:
logger.warning(
# fmt: off
"%s not fully connected, %s.calc.energy not updated",
self.name,
self.name,
# fmt: on
)
return
if self._calc_energy_update_permitted:
self._update_calc_energy()
[docs]
def _update_calc_energy(self, value=None, **kwargs):
"""
writes ``self.calc.energy`` from ``value`` or ``self.energy``.
"""
if not self.connected:
logger.warning(
# fmt: off
"%s not fully connected, %s.calc.energy not updated",
self.name,
self.name,
# fmt: on
)
return
value = float(self.energy.get())
# energy_offset has same units as energy
value += self.energy_offset.get()
# comment these lines to skip unit conversion
units = self.energy_units.get()
if units != "keV":
keV = pint.Quantity(value, units).to("keV")
value = keV.magnitude
if value <= 0:
logger.debug("Computed energy(%s) is not positive", value)
return
logger.debug("setting %s.calc.energy = %s (keV)", self.name, value)
self.calc.energy = value
self._update_position()
@property
def calc(self):
"""The calculation instance"""
return self._calc
@property
def engine(self):
"""The calculation engine associated with the diffractometer"""
return self.calc.engine
# TODO so these calculations change the internal state of the hkl
# calculation class, which is probably not a good thing
# -- it becomes a problem when someone uses these functions
# outside of move()
@property
def _reflections(self):
"""Return the list of reflections as a [[float]]"""
return [list(r) for r in self.calc.sample.reflections]
[docs]
@pseudo_position_argument
def forward(self, pseudo):
"""
Calculate the real positions given the pseudo positions (hkl -> angles).
Return the default solution using the ``_decision_fcn()``.
"""
try:
solutions = self.calc.forward(list(pseudo))
except ValueError:
solutions = self.calc.forward_iter(
start=self.position, end=pseudo, max_iters=self.max_forward_iterations.get()
)
logger.debug("pseudo to real: %s", solutions)
return self._decision_fcn(pseudo, solutions)
[docs]
@real_position_argument
def inverse(self, real):
"""
Calculate the pseudo positions given the real positions (angles -> hkl).
"""
self.calc.physical_positions = real
return self.PseudoPosition(*self.calc.pseudo_positions)
[docs]
def check_value(self, pos):
"""
Raise exception if pos is not within limits.
In a scan, a subset of the pseudo axes may be directed,
which are given in a dict from a set message from the
bluesky RunEngine.
It is not permitted to scan both pseudo and real positioners.
"""
if isinstance(pos, dict):
# Redefine and fill in any missing values.
for axis, target in pos.items():
if hasattr(self, axis):
p = getattr(self, axis)
if p in self.real_positioners:
p.check_value(target)
else:
raise KeyError(f"{axis} not in {self.name}")
pos = [pos.get(p.attr_name, p.position) for p in self.pseudo_positioners]
super().check_value(pos)
[docs]
def apply_constraints(self, constraints):
"""
Constrain the solutions of the diffractometer's forward() computation.
This action will first save the current constraints onto
a stack, enabling both *undo* and *reset* features.
"""
self._push_current_constraints()
self._set_constraints(constraints)
[docs]
def reset_constraints(self):
"""Set constraints back to initial settings."""
if len(self._constraints_stack) > 0:
self._set_constraints(self._constraints_stack[0])
self._constraints_stack = []
@property
def _constraints_dict(self):
"""Return the constraints."""
return {
# fmt:off
m: Constraint(
*self.calc[m].limits,
self.calc[m].value,
self.calc[m].fit,
)
# fmt:on
for m in self.RealPosition._fields
}
@property
def _constraints_for_databroker(self):
"""
Return the constraints for databroker.
Cannot write a dictionary from bluesky because all values must
be of the same data type, chosen from the first item in the
list. Just write the values (the boolean will be written as a
float.) The constraints will be written in the order of the real
positioners.
"""
# fmt: off
return [
tuple(self._constraints_dict[p]._asdict().values())
for p in self.RealPosition._fields
]
# fmt: on
[docs]
def get_axis_constraints(self, axis):
"""Show the constraints for one axis."""
return self._constraints_dict[axis]
[docs]
def show_constraints(self, fmt="simple", printing=True):
"""Print the current constraints in a table."""
tbl = pyRestTable.Table()
tbl.labels = "axis low_limit high_limit value fit".split()
for k, c in self._constraints_dict.items():
tbl.addRow((k, *c))
if printing:
print(tbl.reST(fmt=fmt))
return tbl
[docs]
def undo_last_constraints(self):
"""
Remove the current constraints additions, restore previous.
"""
if len(self._constraints_stack) > 0:
self._set_constraints(self._constraints_stack.pop())
[docs]
def _push_current_constraints(self):
"""push current constraints onto the stack"""
constraints = {
m: Constraint(*self.calc[m].limits, self.calc[m].value, self.calc[m].fit)
for m in self.real_positioners._fields
}
self._constraints_stack.append(constraints)
[docs]
def _set_constraints(self, constraints):
"""set diffractometer's constraints"""
for axis, constraint in constraints.items():
self.calc[axis].limits = [
constraint.low_limit,
constraint.high_limit,
]
self.calc[axis].value = constraint.value
self.calc[axis].fit = constraint.fit
[docs]
def forward_solutions_table(self, reflections, full=False, digits=5):
"""
Return table of computed solutions for each supplied (hkl) reflection.
The solutions are calculated using the current UB matrix & constraints.
Parameters
----------
reflections : list of (h, k, l) reflections
Each reflection is a tuple of 3 numbers,
(h, k, l) of the reflection.
full : bool
If ``True``, show all solutions. If ``False``,
only show the default solution.
digits : int
Number of digits to roundoff each position
value. Default is 5.
"""
_table = pyRestTable.Table()
motors = self.real_positioners._fields
_table.labels = "(hkl) solution".split() + list(motors)
for reflection in reflections:
try:
solutions = self.calc.forward(reflection)
except ValueError as exc:
solutions = exc
if isinstance(solutions, ValueError):
row = [reflection, "none"]
row += ["" for m in motors]
_table.addRow(row)
else:
for i, s in enumerate(solutions):
row = [reflection, i]
row += [round(getattr(s, m), digits) for m in motors]
_table.addRow(row)
if not full:
break # only show the first (default) solution
return _table
[docs]
def pa(self, all_samples=False, printing=True):
"""
Report (all) the diffractometer settings.
EXAMPLE::
In [1]: from hkl import SimulatedK4CV
In [2]: k4cv = SimulatedK4CV('', name='k4cv')
In [3]: k4cv.pa() # FIXME lines are too long to include in source code
===================== ====================================================================
term value
===================== ====================================================================
diffractometer k4cv
geometry K4CV
class SimulatedK4CV
energy (keV) 8.05092
wavelength (angstrom) 1.54000
calc engine hkl
mode bissector
positions ====== =======
name value
====== =======
komega 0.00000
kappa 0.00000
kphi 0.00000
tth 0.00000
====== =======
sample: main ================ ===================================================
term value
================ ===================================================
unit cell edges a=1.54, b=1.54, c=1.54
unit cell angles alpha=90.0, beta=90.0, gamma=90.0
[U] [[1. 0. 0.]
[0. 1. 0.]
[0. 0. 1.]]
[UB] [[ 4.07999046e+00 -2.49827363e-16 -2.49827363e-16]
[ 0.00000000e+00 4.07999046e+00 -2.49827363e-16]
[ 0.00000000e+00 0.00000000e+00 4.07999046e+00]]
================ ===================================================
===================== ====================================================================
Out[3]: <pyRestTable.rest_table.Table at 0x7f5c16503e20>
"""
def addTable(tbl):
return str(tbl).strip()
def Package(**kwargs):
return ", ".join([f"{k}={v}" for k, v in kwargs.items()])
table = pyRestTable.Table()
table.labels = "term value".split()
table.addRow(("diffractometer", self.name))
table.addRow(("geometry", self.calc._geometry.name_get()))
table.addRow(("class", self.__class__.__name__))
table.addRow(("energy (keV)", f"{self.calc.energy:.5f}"))
table.addRow(("wavelength (angstrom)", f"{self.calc.wavelength:.5f}"))
table.addRow(("calc engine", self.calc.engine.name))
table.addRow(("mode", self.calc.engine.mode))
pt = pyRestTable.Table()
pt.labels = "name value".split()
if self.calc._axis_name_to_original:
pt.addLabel("original name")
for item in self.real_positioners:
row = [item.attr_name, f"{item.position:.5f}"]
k = self.calc._axis_name_to_original.get(item.attr_name)
if k is not None:
row.append(k)
pt.addRow(row)
table.addRow(("positions", addTable(pt)))
t = self.show_constraints(printing=False)
table.addRow(("constraints", addTable(t)))
if all_samples:
samples = self.calc._samples.values()
else:
samples = [self.calc._sample]
for sample in samples:
t = pyRestTable.Table()
t.labels = "term value".split()
nm = sample.name
if all_samples and sample == self.calc.sample:
nm += " (*)"
# fmt: off
t.addRow(
(
"unit cell edges",
Package(
**{
k: getattr(sample.lattice, k)
for k in "a b c".split()
}
),
)
)
t.addRow(
(
"unit cell angles",
Package(
**{
k: getattr(sample.lattice, k)
for k in "alpha beta gamma".split()
}
),
)
)
# fmt: on
for i, ref in enumerate(sample._sample.reflections_get()):
h, k, l = ref.hkl_get()
pos_arr = ref.geometry_get().axis_values_get(self.calc._units)
t.addRow((f"ref {i+1} (hkl)", Package(**dict(h=h, k=k, l=l))))
# fmt: off
t.addRow(
(
f"ref {i+1} positioners",
Package(
**{
k: f"{v:.5f}"
for k, v in zip(
self.calc.physical_axis_names, pos_arr
)
}
),
)
)
# fmt: on
t.addRow(("[U]", sample.U))
t.addRow(("[UB]", sample.UB))
table.addRow((f"sample: {nm}", addTable(t)))
if printing:
print(table)
return table
[docs]
def wh(self, printing=True):
"""
Report (brief) where is the diffractometer.
EXAMPLE::
In [1]: from hkl import SimulatedK4CV
In [2]: k4cv = SimulatedK4CV('', name='k4cv')
In [3]: k4cv.wh()
===================== ========= =========
term value axis_type
===================== ========= =========
diffractometer k4cv
sample name main
energy (keV) 8.05092
wavelength (angstrom) 1.54000
calc engine hkl
mode bissector
h 0.0 pseudo
k 0.0 pseudo
l 0.0 pseudo
komega 0 real
kappa 0 real
kphi 0 real
tth 0 real
===================== ========= =========
Out[3]: <pyRestTable.rest_table.Table at 0x7f55c4775cd0>
compare with similar function in SPEC::
1117.KAPPA> wh
H K L = 0 0 1.7345
Alpha = 20 Beta = 20 Azimuth = 90
Omega = 32.952 Lambda = 1.54
Two Theta Theta Chi Phi K_Theta Kappa K_Phi
40.000000 20.000000 90.000000 57.048500 77.044988 134.755995 114.093455
"""
table = pyRestTable.Table()
table.labels = "term value axis_type".split()
table.addRow(("diffractometer", self.name, ""))
table.addRow(("sample name", self.calc.sample.name, ""))
table.addRow(("energy (keV)", f"{self.calc.energy:.5f}", ""))
table.addRow(("wavelength (angstrom)", f"{self.calc.wavelength:.5f}", ""))
table.addRow(("calc engine", self.calc.engine.name, ""))
table.addRow(("mode", self.calc.engine.mode, ""))
pseudo_axes = [v.attr_name for v in self._pseudo]
real_axes = [v.attr_name for v in self._real]
for k in self._sig_attrs.keys():
v = getattr(self, k)
if not issubclass(v.__class__, PositionerBase):
continue
if k in real_axes:
label = "real"
elif k in pseudo_axes:
label = "pseudo"
else:
label = "additional"
table.addRow((k, v.position, label))
if printing:
print(table)
return table
[docs]
def geometry_table(self):
"""
Print a table describing this diffractometer geometry.
Calls :meth:`hkl.calc.CalcRecip.geometry_table()`.
"""
self.calc.geometry_table()