import re
import uuid
from abc import abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from datetime import date
from pathlib import Path, PurePath, PureWindowsPath
from typing import Protocol
from urllib.parse import urlunparse
[docs]
@dataclass
class PathInfo:
"""Information about where and how to write a file.
:param directory_path: Directory into which files should be written
:param filename: Base filename to use generated by FilenameProvider, w/o extension
:param create_dir_depth: Optional depth of directories to create if they do not
exist
:param directory_uri: Optional URI to use for reading back resources. If not set,
it will be generated from the directory path.
"""
directory_path: PurePath
filename: str
create_dir_depth: int = 0
directory_uri: str | None = None
def __post_init__(self):
if not self.directory_path.is_absolute():
raise ValueError(
f"directory_path must be an absolute path, got {self.directory_path}"
)
# If directory uri is not set, set it using the directory path.
if self.directory_uri is None:
self.directory_uri = urlunparse(
(
"file",
"localhost",
f"{self.directory_path.as_posix()}/",
"",
"",
None,
)
)
elif not self.directory_uri.endswith("/"):
# Ensure the directory URI ends with a slash.
self.directory_uri += "/"
[docs]
class FilenameProvider(Protocol):
"""Base class for callable classes providing filenames."""
@abstractmethod
def __call__(self, device_name: str | None = None) -> str:
"""Get a filename to use for output data, w/o extension."""
[docs]
class PathProvider(Protocol):
    """Protocol for callable objects that tell a detector where to write its data."""

    @abstractmethod
    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return the PathInfo describing where files should currently be written.

        :param device_name: Optional name of the device requesting a path.
        """
        ...
[docs]
class StaticFilenameProvider(FilenameProvider):
    """Filename provider that returns the same filename on every call."""

    def __init__(self, filename: str) -> None:
        """Store the filename that every call will return.

        :param filename: The constant filename, without extension.
        """
        self._static_filename = filename

    def __call__(self, device_name: str | None = None) -> str:
        """Return the stored filename; ``device_name`` is not used."""
        return self._static_filename
[docs]
class UUIDFilenameProvider(FilenameProvider):
    """Filename provider whose filenames are UUIDs."""

    def __init__(
        self,
        uuid_call_func: Callable = uuid.uuid4,
        uuid_call_args: list | None = None,
    ):
        """Configure how the UUID is generated.

        :param uuid_call_func: Callable invoked on every call to produce the UUID.
        :param uuid_call_args: Positional arguments passed to ``uuid_call_func``;
            an empty list is used when nothing (or an empty value) is given.
        """
        self._uuid_call_func = uuid_call_func
        self._uuid_call_args = [] if not uuid_call_args else uuid_call_args

    def __call__(self, device_name: str | None = None) -> str:
        """Generate a UUID and return it formatted as a string.

        :param device_name: Not used.
        :raises ValueError: If ``uuid.uuid3`` or ``uuid.uuid5`` is configured but
            fewer than two call arguments (namespace and name) were supplied.
        """
        # uuid3 / uuid5 are name-based and need both a namespace and a name.
        needs_namespace_and_name = self._uuid_call_func in (uuid.uuid3, uuid.uuid5)
        if needs_namespace_and_name and len(self._uuid_call_args) < 2:
            raise ValueError(
                f"To use {self._uuid_call_func} to generate UUID filenames,"
                " UUID namespace and name must be passed as args!"
            )

        generated = self._uuid_call_func(*self._uuid_call_args)
        return f"{generated}"
[docs]
class AutoIncrementFilenameProvider(FilenameProvider):
    """Provides a new numerically incremented filename on each call.

    Filenames have the form ``<base_filename><inc_delimeter><zero-padded counter>``.

    :param base_filename: Text placed before the delimiter and counter.
    :param max_digits: Width the counter is zero-padded to, and the maximum
        number of characters the counter may occupy.
    :param starting_value: Counter value used for the first filename.
    :param increment: Amount added to the counter after each call.
    :param inc_delimeter: Text placed between the base filename and the counter.
    """

    def __init__(
        self,
        base_filename: str = "",
        max_digits: int = 5,
        starting_value: int = 0,
        increment: int = 1,
        inc_delimeter: str = "_",
    ):
        self._base_filename = base_filename
        self._max_digits = max_digits
        self._current_value = starting_value
        self._increment = increment
        self._inc_delimeter = inc_delimeter

    def __call__(self, device_name: str | None = None) -> str:
        """Return the next filename and advance the counter.

        :param device_name: Not used.
        :raises ValueError: If the current counter value needs more characters
            than ``max_digits``.
        """
        if len(str(self._current_value)) > self._max_digits:
            # Implicit string concatenation keeps the message independent of
            # source indentation; a backslash continuation inside the literal
            # would embed the next line's leading whitespace in the message.
            raise ValueError(
                "Auto incrementing filename counter "
                f"exceeded maximum of {self._max_digits} digits!"
            )

        padded_counter = f"{self._current_value:0{self._max_digits}}"
        filename = f"{self._base_filename}{self._inc_delimeter}{padded_counter}"

        # Advance only after the filename for this call has been built.
        self._current_value += self._increment
        return filename
[docs]
class StaticPathProvider(PathProvider):
    """Path provider that places every file in one fixed directory."""

    def __init__(
        self,
        filename_provider: FilenameProvider,
        directory_path: PurePath,
        directory_uri: str | None = None,
        create_dir_depth: int = 0,
    ) -> None:
        """Store the fixed directory settings and the filename source.

        :param filename_provider: Callable used to obtain the filename on each call.
        :param directory_path: Directory passed through to every PathInfo.
        :param directory_uri: Optional URI passed through to every PathInfo.
        :param create_dir_depth: Directory creation depth passed through to
            every PathInfo.
        """
        self._filename_provider = filename_provider
        self._directory_path = directory_path
        self._directory_uri = directory_uri
        self._create_dir_depth = create_dir_depth

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Build a PathInfo for the fixed directory with a freshly provided filename.

        :param device_name: Forwarded to the filename provider.
        """
        return PathInfo(
            directory_path=self._directory_path,
            filename=self._filename_provider(device_name),
            create_dir_depth=self._create_dir_depth,
            directory_uri=self._directory_uri,
        )
[docs]
class AutoMaxIncrementingPathProvider(PathProvider):
    """Increment directory name on each call by checking existing directories.

    Looks through directories in a specified base path to increment directory name.
    PathInfo gives path like base_path/0001_dirname/dirname, or
    base_path/yyyy-mm-dd/0001_dirname/dirname if dated is true. Here,
    the '0001' is the value which gets incremented if the file exists already.
    It's recommended for the base path provider to be non-incrementing.

    Args:
        base_path_provider: Provider of the path to create directories inside of.
            The filename it returns is used both as the name part of the numbered
            directory and as the filename of the returned PathInfo.
        max_digits: Number of digits to pad onto the parent directory.
        starting_value: Number to start incrementing from.
        dated: Whether to create an extra directory to specify the day.
    """

    def __init__(
        self,
        base_path_provider: PathProvider,
        max_digits: int = 4,
        starting_value: int = 0,
        dated: bool = False,
    ):
        self._base_path_provider = base_path_provider
        self._max_digits = max_digits
        self._next_value = starting_value
        self._dated = dated

    @staticmethod
    def _find_highest_existing(path: Path) -> int | None:
        """Return the largest numeric prefix of ``<number>_*`` subdirectories.

        Returns None when ``path`` contains no such subdirectory, so callers can
        tell "nothing found" apart from a directory that really carries a number.
        """
        numbers = [
            int(entry.name.split("_", maxsplit=1)[0])
            for entry in path.iterdir()
            if entry.is_dir() and re.match(r"^\d+_", entry.name)
        ]
        return max(numbers) if numbers else None

    def _get_highest_number_from(self, path: Path) -> int:
        """Return the highest numeric directory prefix found in ``path``.

        Looks at subdirectories of ``path`` whose names start with
        ``<number>_``. Falls back to the stored next value when none exist.
        """
        highest_number = self._find_highest_existing(path)
        return self._next_value if highest_number is None else highest_number

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return a PathInfo pointing into the next numbered directory.

        Args:
            device_name: Accepted for interface compatibility.
                NOTE(review): it is not forwarded to the base path provider -
                confirm this is intended.
        """
        base_path_info = self._base_path_provider()
        base_path_dir = Path(base_path_info.directory_path)

        if self._dated:
            # Make sure we are the max ID of any other days to keep numbering
            # consistent. Only numbers that actually exist on disk count, and a
            # base directory that has not been created yet has nothing to scan.
            if base_path_dir.exists():
                per_day_highest = [
                    self._find_highest_existing(day_dir)
                    for day_dir in base_path_dir.iterdir()
                    if day_dir.is_dir()
                    and re.match(r"^\d\d\d\d-\d\d-\d\d$", day_dir.name)
                ]
                used_numbers = [n for n in per_day_highest if n is not None]
                if used_numbers:
                    self._next_value = max(max(used_numbers) + 1, self._next_value)
            path = base_path_dir / date.today().strftime("%Y-%m-%d")
        else:
            path = base_path_dir

        # Continue after the highest number already present in path, or use the
        # stored next value if no numbered directories are found.
        val_to_use = self._next_value
        if path.exists():
            highest_number = self._find_highest_existing(path)
            if highest_number is not None:
                # Always step past a number that exists on disk, even when it
                # happens to equal the stored next value.
                val_to_use = highest_number + 1
        self._next_value = val_to_use + 1

        filename = base_path_info.filename
        padded_counter = f"{val_to_use:0{self._max_digits}}"
        full_path = (
            path / f"{padded_counter}_{filename.strip('_')}" / filename.rstrip("_")
        )
        return PathInfo(
            directory_path=full_path.parent,
            directory_uri=None,
            filename=full_path.name,
            create_dir_depth=0,
        )
[docs]
class AutoIncrementingPathProvider(PathProvider):
    """Path provider that writes into numerically incremented directories."""

    def __init__(
        self,
        filename_provider: FilenameProvider,
        base_directory_path: PurePath,
        base_directory_uri: str | None = None,
        create_dir_depth: int = 0,
        max_digits: int = 5,
        starting_value: int = 0,
        num_calls_per_inc: int = 1,
        increment: int = 1,
        inc_delimeter: str = "_",
        base_name: str | None = None,
    ) -> None:
        """Store the configuration and reset the counters.

        :param filename_provider: Callable used to obtain the filename on each call.
        :param base_directory_path: Directory under which numbered directories live.
        :param base_directory_uri: Optional URI matching ``base_directory_path``;
            a trailing slash is appended if it is missing.
        :param create_dir_depth: Directory creation depth passed to every PathInfo.
        :param max_digits: Width the counter is zero-padded to.
        :param starting_value: Counter value used for the first directory.
        :param num_calls_per_inc: Number of calls after which the counter advances.
        :param increment: Amount added to the counter when it advances.
        :param inc_delimeter: Text placed between the name prefix and the counter.
        :param base_name: Optional prefix for the directory name; takes
            precedence over the device name.
        """
        # Normalise the URI up front so a directory name can be appended directly.
        if base_directory_uri is not None and not base_directory_uri.endswith("/"):
            base_directory_uri += "/"

        self._filename_provider = filename_provider
        self._base_directory_path = base_directory_path
        self._base_directory_uri = base_directory_uri
        self._create_dir_depth = create_dir_depth
        self._base_name = base_name
        self._starting_value = starting_value
        self._current_value = starting_value
        self._num_calls_per_inc = num_calls_per_inc
        self._inc_counter = 0
        self._max_digits = max_digits
        self._increment = increment
        self._inc_delimeter = inc_delimeter

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return a PathInfo for the current numbered directory.

        The directory name is the zero-padded counter, prefixed with
        ``base_name`` if set, otherwise with ``device_name`` if given. The
        counter advances once every ``num_calls_per_inc`` calls.

        :param device_name: Forwarded to the filename provider and used as the
            directory name prefix when no ``base_name`` was configured.
        """
        filename = self._filename_provider(device_name)

        # The counter value in effect before this call's bookkeeping names the
        # directory.
        counter_text = f"{self._current_value:0{self._max_digits}}"
        prefix = self._base_name if self._base_name is not None else device_name
        if prefix is None:
            dir_name = counter_text
        else:
            dir_name = f"{prefix}{self._inc_delimeter}{counter_text}"

        self._inc_counter += 1
        if self._inc_counter == self._num_calls_per_inc:
            self._inc_counter = 0
            self._current_value += self._increment

        if self._base_directory_uri is None:
            directory_uri = None
        else:
            directory_uri = f"{self._base_directory_uri}{dir_name}"

        return PathInfo(
            directory_path=self._base_directory_path / dir_name,
            directory_uri=directory_uri,
            filename=filename,
            create_dir_depth=self._create_dir_depth,
        )
[docs]
class YMDPathProvider(PathProvider):
    """Provides a path with the date included in the directory name.

    Directories are laid out as ``YYYY/MM/DD`` beneath the base directory,
    optionally combined with the device name.

    :param filename_provider: Callable used to obtain the filename on each call.
    :param base_directory_path: Directory under which the dated directories live.
    :param base_directory_uri: Optional URI matching ``base_directory_path``;
        a trailing slash is appended if it is missing.
    :param create_dir_depth: Directory creation depth passed to every PathInfo.
        Defaults to -3 to create the year/month/day directories.
    :param device_name_as_base_dir: When a device name is given, True yields
        ``YYYY/MM/DD/<device_name>`` and False yields ``<device_name>/YYYY/MM/DD``.
    """

    def __init__(
        self,
        filename_provider: FilenameProvider,
        base_directory_path: PurePath,
        base_directory_uri: str | None = None,
        create_dir_depth: int = -3,  # Default to -3 to create YMD dirs
        device_name_as_base_dir: bool = False,
    ) -> None:
        self._filename_provider = filename_provider
        self._base_directory_path = base_directory_path
        self._base_directory_uri = base_directory_uri
        if (
            self._base_directory_uri is not None
            and not self._base_directory_uri.endswith("/")
        ):
            self._base_directory_uri += "/"
        self._create_dir_depth = create_dir_depth
        self._device_name_as_base_dir = device_name_as_base_dir

    def __call__(self, device_name: str | None = None) -> PathInfo:
        """Return a PathInfo for today's dated directory.

        :param device_name: Forwarded to the filename provider; when given it is
            also included in the directory layout (see
            ``device_name_as_base_dir``).
        """
        # Build the relative directory with the same path class as the base
        # directory. Forward slashes are accepted as separators by both the
        # POSIX and Windows pure path classes.
        path_type = type(self._base_directory_path)
        dated_dir = path_type(date.today().strftime("%Y/%m/%d"))

        if device_name is None:
            ymd_dir_path = dated_dir
        elif self._device_name_as_base_dir:
            ymd_dir_path = dated_dir / device_name
        else:
            ymd_dir_path = path_type(device_name) / dated_dir

        filename = self._filename_provider(device_name)

        directory_uri = None
        if self._base_directory_uri is not None:
            # URIs always use forward slashes, whatever separator the path
            # class uses when converted to a string.
            directory_uri = f"{self._base_directory_uri}{ymd_dir_path.as_posix()}"

        return PathInfo(
            directory_path=self._base_directory_path / ymd_dir_path,
            directory_uri=directory_uri,
            filename=filename,
            create_dir_depth=self._create_dir_depth,
        )
[docs]
class DatasetDescriber(Protocol):
"""For describing datasets in file writing."""
[docs]
@abstractmethod
async def np_datatype(self) -> str:
"""Return the numpy datatype for this dataset."""
[docs]
@abstractmethod
async def shape(self) -> tuple[int | None, ...]:
"""Get the shape of the data collection."""