Source code for ophyd_async.core._providers

import re
import uuid
from abc import abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date
from pathlib import Path, PurePath, PureWindowsPath
from typing import Protocol
from urllib.parse import urlunparse


[docs] @dataclass class PathInfo: """Information about where and how to write a file. :param directory_path: Directory into which files should be written :param filename: Base filename to use generated by FilenameProvider, w/o extension :param create_dir_depth: Optional depth of directories to create if they do not exist :param directory_uri: Optional URI to use for reading back resources. If not set, it will be generated from the directory path. """ directory_path: PurePath filename: str create_dir_depth: int = 0 directory_uri: str | None = None def __post_init__(self): if not self.directory_path.is_absolute(): raise ValueError( f"directory_path must be an absolute path, got {self.directory_path}" ) # If directory uri is not set, set it using the directory path. if self.directory_uri is None: self.directory_uri = urlunparse( ( "file", "localhost", f"{self.directory_path.as_posix()}/", "", "", None, ) ) elif not self.directory_uri.endswith("/"): # Ensure the directory URI ends with a slash. self.directory_uri += "/"
[docs] class FilenameProvider(Protocol): """Base class for callable classes providing filenames.""" @abstractmethod def __call__(self, device_name: str | None = None) -> str: """Get a filename to use for output data, w/o extension."""
class PathProvider(Protocol):
    """Protocol for callables that tell a detector where to write its data."""

    @abstractmethod
    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return the current directory (and filename) to write files into.

        :param device_name: Optional name of the device requesting a path
        """
        ...
class StaticFilenameProvider(FilenameProvider):
    """Return the same, fixed filename every time it is called."""

    def __init__(self, filename: str):
        """Store the filename to hand back on every call.

        :param filename: The constant filename to provide
        """
        self._static_filename = filename

    def __call__(self, device_name: str | None = None) -> str:
        """Return the stored filename; ``device_name`` is not used."""
        return self._static_filename
class UUIDFilenameProvider(FilenameProvider):
    """Produce filenames that are UUID strings."""

    def __init__(
        self,
        uuid_call_func: Callable = uuid.uuid4,
        uuid_call_args: list | None = None,
    ):
        """Configure how UUIDs are generated.

        :param uuid_call_func: Callable used to generate each UUID
        :param uuid_call_args: Positional arguments passed to ``uuid_call_func``
        """
        self._uuid_call_func = uuid_call_func
        self._uuid_call_args = uuid_call_args if uuid_call_args else []

    def __call__(self, device_name: str | None = None) -> str:
        """Generate a new UUID and return it as a string.

        :raises ValueError: If ``uuid3``/``uuid5`` is used with fewer than two
            arguments (namespace and name)
        """
        # uuid3 and uuid5 are name-based and need both a namespace and a name.
        is_name_based = self._uuid_call_func in [uuid.uuid3, uuid.uuid5]
        if is_name_based and len(self._uuid_call_args) < 2:
            raise ValueError(
                f"To use {self._uuid_call_func} to generate UUID filenames,"
                " UUID namespace and name must be passed as args!"
            )

        generated = self._uuid_call_func(*self._uuid_call_args)
        return f"{generated}"
class AutoIncrementFilenameProvider(FilenameProvider):
    """Provides a new numerically incremented filename on each call.

    Each filename has the form ``{base_filename}{inc_delimeter}{counter}``,
    where the counter is zero-padded to ``max_digits``. The delimiter is
    emitted even when ``base_filename`` is empty.

    :param base_filename: Text placed before the delimiter and counter
    :param max_digits: Width the counter is zero-padded to, and the maximum
        number of digits the counter may have
    :param starting_value: Counter value used for the first call
    :param increment: Amount added to the counter after each call
    :param inc_delimeter: Text placed between the base filename and the counter
    """

    def __init__(
        self,
        base_filename: str = "",
        max_digits: int = 5,
        starting_value: int = 0,
        increment: int = 1,
        inc_delimeter: str = "_",
    ):
        self._base_filename = base_filename
        self._max_digits = max_digits
        self._current_value = starting_value
        self._increment = increment
        self._inc_delimeter = inc_delimeter

    def __call__(self, device_name: str | None = None) -> str:
        """Return the next filename and advance the counter.

        :param device_name: Not used by this provider
        :raises ValueError: If the counter needs more than ``max_digits`` digits
        """
        if len(str(self._current_value)) > self._max_digits:
            # Built with implicit concatenation so that no backslash or
            # continuation whitespace ends up inside the message.
            raise ValueError(
                "Auto incrementing filename counter "
                f"exceeded maximum of {self._max_digits} digits!"
            )

        padded_counter = f"{self._current_value:0{self._max_digits}}"
        filename = f"{self._base_filename}{self._inc_delimeter}{padded_counter}"
        # Advance only after the name for this call has been built.
        self._current_value += self._increment
        return filename
class StaticPathProvider(PathProvider):
    """Place every file inside one fixed directory."""

    def __init__(
        self,
        filename_provider: FilenameProvider,
        directory_path: PurePath,
        directory_uri: str | None = None,
        create_dir_depth: int = 0,
    ) -> None:
        """Store the fixed directory settings.

        :param filename_provider: Callable used to obtain the filename
        :param directory_path: Directory into which files are written
        :param directory_uri: Optional URI passed through to the PathInfo
        :param create_dir_depth: Directory creation depth passed through to
            the PathInfo
        """
        self._filename_provider = filename_provider
        self._directory_path = directory_path
        self._directory_uri = directory_uri
        self._create_dir_depth = create_dir_depth

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Build a PathInfo for the fixed directory with a fresh filename."""
        return PathInfo(
            directory_path=self._directory_path,
            filename=self._filename_provider(device_name),
            create_dir_depth=self._create_dir_depth,
            directory_uri=self._directory_uri,
        )
class AutoMaxIncrementingPathProvider(PathProvider):
    """Increment directory name on each call by checking existing directories.

    Looks through directories in a specified base path to increment directory
    name. PathInfo gives path like base_path/0001_dirname/dirname, or
    base_path/yyyy-mm-dd/0001_dirname/dirname if dated is true. Here, the
    '0001' is the value which gets incremented if the directory exists already.
    It's recommended for the base path provider to be non-incrementing.

    Args:
        base_path_provider: Path to create directories inside of. Note that the
            filename of this provider is used as the top level directory and
            the filename
        max_digits: Number of digits to pad onto the parent directory.
        starting_value: Number to start incrementing from.
        dated: Whether to create an extra directory to specify the day.
    """

    def __init__(
        self,
        base_path_provider: PathProvider,
        max_digits: int = 4,
        starting_value: int = 0,
        dated: bool = False,
    ):
        self._base_path_provider = base_path_provider
        self._max_digits = max_digits
        self._next_value = starting_value
        self._dated = dated

    def _find_highest_existing_number(self, path: Path) -> int | None:
        """Return the largest numeric prefix of ``<digits>_*`` subdirectories.

        Returns ``None`` when ``path`` holds no such subdirectory, so callers
        can tell "nothing found" apart from a real number.
        """
        numbers = [
            int(entry.name.split("_", maxsplit=1)[0])
            for entry in path.iterdir()
            if entry.is_dir() and re.match(r"^\d+_", entry.name)
        ]
        return max(numbers) if numbers else None

    def _get_highest_number_from(self, path: Path) -> int:
        """Return the highest numeric directory prefix found in ``path``.

        Falls back to the stored next value when no numbered directory exists.
        Kept with its original contract; ``__call__`` uses
        ``_find_highest_existing_number`` to avoid the ambiguous fallback.
        """
        found = self._find_highest_existing_number(path)
        return self._next_value if found is None else found

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return a PathInfo whose parent directory carries the next number.

        NOTE: ``device_name`` is accepted for protocol compatibility but is not
        forwarded to the base path provider.
        """
        base_path_info = self._base_path_provider()
        base_path_dir = Path(base_path_info.directory_path)

        if self._dated:
            # Make sure we are past the max ID of any other days to keep
            # numbering consistent. Only real directories that actually hold
            # numbered entries contribute; a missing base directory simply
            # means there is nothing to scan.
            numbers_per_day = []
            if base_path_dir.exists():
                for day_dir in base_path_dir.iterdir():
                    if not day_dir.is_dir():
                        continue
                    if not re.match(r"^\d\d\d\d-\d\d-\d\d$", day_dir.name):
                        continue
                    found = self._find_highest_existing_number(day_dir)
                    if found is not None:
                        numbers_per_day.append(found)
            if numbers_per_day:
                self._next_value = max(max(numbers_per_day) + 1, self._next_value)
            path = base_path_dir / date.today().strftime("%Y-%m-%d")
        else:
            path = base_path_dir

        # Use one past the highest number on disk, or the stored next value
        # if no numbered directories are found.
        highest_existing = (
            self._find_highest_existing_number(path) if path.exists() else None
        )
        if highest_existing is None:
            val_to_use = self._next_value
        else:
            # Always step past an existing directory so its number is never
            # handed out a second time.
            val_to_use = highest_existing + 1

        self._next_value = val_to_use + 1
        filename = base_path_info.filename
        padded_counter = f"{val_to_use:0{self._max_digits}}"
        full_path = (
            path / f"{padded_counter}_{filename.strip('_')}" / filename.rstrip("_")
        )
        return PathInfo(
            directory_path=full_path.parent,
            directory_uri=None,
            filename=full_path.name,
            create_dir_depth=0,
        )
class AutoIncrementingPathProvider(PathProvider):
    """Supply a path whose directory name carries an incrementing counter."""

    def __init__(
        self,
        filename_provider: FilenameProvider,
        base_directory_path: PurePath,
        base_directory_uri: str | None = None,
        create_dir_depth: int = 0,
        max_digits: int = 5,
        starting_value: int = 0,
        num_calls_per_inc: int = 1,
        increment: int = 1,
        inc_delimeter: str = "_",
        base_name: str | None = None,
    ) -> None:
        """Store the configuration and reset the counters.

        :param filename_provider: Callable used to obtain the filename
        :param base_directory_path: Directory under which numbered directories
            are placed
        :param base_directory_uri: Optional URI prefix; a trailing slash is
            appended if missing
        :param create_dir_depth: Directory creation depth passed to PathInfo
        :param max_digits: Width the counter is zero-padded to
        :param starting_value: Initial counter value
        :param num_calls_per_inc: Number of calls after which the counter is
            advanced
        :param increment: Amount added to the counter each time it advances
        :param inc_delimeter: Text placed between the name prefix and counter
        :param base_name: Optional prefix for the directory name; takes
            precedence over the device name
        """
        self._filename_provider = filename_provider
        self._base_directory_path = base_directory_path

        # Normalise the URI prefix so a directory name can be appended directly.
        if base_directory_uri is not None and not base_directory_uri.endswith("/"):
            base_directory_uri += "/"
        self._base_directory_uri = base_directory_uri

        self._create_dir_depth = create_dir_depth
        self._base_name = base_name
        self._starting_value = starting_value
        self._current_value = starting_value
        self._num_calls_per_inc = num_calls_per_inc
        self._inc_counter = 0
        self._max_digits = max_digits
        self._increment = increment
        self._inc_delimeter = inc_delimeter

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return the PathInfo for this call, advancing the counter when due."""
        filename = self._filename_provider(device_name)
        counter_text = f"{self._current_value:0{self._max_digits}}"

        # Prefix priority: explicit base name, then device name, then none.
        if self._base_name is not None:
            prefix = self._base_name
        elif device_name is not None:
            prefix = device_name
        else:
            prefix = None

        if prefix is None:
            auto_inc_dir_name = counter_text
        else:
            auto_inc_dir_name = f"{prefix}{self._inc_delimeter}{counter_text}"

        # Count this call; advance the directory counter once enough calls
        # have been made.
        self._inc_counter += 1
        if self._inc_counter == self._num_calls_per_inc:
            self._inc_counter = 0
            self._current_value += self._increment

        if self._base_directory_uri is None:
            directory_uri = None
        else:
            directory_uri = f"{self._base_directory_uri}{auto_inc_dir_name}"

        return PathInfo(
            directory_path=self._base_directory_path / auto_inc_dir_name,
            directory_uri=directory_uri,
            filename=filename,
            create_dir_depth=self._create_dir_depth,
        )
class YMDPathProvider(PathProvider):
    """Provides a path with the date included in the directory name.

    The directory is ``base/YYYY/MM/DD`` when no device name is given. With a
    device name it is ``base/YYYY/MM/DD/device`` if
    ``device_name_as_base_dir`` is true, otherwise ``base/device/YYYY/MM/DD``.

    :param filename_provider: Callable used to obtain the filename
    :param base_directory_path: Directory under which dated directories live
    :param base_directory_uri: Optional URI prefix; a trailing slash is
        appended if missing
    :param create_dir_depth: Directory creation depth passed to PathInfo
    :param device_name_as_base_dir: If true, the device name is the last
        directory component rather than the first
    """

    def __init__(
        self,
        filename_provider: FilenameProvider,
        base_directory_path: PurePath,
        base_directory_uri: str | None = None,
        create_dir_depth: int = -3,  # Default to -3 to create YMD dirs
        device_name_as_base_dir: bool = False,
    ) -> None:
        self._filename_provider = filename_provider
        self._base_directory_path = base_directory_path
        self._base_directory_uri = base_directory_uri
        if (
            self._base_directory_uri is not None
            and not self._base_directory_uri.endswith("/")
        ):
            self._base_directory_uri += "/"
        self._create_dir_depth = create_dir_depth
        self._device_name_as_base_dir = device_name_as_base_dir

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return the PathInfo for today's dated directory.

        :param device_name: Optional device name to include in the directory
        """
        # Build the relative directory with the same path flavour as the base
        # path, so separators are handled by pathlib rather than by hand.
        path_type = type(self._base_directory_path)
        date_dir = path_type(*date.today().strftime("%Y/%m/%d").split("/"))

        if device_name is None:
            ymd_dir_path = date_dir
        elif self._device_name_as_base_dir:
            ymd_dir_path = path_type(date_dir, device_name)
        else:
            ymd_dir_path = path_type(device_name, date_dir)

        filename = self._filename_provider(device_name)

        directory_uri = None
        if self._base_directory_uri is not None:
            # URIs always use forward slashes, even for Windows paths.
            directory_uri = f"{self._base_directory_uri}{ymd_dir_path.as_posix()}"

        return PathInfo(
            directory_path=self._base_directory_path / ymd_dir_path,
            directory_uri=directory_uri,
            filename=filename,
            create_dir_depth=self._create_dir_depth,
        )
[docs] class DatasetDescriber(Protocol): """For describing datasets in file writing."""
[docs] @abstractmethod async def np_datatype(self) -> str: """Return the numpy datatype for this dataset."""
[docs] @abstractmethod async def shape(self) -> tuple[int | None, ...]: """Get the shape of the data collection."""