Source code for ophyd_async.sim._pattern_generator

from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from pathlib import Path

import h5py
import numpy as np

# raw data path
DATA_PATH = "/entry/data/data"

# pixel sum path
SUM_PATH = "/entry/sum"


def generate_gaussian_blob(height: int, width: int) -> np.ndarray:
    """Make a Gaussian Blob with float values in range 0..1."""
    x, y = np.meshgrid(np.linspace(-1, 1, width), np.linspace(-1, 1, height))
    d = np.sqrt(x * x + y * y)
    blob = np.exp(-(d**2))
    return blob


def generate_interesting_pattern(
    x: float, y: float, channel: int, offset: float
) -> float:
    """Return a float value in range 0..1.

    Interesting in x and y in range -10..10
    """
    return (np.sin(x) ** channel + np.cos(x * y + offset) + 2) / 4


class PatternFile:
    def __init__(
        self,
        path: Path,
        width: int = 320,
        height: int = 240,
    ):
        self.file = h5py.File(path, "w", libver="latest")
        self.data = self.file.create_dataset(
            name=DATA_PATH,
            shape=(0, height, width),
            dtype=np.uint8,
            maxshape=(None, height, width),
        )
        self.sum = self.file.create_dataset(
            name=SUM_PATH,
            shape=(0,),
            dtype=np.int64,
            maxshape=(None,),
        )
        # Once datasets written, can switch the model to single writer multiple reader
        self.file.swmr_mode = True
        self.blob = generate_gaussian_blob(height, width) * np.iinfo(np.uint8).max
        self.image_counter = 0
        self.q = asyncio.Queue()

    def write_image_to_file(self, intensity: float):
        data = np.floor(self.blob * intensity)
        for dset, value in ((self.data, data), (self.sum, np.sum(data))):
            dset.resize(self.image_counter + 1, axis=0)
            dset[self.image_counter] = value
            dset.flush()
        self.q.put_nowait(self.image_counter)
        self.image_counter += 1

    def close(self):
        self.file.close()


[docs] class PatternGenerator: """Generates pattern images in files.""" def __init__(self): self._x = 0.0 self._y = 0.0 self._file: PatternFile | None = None
[docs] def set_x(self, x: float): self._x = x
[docs] def set_y(self, y: float): self._y = y
[docs] def generate_point(self, channel: int = 1, high_energy: bool = False) -> float: """Make a point between 0 and 1 based on x and y.""" offset = 100 if high_energy else 10 return generate_interesting_pattern(self._x, self._y, channel, offset)
[docs] def open_file(self, path: Path, width: int, height: int): self._file = PatternFile(path, width, height)
def _get_file(self) -> PatternFile: if not self._file: raise RuntimeError("open_file not run") return self._file
[docs] def write_image_to_file(self, exposure: float): self._get_file().write_image_to_file(self.generate_point() * exposure)
[docs] async def observe_indices_written(self, timeout: float) -> AsyncGenerator[int]: file = self._get_file() if file.image_counter: yield file.image_counter while True: yield await asyncio.wait_for(file.q.get(), timeout)
[docs] async def get_last_index(self) -> int: return self._get_file().image_counter
[docs] def close_file(self): if self._file: self._file.close() self._file = None