"""
Utility functions and structures.
.. autosummary::
~Constraint
~diffractometer_types
~get_package_info
~get_position_tuple
~Lattice
~list_orientation_runs
~new_detector
~new_lattice
~restore_constraints
~restore_energy
~restore_orientation
~restore_reflections
~restore_sample
~restore_UB
~run_orientation_info
~software_versions
Also provides `SI_LATTICE_PARAMETER` as defined by the
*2018 CODATA recommended lattice parameter of silicon*. [#]_
.. [#] https://physics.nist.gov/cgi-bin/cuu/Value?asil
"""
import logging
import subprocess
import sys
from collections import defaultdict
from collections import namedtuple
import databroker
import gi
import numpy as np
import pandas as pd
import tqdm
gi.require_version("Hkl", "5.0")
from gi.repository import GLib # noqa: F401
from gi.repository import Hkl as libhkl
__all__ = """
Constraint
diffractometer_types
get_package_info
get_position_tuple
Lattice
list_orientation_runs
new_detector
new_lattice
restore_constraints
restore_energy
restore_orientation
restore_reflections
restore_sample
restore_UB
run_orientation_info
SI_LATTICE_PARAMETER
software_versions
""".split()
logger = logging.getLogger(__name__)
# when getting software package versions
DEFAULT_PACKAGE_LIST = "hkl pygobject".split()
# 2018 CODATA recommended lattice parameter of silicon, Angstrom.
# see: https://physics.nist.gov/cgi-bin/cuu/Value?asil
SI_LATTICE_PARAMETER = 5.431020511
# also provide the reported uncertainty, in case anyone is interested
SI_LATTICE_PARAMETER_UNCERTAINTY = 0.000000089
[docs]
def new_detector(dtype=0):
"""Create a new HKL-library detector"""
return libhkl.Detector.factory_new(libhkl.DetectorType(dtype))
if libhkl:
diffractometer_types = tuple(sorted(libhkl.factories().keys()))
UserUnits = libhkl.UnitEnum.USER
DefaultUnits = libhkl.UnitEnum.DEFAULT
# fmt: off
units = {
"user": UserUnits,
"default": DefaultUnits
}
# fmt: on
else:
diffractometer_types = ()
units = {}
def to_numpy(mat):
"""Convert an hkl ``Matrix`` to a numpy ndarray
Parameters
----------
mat : Hkl.Matrix
Returns
-------
ndarray
"""
if isinstance(mat, np.ndarray):
return mat
ret = np.zeros((3, 3))
for i in range(3):
for j in range(3):
ret[i, j] = mat.get(i, j)
return ret
def to_hkl(arr):
"""Convert a numpy ndarray to an hkl ``Matrix``
Parameters
----------
arr : ndarray
Returns
-------
Hkl.Matrix
"""
if isinstance(arr, libhkl.Matrix):
return arr
arr = np.array(arr)
hklm = hkl_euler_matrix(0, 0, 0)
hklm.init(*arr.flatten())
return hklm
def hkl_euler_matrix(euler_x, euler_y, euler_z):
"""Convert into matrix form."""
return libhkl.Matrix.new_euler(euler_x, euler_y, euler_z)
def _gi_info(gi_val):
def get(attr):
try:
getter = getattr(gi_val, attr)
# inspect.signature doesn't work on gi functions...
return getter()
except Exception as ex:
try:
return getter(units["user"])
except Exception:
return f"({ex.__class__.__name__}: {ex})"
return {attr: get(attr) for attr in dir(gi_val) if attr.endswith("_get")}
[docs]
class Constraint:
"""
Limitations on acceptable positions from computed forward() solutions.
Parameters
----------
low_limit : float
Lowest acceptable value for this axis when computing real-space solutions
from given reciprocal-space positions.
high_limit : float
Highest acceptable value for this axis when computing real-space solutions
from given reciprocal-space positions.
value : float
Constant value used (on condition) for ``forward(hkl)`` calculation.
Implemented by diffractometer :attr:`~hkl.engine.Engine.mode`.
The diffractometer engine's :attr:`~hkl.engine.Engine.mode` (such as E4CV's
``constant_phi`` mode) controls whether or not the axis is to be held
constant.
fit : bool
(deprecated) Not used as a constraint.
The value of ``fit`` is ignored. It remains now for compatibility with
previous *hklpy* releases. It will be dropped in a future *hklpy* release.
While this parameter is used by *libhkl* to adjust lattice parameters when
refining the **UB** matrix from more than 2 reflections, it is not used in
the calculation of rotation angles from reciprocal-space coordinates.
"""
def __init__(self, low_limit, high_limit, value, fit=True):
self.low_limit = float(low_limit)
self.high_limit = float(high_limit)
self.value = float(value)
self.fit = bool(fit)
self._fields = "low_limit high_limit value fit".split()
# fmt: off
_fields = ", ".join(
name + "=" + repr(getattr(self, name))
for name in self._fields
)
# fmt: on
self._repr_fmt = f"({_fields})"
[docs]
def _asdict(self):
"Return a new dict which maps field names to their values."
return dict(zip(self._fields, self))
[docs]
class _ConstraintIterator:
"Iterator"
def __init__(self, constraint):
self.constraint = constraint
self._index = 0
def __next__(self):
if self._index < len(self.constraint._fields):
c = getattr(self.constraint, self.constraint._fields[self._index])
self._index += 1
return c
else:
raise StopIteration
def __iter__(self):
"Iterate through the fields."
return self._ConstraintIterator(self)
def __repr__(self):
"Return a nicely formatted representation string."
content = "(" + ", ".join([f"{k}={getattr(self, k)}" for k in self._fields]) + ")"
return self.__class__.__name__ + content
Lattice = namedtuple("LatticeTuple", "a b c alpha beta gamma")
[docs]
def new_lattice(a, b=None, c=None, alpha=90.0, beta=None, gamma=None):
"""
Simplify for high-symmetry crystal systems.
EXAMPLES (highest to lowest symmetry):
=============== =================================== === === === ======= ======= =====
system command a b c alpha beta gamma
=============== =================================== === === === ======= ======= =====
cubic new_lattice(5.) 5 5 5 90 90 90
hexagonal new_lattice(4., c=3., gamma=120) 4 4 3 90 90 120
rhombohedral new_lattice(4., alpha=80.0) 4 4 4 80 80 80
tetragonal new_lattice(4, c=3) 4 4 3 90 90 90
orthorhombic new_lattice(4, 5, 3) 4 5 3 90 90 90
monoclinic new_lattice(4, 5, 3, beta=75) 4 5 3 90 75 90
triclinic new_lattice(4, 5, 3, 75., 85., 95.) 4 5 3 75 85 95
=============== =================================== === === === ======= ======= =====
.. see: https://en.wikipedia.org/wiki/Crystal_system
"""
return Lattice(a, b or a, c or a, alpha, beta or alpha, gamma or alpha)
_position_tuples = {}
[docs]
def get_position_tuple(axis_names, class_name="Position"):
"""Return a namedtuple with the positions."""
global _position_tuples
key = frozenset(axis_names)
if key not in _position_tuples:
_position_tuples[key] = namedtuple(class_name, tuple(axis_names))
return _position_tuples[key]
[docs]
def list_orientation_runs(catalog, *args, limit=20):
"""
List the runs with orientation information.
Returns
-------
orientation information: Pandas DataFrame object
Parameters
----------
*args : [str]
A list of additional data column names to be displayed,
corresponding to the names of available orientation
information in the descriptor document.
Example::
list_orientation_runs(
"class_name", energy", "energy_units", "lattice")
catalog : object
Instance of a databroker catalog.
limit : int
Limit the list to the first ``limit`` rows. (default=20)
"""
buffer = []
_count = 0
default_columns = "sample_name diffractometer_name geometry_name".split()
display_columns = default_columns + list(args)
with tqdm.tqdm(total=len(catalog.v2), file=sys.stdout) as progress_bar:
for run in catalog.v2.values():
info = run_orientation_info(run)
if len(info):
scan_id = run.metadata["start"]["scan_id"]
if databroker.__version__ >= "2.0":
uid = run.start["uid"][:7]
else:
uid = run.name[:7]
for device in sorted(info.keys()):
orientation = info[device]
row = dict(scan_id=scan_id)
for f in display_columns:
if f in orientation:
row[f] = orientation[f]
row["uid"] = uid
buffer.append(row)
_count += 1
if _count >= limit:
break
if _count >= limit:
break
progress_bar.update()
return pd.DataFrame(buffer)
[docs]
def run_orientation_info(run):
"""
Return a dictionary with orientation information in this run.
Dictionary is keyed by "detector" name (in case more than one
diffractometer was added as a "detector" to the run).
The orientation information is found in the descriptor document
of the primary stream.
Parameters
----------
run : from Databroker
A Bluesky run, from databroker v2, such as ``cat.v2[-1]``.
"""
devices = {}
try:
if databroker.__version__ >= "2.0":
for descriptor in run.primary.descriptors:
for device, configuration in descriptor.get("configuration", {}).items():
conf = configuration.get("data", {})
if f"{device}_orientation_attrs" in conf:
# fmt:off
devices[device] = {
item[len(f"{device}_"):]: value
for item, value in conf.items()
}
# fmt:on
else: # older databroker v1.2
run_conf = run.primary.config
for device in sorted(run_conf):
conf = run_conf[device].read()
if f"{device}_orientation_attrs" in conf:
# fmt:off
devices[device] = {
item[len(f"{device}_"):]: conf[item].to_dict()["data"][0]
for item in conf
}
# fmt:on
except Exception as exc:
logger.warning("Could not process run %s, due to %s", run, exc)
return devices
def _smart_signal_update(value, signal):
"""Write value to signal if not equal. Not a plan."""
if signal.get() != value:
signal.put(value)
def _check_geometry(orientation, diffractometer):
"""
Check that geometry of recovered orientation matches diffractometer.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
Raise ValueError if mismatched.
"""
if orientation["geometry_name"] != diffractometer.geometry_name.get():
raise ValueError(
"Geometries do not match:"
f" Orientation={orientation['geometry_name']},"
f" {diffractometer.name}={diffractometer.geometry_name.get()},"
" will not restore."
)
[docs]
def restore_constraints(orientation, diffractometer):
"""
Restore any constraints from orientation information.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
_check_geometry(orientation, diffractometer)
# fmt:off
if len(orientation["_constraints"]):
con_dict = {
k: Constraint(*v)
for k, v in zip(orientation["_reals"], orientation["_constraints"])
}
diffractometer.apply_constraints(con_dict)
# fmt:on
[docs]
def restore_energy(orientation, diffractometer):
"""
Restore energy from orientation information.
NOTE: This makes blocking calls so do not use in bluesky plan.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
# get _all_ the expected keys
try:
kv_dict = {
orientation[attr]: getattr(diffractometer, attr)
for attr in "energy energy_units energy_offset".split()
}
except KeyError as exc:
# fmt: off
raise KeyError(
f"{diffractometer.name}: Cannot restore "
f"diffractometer energy due to missing {exc} term."
)
# fmt: on
# update the signals
for k, v in kv_dict.items():
_smart_signal_update(k, v)
[docs]
def restore_reflections(orientation, diffractometer):
"""
Restore any reflections from orientation information.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
_check_geometry(orientation, diffractometer)
calc = diffractometer.calc
# remember this wavelength
wavelength0 = calc.wavelength
# short aliases
pseudos = orientation["_pseudos"]
reals = orientation["_reals"]
if reals != calc.physical_axis_names and reals == calc._geometry.axis_names_get():
# Substitute user-defined axes names for canonical axes names.
reals = calc.physical_axis_names
orientation_reflections = []
# might be renamed axes
renaming = calc._axis_name_to_original
for ref_base in orientation["reflections_details"]:
# every reflection has its own wavelength
calc.wavelength = ref_base["wavelength"]
# Order of the items is important.
# Can't just use the dictionaries in ``orientation``.
# Get the canonical order from the orientation data.
miller_indices = [ref_base["reflection"][key] for key in pseudos]
try:
positions = [ref_base["position"][key] for key in reals]
except KeyError:
# switch to use renamed keys
positions = [ref_base["position"][renaming[key]] for key in reals]
ppp = namedtuple("PositionTuple", tuple(reals))(*positions)
# assemble the final form of the reflection for add_reflection()
reflection = tuple([*miller_indices, ppp])
r = calc.sample.add_reflection(*reflection)
if ref_base["orientation_reflection"]:
orientation_reflections.append(r)
if len(orientation_reflections) > 1:
# compute **UB** from the last two orientation reflections
calc.sample.compute_UB(*orientation_reflections[-2:])
# restore previous wavelength
if calc.wavelength != wavelength0:
calc.wavelength = wavelength0
[docs]
def restore_orientation(orientation, diffractometer):
"""
Restore all orientation information.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
_check_geometry(orientation, diffractometer)
restore_energy(orientation, diffractometer)
restore_sample(orientation, diffractometer)
restore_constraints(orientation, diffractometer)
restore_reflections(orientation, diffractometer)
[docs]
def restore_sample(orientation, diffractometer):
"""
Restore sample & lattice from orientation information.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
nm = orientation["sample_name"]
lattice = orientation["lattice"]
if nm not in diffractometer.calc._samples:
diffractometer.calc.new_sample(nm, lattice=lattice)
else:
raise ValueError(f"Sample '{nm}' already exists in {diffractometer.name}.")
[docs]
def restore_UB(orientation, diffractometer):
"""
Restore **UB** matrix from orientation information.
Parameters
----------
orientation : dict
Dictionary of orientation parameters (from
:func:`~hkl.util.run_orientation_info()`) recovered from run.
diffractometer : :class:`~hkl.diffract.Diffractometer()`
Diffractometer object.
"""
_check_geometry(orientation, diffractometer)
_smart_signal_update(orientation["UB"], diffractometer.UB)
def _installed_package_information():
"""Index information about packages known by conda and/or pip."""
packages = defaultdict(dict)
for tool in "conda pip".split():
s = subprocess.run([tool, "list"], capture_output=True)
for line in s.stdout.splitlines():
if not line.decode().startswith("#"):
args = line.decode().strip().split()
key = args[0]
packages[key]["version"] = args[1]
packages[key][tool] = True
if tool == "conda":
packages[key]["build"] = args[2]
packages[key]["conda"] = True
if len(args) == 4:
packages[key]["channel"] = args[3]
elif tool == "pip":
if len(args) == 3:
packages[key]["location"] = args[2]
return packages
# cache of discovered installed package information
_package_info = None
[docs]
def get_package_info(package_name):
"""Return dict of information about installed package, by name."""
global _package_info
if _package_info is None:
# index the known packages
# This is not expected to change once the process has started.
_package_info = _installed_package_information()
return _package_info.get(package_name)
[docs]
def software_versions(keys=[]):
"""
Report the package versions, in a dictionary.
EXAMPLE::
In [1]: import gi
...:
...: gi.require_version("Hkl", "5.0")
...:
...: import hkl.util
In [2]: hkl.util.software_versions()
Out[2]:
{'hkl': '5.0.0.2173',
'hklpy': '0.3.16+131.ga5a449a.dirty',
'pygobject': '3.40.1'}
Here, it shows (albeit indirectly) *Hkl 5.0.0 tag 2173*. Proceed to the Hkl
source repository, list of tags [#]_, and find tag ``2173`` [#]_.
.. [#] Hkl source tags: https://repo.or.cz/hkl.git/refs
.. [#] Hkl tag 2173: https://repo.or.cz/hkl.git/tree/refs/tags/v5.0.0.2173)
"""
if keys is None or len(keys) == 0:
keys = DEFAULT_PACKAGE_LIST
v_dict = {}
for key in keys:
info = get_package_info(key)
if info is not None:
v_dict[key] = info.get("version")
return v_dict