Source code for hkl.calc

"""
Calculation support for diffractometers

.. autosummary::

    ~default_decision_function
    ~UnreachableError
    ~CalcRecip

**Specific Geometries**

.. autosummary::

    ~CalcApsPolar
    ~CalcE4CH
    ~CalcE4CV
    ~CalcE6C
    ~CalcK4CV
    ~CalcK6C
    ~CalcPetra3_p09_eh2
    ~CalcPetra3_p23_4c
    ~CalcPetra3_p23_6c
    ~CalcSoleilMars
    ~CalcSoleilNanoscopiumRobot
    ~CalcSoleilSiriusKappa
    ~CalcSoleilSiriusTurret
    ~CalcSoleilSixsMed1p2
    ~CalcSoleilSixsMed2p2
    ~CalcSoleilSixsMed2p3
    ~CalcSoleilSixsMed2p3v2
    ~CalcZaxis

"""

import functools
import logging
from collections import OrderedDict
from threading import RLock

import numpy as np

from . import util
from .context import UsingEngine
from .engine import CalcParameter
from .engine import Engine
from .sample import HklSample
from .util import libhkl

__all__ = """
    A_KEV
    CalcApsPolar
    CalcE4CH
    CalcE4CV
    CalcE6C
    CalcK4CV
    CalcK6C
    CalcPetra3_p09_eh2
    CalcPetra3_p23_4c
    CalcPetra3_p23_6c
    CalcRecip
    CalcSoleilMars
    CalcSoleilNanoscopiumRobot
    CalcSoleilSiriusKappa
    CalcSoleilSiriusTurret
    CalcSoleilSixsMed1p2
    CalcSoleilSixsMed2p2
    CalcSoleilSixsMed2p3
    CalcZaxis
    default_decision_function
    NM_KEV
    UnreachableError
""".split()
logger = logging.getLogger(__name__)

# per NIST publication of CODATA Fundamental Physical Constants
# https://www.nist.gov/programs-projects/codata-values-fundamental-physical-constants
A_KEV = 12.39841984  # 1 Angstrom ~= 12.39842 keV
NM_KEV = A_KEV * 0.1  # 1 nm ~= 1.239842 keV
# Note: NM_KEV is not used by hklpy now, remains here as legacy


[docs] def default_decision_function(position, solutions): """The default decision function - returns the first solution.""" return solutions[0]
# This is used below by CalcRecip. def _locked(func): """a decorator for running a method with the instance's lock""" @functools.wraps(func) def wrapped(self, *args, **kwargs): with self._lock: return func(self, *args, **kwargs) return wrapped # This is used below by CalcRecip. def _keep_physical_position(func): """ decorator: stores/restores the physical motor position during calculations """ @functools.wraps(func) def wrapped(self, *args, **kwargs): with self._lock: initial_pos = self.physical_positions try: return func(self, *args, **kwargs) finally: self.physical_positions = initial_pos return wrapped
[docs] class UnreachableError(ValueError): """Position is unreachable. Attributes ---------- pseudo : sequence Last reachable pseudo position in the trajectory physical : sequence Corresponding physical motor positions """ def __init__(self, msg, pseudo, physical): super().__init__(msg) self.pseudo = pseudo self.physical = physical
[docs] class CalcRecip(object): """Reciprocal space calculations .. autosummary:: ~add_sample ~calc_linear_path ~energy ~engine ~engine_locked ~engines ~forward ~forward_iter ~geometry_name ~geometry_table ~get_path ~inverse ~inverted_axes ~new_sample ~parameters ~physical_axes ~physical_axis_names ~physical_positions ~Position ~pseudo_axes ~pseudo_axis_names ~pseudo_positions ~sample ~sample_name ~units ~update ~wavelength Parameters ---------- dtype : str Diffractometer type (usually specified by a subclass) engine : str, optional 'hkl', for example sample : str, optional Default sample name (default: 'main') lattice : Lattice, optional Lattice to use with the default sample degrees : bool, optional Use degrees instead of radians (default: True) units : {'user', } The type of units to use internally lock_engine : bool, optional Don't allow the engine to be changed during the life of this object inverted_axes : list, optional Names of axes to invert the sign """ def __init__( self, dtype, engine="hkl", sample="main", lattice=None, degrees=True, units="user", lock_engine=False, inverted_axes=None, ): self._engine = None # set below with property self._detector = util.new_detector() self._degrees = bool(degrees) self._sample = None self._samples = {} self._unit_name = units self._units = util.units[self._unit_name] self._lock_engine = bool(lock_engine) self._lock = RLock() self._axis_name_to_renamed = {} self._axis_name_to_original = {} self._inverted_axes = inverted_axes or [] try: self._factory = libhkl.factories()[dtype] except KeyError: types = ", ".join(util.diffractometer_types) raise ValueError(f"Invalid diffractometer type {types!r}; choose from: {types}") self._geometry = self._factory.create_new_geometry() self._engine_list = self._factory.create_new_engine_list() self._engine_names = [e.name_get() for e in self._engine_list.engines_get()] if sample is not None: if isinstance(sample, HklSample): if lattice is not None: sample.lattice = lattice self.add_sample(sample) else: self.new_sample(sample, lattice=lattice) self.engine = engine @property def Position(self): """Dynamically-generated physical motor position class""" # I know, I know, could be done more cleanly... name = f"Pos{self.__class__.__name__}" return util.get_position_tuple(self.physical_axis_names, class_name=name) @property def wavelength(self): """The wavelength associated with the geometry, in angstrom""" return self._geometry.wavelength_get(self._units) @wavelength.setter def wavelength(self, wavelength): self._geometry.wavelength_set(wavelength, self._units) @property def energy(self): """The energy associated with the geometry, in keV""" return A_KEV / self.wavelength @energy.setter def energy(self, energy): self.wavelength = A_KEV / energy @property def engine_locked(self): """If set, do not allow the engine to be changed post-initialization""" return self._lock_engine @property def engine(self): """ """ return self._engine @engine.setter @_locked def engine(self, engine): if engine is self._engine: return if self._lock_engine and self._engine is not None: raise ValueError("Engine is locked on this %s instance" % self.__class__.__name__) if isinstance(engine, libhkl.Engine): self._engine = engine else: engines = self.engines try: self._engine = engines[engine] except KeyError: raise ValueError("Unknown engine name or type") self._re_init()
[docs] def _canonical2user(self, canonical): """Convert canonical axis names to user (renames).""" if len(self._axis_name_to_renamed) > 0: axis = self._axis_name_to_renamed[canonical] else: axis = canonical return axis
@property def axes_r(self): """User-defined real-space axes used for forward() calculation.""" return [self._canonical2user(ax) for ax in self._engine.axes_r] @property def axes_w(self): """User-defined real-space axes written by forward() calculation.""" return [self._canonical2user(ax) for ax in self._engine.axes_w] @property def axes_c(self): """User-defined real-space axes held constant during forward() calculation.""" return [self._canonical2user(ax) for ax in self._engine.axes_c] @property def geometry_name(self): """Name of this geometry, as defined in **libhkl**.""" return self._geometry.name_get()
[docs] def geometry_table(self, rst=False): """ Describe this geometry in a table. Parameters ---------- rst : *bool* When True, format using restructured text. Otherwise, use a simpler representation """ import sys import pyRestTable def format_name_list(names): if rst: names = [f"``{k}``" for k in names] return ", ".join(names) cname = self.__class__.__name__ gname = self.geometry_name engines = list(self.engines.keys()) real_axes = format_name_list(self.physical_axis_names) table = pyRestTable.Table() table.addLabel("engine") table.addLabel("pseudo axes") table.addLabel("mode") table.addLabel("axes read") table.addLabel("axes written") table.addLabel("extra parameters") for engine in sorted(engines): # Engine setting is locked when class is created. Default is "hkl". # make new goniometer with chosen engine. The "trick" here # is to get the class from __this__ module. # See: https://stackoverflow.com/a/7668273/1046449 gonio = getattr(sys.modules[__name__], cname)(engine=engine) for mode in sorted(gonio.engine.modes): gonio.engine.mode = mode # axes_r = format_name_list(gonio.engine._engine.axis_names_get(0)) axes_w = format_name_list(gonio.engine._engine.axis_names_get(1)) parameters = format_name_list(gonio.parameters) pseudo_axes = format_name_list(gonio.pseudo_axis_names) row = [engine, pseudo_axes, mode, axes_r, axes_w, parameters] table.addRow(row) if rst: gname_safe = gname.replace(" ", "_") print(f".. index:: {gname_safe}, geometry; {gname_safe}") print() print(f".. _{gname_safe}_table:") print() title = f"Geometry: ``{self.geometry_name}``" print(title) print("+" * len(title)) print() print(f"* real axes: {real_axes}") print("* pseudo axes: depends on the engine") else: print(f"Geometry: {self.geometry_name}") print(f" real axes: {real_axes}") print(" pseudo axes: depends on the engine") print() print(table)
[docs] def _get_sample(self, name): if isinstance(name, libhkl.Sample): return name return self._samples[name]
@property def sample_name(self): """The name of the currently selected sample.""" return self._sample.name @sample_name.setter @_locked def sample_name(self, new_name): sample = self._sample sample.name = new_name @property def sample(self): """Currently selected sample.""" return self._sample @sample.setter @_locked def sample(self, sample): if sample is self._sample: return elif sample == self._sample.name: return if isinstance(sample, HklSample): if sample not in self._samples.values(): self.add_sample(sample, select=False) elif sample in self._samples: name = sample sample = self._samples[name] else: raise ValueError("Unknown sample type (expected HklSample)") self._sample = sample self._re_init()
[docs] def add_sample(self, sample, select=True): """Add an HklSample Parameters ---------- sample : HklSample instance The sample name, or optionally an already-created HklSample instance select : bool, optional Select the sample to focus calculations on """ if not isinstance(sample, (HklSample, libhkl.Sample)): raise ValueError("Expected either an HklSample or a Sample instance") if isinstance(sample, libhkl.Sample): sample = HklSample(calc=self, sample=sample, units=self._unit_name) if sample.name in self._samples: raise ValueError('Sample of name "%s" already exists' % sample.name) self._samples[sample.name] = sample if select: self._sample = sample self._re_init() return sample
[docs] def new_sample(self, name, select=True, **kwargs): """Convenience function to add a sample by name Keyword arguments are passed to the new HklSample initializer. Parameters ---------- name : str The sample name select : bool, optional Select the sample to focus calculations on """ units = kwargs.pop("units", self._unit_name) sample = HklSample(self, sample=libhkl.Sample.new(name), units=units, **kwargs) return self.add_sample(sample, select=select)
[docs] @_locked def _re_init(self): if self._engine is None: return if self._geometry is None or self._detector is None or self._sample is None: raise ValueError("Not all parameters set (geometry, detector, sample)") # pass else: self._engine_list.init(self._geometry, self._detector, self._sample.hkl_sample)
@property def engines(self): return dict( (engine.name_get(), Engine(self, engine, self._engine_list)) for engine in self._engine_list.engines_get() ) @property def parameters(self): return self._engine.parameters @property def physical_axis_names(self): if self._axis_name_to_renamed: return list(self._axis_name_to_renamed.values()) else: return self._geometry.axis_names_get() @physical_axis_names.setter def physical_axis_names(self, axis_name_map): """Set a persistent re-map of physical axis names Resets `inverted_axes`. Parameters ---------- axis_name_map : dict {orig_axis_1: new_name_1, ...} """ internal_axis_names = self._geometry.axis_names_get() if set(axis_name_map.keys()) != set(internal_axis_names): raise ValueError("Every axis name has to have a remapped name") self._axis_name_to_original = OrderedDict((axis_name_map[axis], axis) for axis in internal_axis_names) self._axis_name_to_renamed = OrderedDict((axis, axis_name_map[axis]) for axis in internal_axis_names) self._inverted_axes = [] @property def inverted_axes(self): """The physical axis names to invert""" return self._inverted_axes @inverted_axes.setter def inverted_axes(self, to_invert): for axis in to_invert: assert axis in self.physical_axis_names self._inverted_axes = to_invert
[docs] def _invert_physical_positions(self, pos): """Invert the physical axis positions based on the settings Parameters ---------- pos : OrderedDict NOTE: Modified in-place """ for axis in self._inverted_axes: pos[axis] = -pos[axis] return pos
@property def physical_positions(self): """Physical (real) motor positions""" pos = self.physical_axes if self._inverted_axes: pos = self._invert_physical_positions(pos) return self.Position(*pos.values()) @physical_positions.setter @_locked def physical_positions(self, positions): if self._inverted_axes: pos = self.Position(*positions)._asdict() pos = self._invert_physical_positions(pos) positions = list(pos.values()) # Set the physical motor positions and calculate the pseudo ones self._geometry.axis_values_set(positions, self._units) self.update() @property def physical_axes(self): """Physical (real) motor positions as an OrderedDict""" return OrderedDict( # fmt: off zip( self.physical_axis_names, self._geometry.axis_values_get(self._units), ) # fmt: on ) @property def pseudo_axis_names(self): """Pseudo axis names from the current engine""" return self._engine.pseudo_axis_names @property def pseudo_positions(self): """Pseudo axis positions/values from the current engine""" return self._engine.pseudo_positions @property def pseudo_axes(self): """Ordered dictionary of axis name to position""" return self._engine.pseudo_axes
[docs] def update(self): """Calculate the pseudo axis positions from the real axis positions""" return self._engine.update()
[docs] def _get_axis_by_name(self, name): """Given an axis name, return the HklParameter Parameters ---------- name : str If a name map is specified, this is the mapped name. """ name = self._axis_name_to_original.get(name, name) return self._geometry.axis_get(name)
@property def units(self): """The units used for calculations""" return self._unit_name def __getitem__(self, axis): return CalcParameter( self._get_axis_by_name(axis), units=self._unit_name, name=axis, inverted=axis in self._inverted_axes, geometry=self._geometry, ) def __setitem__(self, axis, value): param = self[axis] param.value = value
[docs] @_keep_physical_position def forward_iter(self, start, end, max_iters, *, threshold=0.99, decision_fcn=None): """Iteratively attempt to go from a pseudo start -> end position For every solution failure, the position is moved back. For every success, the position is moved closer to the destination. After up to max_iters, the position can be reached, the solutions will be returned. Otherwise, ValueError will be raised stating the last position that was reachable and the corresponding motor positions. Parameters ---------- start : Position end : Position max_iters : int Maximum number of iterations threshold : float, optional Normalized proximity to `end` position to stop iterating decision_fcn : callable, optional Function to choose a solution from several. Defaults to picking the first solution. Here is the default ``decision_fcn()``:: def decision(pseudo_position, solution_list): return solution_list[0] Returns ------- solutions : list Raises ------ UnreachableError (ValueError) Position cannot be reached The last valid HKL position and motor positions are accessible in this exception instance. """ start = np.array(start) end = np.array(end) if decision_fcn is None: decision_fcn = default_decision_function min_t = 0.0 t = 1.0 iters = 0 valid_pseudo = None valid_real = None while iters < max_iters: try: pos = (1.0 - t) * start + t * end self.engine.pseudo_positions = pos except ValueError: # couldn't calculate - step back half-way t = (min_t + t) / 2.0 else: if t > min_t: min_t = t # successful calculation - move forward t = (t + 1) / 2.0 valid_pseudo = self.engine.pseudo_positions valid_real = decision_fcn(valid_pseudo, self.engine.solutions) self.physical_positions = valid_real if t >= threshold: break iters += 1 try: self.engine.pseudo_positions = end return self.engine.solutions except ValueError: raise UnreachableError( f"Unable to solve. iterations={iters}/{max_iters}\n" f"Last valid position: {valid_pseudo}\n{valid_real} ", pseudo=valid_pseudo, physical=valid_real, )
[docs] @_keep_physical_position def forward(self, position, engine=None): """Calculate real positions from pseudo positions.""" with UsingEngine(self, engine): if self.engine is None: raise ValueError("Engine unset") self.engine.pseudo_positions = position return self.engine.solutions
[docs] @_keep_physical_position def inverse(self, real): """Calculate pseudo positions from real positions.""" self.physical_positions = real # self.update() # done implicitly in setter return self.pseudo_positions
[docs] def calc_linear_path(self, start, end, n, num_params=0, **kwargs): """ Construct a trajectory from start to end with n steps (n+1 points). Example (for ``hkl`` engine):: >>> e4cv.calc.calc_linear_path((1,1,0), (1,-1,0), 4, num_params=3) [(1.0, 1.0, 0.0), (1.0, 0.5, 0.0), (1.0, 0.0, 0.0), (1.0, -0.5, 0.0), (1.0, -1.0, 0.0)] """ # from start to end, in a linear path singles = [np.linspace(start[i], end[i], n + 1) for i in range(num_params)] return list(zip(*singles))
[docs] def _get_path_fcn(self, path_type): try: return getattr(self, "calc_%s_path" % (path_type)) except AttributeError: raise ValueError("Invalid path type specified (%s)" % path_type)
[docs] def get_path(self, start, end=None, n=100, path_type="linear", **kwargs): """ """ num_params = len(self.pseudo_axis_names) start = np.array(start) path_fcn = self._get_path_fcn(path_type) if end is not None: end = np.array(end) if start.size == end.size == num_params: return path_fcn(start, end, n, num_params=num_params, **kwargs) else: positions = np.array(start) if positions.ndim == 1 and positions.size == num_params: # single position return [list(positions)] elif positions.ndim == 2: if positions.shape[0] == 1 and positions.size == num_params: # [[h, k, l], ] return [positions[0]] elif positions.shape[0] == num_params: # [[h, k, l], [h, k, l], ...] return [positions[i, :] for i in range(num_params)] raise ValueError("Invalid set of %s positions" % ", ".join(self.pseudo_axis_names))
def __call__( # fmt: off self, start, end=None, n=100, engine=None, path_type="linear", **kwargs, # fmt: on ): with UsingEngine(self, engine): for pos in self.get_path(start, end=end, n=n, path_type=path_type, **kwargs): yield self.forward(pos, engine=None, **kwargs)
[docs] def _repr_info(self): r = [ f"engine={self.engine.name!r}", f"detector={self._detector!r}", f"sample={self._sample!r}", f"samples={self._samples!r}", ] return r
def __repr__(self): return f"{self.__class__.__name__} ({', '.join(self._repr_info())})" def __str__(self): return repr(self) @property def _cfg_reciprocal(self): """Return reciprocal lattice to save as configuration.""" return tuple(list(self.sample.reciprocal))
[docs] class CalcApsPolar(CalcRecip): """Geometry: APS POLAR""" def __init__(self, **kwargs): super().__init__("APS POLAR", **kwargs)
[docs] class CalcE4CH(CalcRecip): """Geometry: E4CH""" def __init__(self, **kwargs): super().__init__("E4CH", **kwargs)
[docs] class CalcE4CV(CalcRecip): """Geometry: E4CV""" def __init__(self, **kwargs): super().__init__("E4CV", **kwargs)
[docs] class CalcE6C(CalcRecip): """Geometry: E6C""" def __init__(self, **kwargs): super().__init__("E6C", **kwargs)
[docs] class CalcK4CV(CalcRecip): """Geometry: K4CV""" def __init__(self, **kwargs): super().__init__("K4CV", **kwargs)
[docs] class CalcK6C(CalcRecip): """Geometry: K6C""" def __init__(self, **kwargs): super().__init__("K6C", **kwargs)
[docs] class CalcPetra3_p09_eh2(CalcRecip): """Geometry: PETRA3 P09 EH2""" def __init__(self, **kwargs): super().__init__("PETRA3 P09 EH2", **kwargs)
[docs] class CalcPetra3_p23_4c(CalcRecip): """Geometry: PETRA3 P23 4C""" def __init__(self, **kwargs): super().__init__("PETRA3 P23 4C", **kwargs)
[docs] class CalcPetra3_p23_6c(CalcRecip): """Geometry: PETRA3 P23 6C""" def __init__(self, **kwargs): super().__init__("PETRA3 P23 6C", **kwargs)
[docs] class CalcSoleilMars(CalcRecip): """Geometry: SOLEIL MARS""" def __init__(self, **kwargs): super().__init__("SOLEIL MARS", **kwargs)
[docs] class CalcSoleilNanoscopiumRobot(CalcRecip): """Geometry: SOLEIL NANOSCOPIUM ROBOT""" def __init__(self, **kwargs): super().__init__("SOLEIL NANOSCOPIUM ROBOT", **kwargs)
[docs] class CalcSoleilSiriusKappa(CalcRecip): """Geometry: SOLEIL SIRIUS KAPPA""" def __init__(self, **kwargs): super().__init__("SOLEIL SIRIUS KAPPA", **kwargs)
[docs] class CalcSoleilSiriusTurret(CalcRecip): """Geometry: SOLEIL SIRIUS TURRET""" def __init__(self, **kwargs): super().__init__("SOLEIL SIRIUS TURRET", **kwargs)
[docs] class CalcSoleilSixsMed1p2(CalcRecip): """Geometry: SOLEIL SIXS MED1+2""" def __init__(self, **kwargs): super().__init__("SOLEIL SIXS MED1+2", **kwargs)
[docs] class CalcSoleilSixsMed2p2(CalcRecip): """Geometry: SOLEIL SIXS MED2+2""" def __init__(self, **kwargs): super().__init__("SOLEIL SIXS MED2+2", **kwargs)
[docs] class CalcSoleilSixsMed2p3(CalcRecip): """Geometry: SOLEIL SIXS MED2+3""" def __init__(self, **kwargs): super().__init__("SOLEIL SIXS MED2+3", **kwargs)
class CalcSoleilSixsMed2p3v2(CalcRecip): """Geometry: SOLEIL SIXS MED2+3 v2""" def __init__(self, **kwargs): super().__init__("SOLEIL SIXS MED2+3 v2", **kwargs)
[docs] class CalcZaxis(CalcRecip): """Geometry: ZAXIS""" def __init__(self, **kwargs): super().__init__("ZAXIS", **kwargs)