Source code for tiled.adapters.zarr

import builtins
import collections.abc
import os
import sys
from typing import Any, Iterator, List, Optional, Tuple, Union

import zarr.core
import zarr.hierarchy
import zarr.storage
from numpy._typing import NDArray

from ..adapters.utils import IndexersMixin
from ..catalog.orm import Node
from ..iterviews import ItemsView, KeysView, ValuesView
from ..server.schemas import Asset
from ..structures.array import ArrayStructure
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import DataSource
from ..type_aliases import JSON, NDSlice
from ..utils import Conflicts, node_repr, path_from_uri
from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks

# Deepest nesting level for which ZarrGroupAdapter.inlined_contents_enabled()
# answers True. Read once at import time from the environment; defaults to 7.
# NOTE(review): the variable name mentions HDF5 although this is the Zarr
# adapter -- presumably the setting is shared with the HDF5 adapter; confirm.
INLINED_DEPTH = int(os.environ.get("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7"))


class ZarrArrayAdapter(ArrayAdapter):
    """
    Array adapter backed by a Zarr array.

    Zarr stores equal-sized chunks along each dimension, so the stored array
    may be larger than the shape reported by ``self.structure()``. Whole-array
    reads and writes go through ``_stencil()`` to trim that overflow.
    """

    @classmethod
    def init_storage(cls, data_uri: str, structure: ArrayStructure) -> List[Asset]:
        """
        Create an empty Zarr array in a directory store at ``data_uri``.

        Parameters
        ----------
        data_uri :
            URI of the directory to create (parents are created as needed).
        structure :
            Array structure providing the chunking and the data type.

        Returns
        -------
        A single-element list with the directory ``Asset`` for ``data_uri``.
        """
        # Zarr requires evenly-sized chunks within each dimension.
        # Use the first chunk along each dimension.
        zarr_chunks = tuple(dim[0] for dim in structure.chunks)
        # The stored shape is (first chunk size) x (number of chunks), which
        # can overshoot the logical shape when the last chunk is shorter.
        shape = tuple(dim[0] * len(dim) for dim in structure.chunks)
        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        storage = zarr.storage.DirectoryStore(str(directory))
        zarr.storage.init_array(
            storage,
            shape=shape,
            chunks=zarr_chunks,
            dtype=structure.data_type.to_numpy_dtype(),
        )
        return [
            Asset(
                data_uri=data_uri,
                is_directory=True,
                parameter="data_uri",
            )
        ]

    def _stencil(self) -> Tuple[slice, ...]:
        """Trim overflow because Zarr always has equal-sized chunks."""
        # ``builtins.slice`` is spelled out because several methods of this
        # class take a parameter named ``slice``.
        return tuple(builtins.slice(0, dim) for dim in self.structure().shape)

    def read(
        self,
        slice: NDSlice = ...,
    ) -> NDArray[Any]:
        """
        Read the array, trimmed to the structure's shape.

        Parameters
        ----------
        slice :
            Optional sub-selection applied after trimming; defaults to
            everything.

        Returns
        -------
        The selected data.
        """
        return self._array[self._stencil()][slice]

    def read_block(
        self,
        block: Tuple[int, ...],
        slice: NDSlice = ...,
    ) -> NDArray[Any]:
        """
        Read one block of the array.

        Parameters
        ----------
        block :
            Block index, one integer per dimension.
        slice :
            Optional sub-selection within the block; defaults to everything.

        Returns
        -------
        The selected data.
        """
        block_slice, _ = slice_and_shape_from_block_and_chunks(
            block, self.structure().chunks
        )
        # Slice the block out of the whole array,
        # and optionally a sub-slice therein.
        return self._array[self._stencil()][block_slice][slice]

    def write(
        self,
        data: NDArray[Any],
        slice: NDSlice = ...,
    ) -> None:
        """
        Overwrite the whole array (within the structure's shape) with ``data``.

        Parameters
        ----------
        data :
            Values to store.
        slice :
            Must be left at its default (``...``); partial writes are not
            supported here.

        Raises
        ------
        NotImplementedError
            If ``slice`` is anything other than ``...``.
        """
        if slice is not ...:
            raise NotImplementedError
        self._array[self._stencil()] = data

    async def write_block(
        self,
        data: NDArray[Any],
        block: Tuple[int, ...],
    ) -> None:
        """
        Write ``data`` into one block of the array.

        Parameters
        ----------
        data :
            Values to store in the block.
        block :
            Block index, one integer per dimension.
        """
        block_slice, _ = slice_and_shape_from_block_and_chunks(
            block, self.structure().chunks
        )
        self._array[block_slice] = data

    async def patch(
        self,
        data: NDArray[Any],
        offset: Tuple[int, ...],
        extend: bool = False,
    ) -> Tuple[Tuple[int, ...], Tuple[Tuple[int, ...], ...]]:
        """
        Write data into a slice of the array, maybe extending it.

        If the specified slice does not fit into the array, and extend=True,
        the array will be resized (expanded, never shrunk) to fit it.

        Parameters
        ----------
        data : array-like
        offset : tuple[int]
            Where to place the new data. Dimensions not covered by ``offset``
            start at 0.
        extend : bool
            If slice does not fit wholly within the shape of the existing
            array, reshape (expand) it to fit if this is True.

        Returns
        -------
        The array shape after the write, and the per-dimension chunk sizes
        that follow from the Zarr chunking for that shape.

        Raises
        ------
        Conflicts :
            If slice does not fit wholly within the shape of the existing
            array and extend is False. Nothing is written in that case.
        """
        current_shape = self._array.shape
        normalized_offset = [0] * len(current_shape)
        normalized_offset[: len(offset)] = list(offset)
        new_shape = []
        slice_ = []
        for data_dim, offset_dim, current_dim in zip(
            data.shape, normalized_offset, current_shape
        ):
            # Never shrink: keep the current extent unless the data overshoots.
            new_shape.append(max(current_dim, data_dim + offset_dim))
            slice_.append(builtins.slice(offset_dim, offset_dim + data_dim))
        new_shape_tuple = tuple(new_shape)
        target = tuple(slice_)
        if new_shape_tuple != current_shape:
            if extend:
                # Resize the Zarr array to accommodate new data
                self._array.resize(new_shape_tuple)
            else:
                # Report the computed target slice (the bare name ``slice``
                # would render as the builtin type, not the request).
                raise Conflicts(
                    f"Slice {target} does not fit into array shape {current_shape}. "
                    "Use ?extend=true to extend array dimension to fit."
                )
        self._array[target] = data
        new_chunks = []
        # Zarr has regularly-sized chunks, so no user input is required to
        # simply extend the existing pattern.
        for chunk_size, size in zip(self._array.chunks, new_shape_tuple):
            dim = [chunk_size] * (size // chunk_size)
            if size % chunk_size:
                # Trailing partial chunk.
                dim.append(size % chunk_size)
            new_chunks.append(tuple(dim))
        new_chunks_tuple = tuple(new_chunks)
        return new_shape_tuple, new_chunks_tuple
# Pick a Mapping base class that can be subscripted (MappingType[K, V]) on the
# running interpreter: collections.abc.Mapping from Python 3.9 on, otherwise
# the typing_extensions backport.
if sys.version_info >= (3, 9):
    import collections

    MappingType = collections.abc.Mapping
else:
    from typing_extensions import Mapping

    MappingType = Mapping
class ZarrGroupAdapter(
    MappingType[str, Union["ArrayAdapter", "ZarrGroupAdapter"]],
    IndexersMixin,
):
    """
    Read-only mapping over the children of a Zarr group.

    Keys are the child names of the wrapped node; values are adapters built on
    demand: a ``ZarrGroupAdapter`` for sub-groups and a ``ZarrArrayAdapter``
    for everything else.
    """

    structure_family = StructureFamily.container

    def __init__(
        self,
        node: Any,
        *,
        structure: Optional[ArrayStructure] = None,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
    ) -> None:
        """
        Parameters
        ----------
        node :
            The Zarr group to wrap.
        structure :
            Must be None; containers carry no structure.
        metadata :
            Stored on the instance. Note that ``metadata()`` returns the
            node's own attrs, not this value.
        specs :
            Specs attached to this node; defaults to an empty list.

        Raises
        ------
        ValueError
            If ``structure`` is not None.
        """
        if structure is not None:
            raise ValueError(
                f"structure is expected to be None for containers, not {structure}"
            )
        self._node = node
        self.specs = specs or []
        self._provided_metadata = metadata or {}
        super().__init__()

    def __repr__(self) -> str:
        return node_repr(self, list(self))

    def metadata(self) -> Any:
        """Return the attrs of the wrapped Zarr node."""
        return self._node.attrs

    def structure(self) -> None:
        """Containers have no structure; always returns None."""
        return None

    def __iter__(self) -> Iterator[Any]:
        yield from self._node

    def __getitem__(self, key: str) -> Union[ArrayAdapter, "ZarrGroupAdapter"]:
        value = self._node[key]
        if isinstance(value, zarr.hierarchy.Group):
            return ZarrGroupAdapter(value)
        else:
            return ZarrArrayAdapter.from_array(value)

    def __len__(self) -> int:
        return len(self._node)

    def keys(self) -> KeysView:  # type: ignore
        return KeysView(lambda: len(self), self._keys_slice)

    def values(self) -> ValuesView:  # type: ignore
        return ValuesView(lambda: len(self), self._items_slice)

    def items(self) -> ItemsView:  # type: ignore
        return ItemsView(lambda: len(self), self._items_slice)

    def search(self, query: Any) -> None:
        """
        Searching is not supported by this adapter.

        Parameters
        ----------
        query :

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError

    def read(self, fields: Optional[str]) -> "ZarrGroupAdapter":
        """
        Return this adapter; selecting a subset of fields is not supported.

        Parameters
        ----------
        fields :
            Must be None.

        Returns
        -------
        This adapter itself.

        Raises
        ------
        NotImplementedError
            If ``fields`` is not None.
        """
        if fields is not None:
            raise NotImplementedError
        return self

    # The following two methods are used by keys(), values(), items().

    def _keys_slice(self, start: int, stop: int, direction: int) -> List[Any]:
        """
        Return the child names in positions ``start:stop``.

        Parameters
        ----------
        start :
        stop :
        direction :
            A negative value reverses the key order before slicing.

        Returns
        -------
        List of keys.
        """
        keys = list(self._node)
        if direction < 0:
            keys = list(reversed(keys))
        return keys[start:stop]

    def _items_slice(self, start: int, stop: int, direction: int) -> List[Any]:
        """
        Return ``(key, adapter)`` pairs for positions ``start:stop``.

        Parameters
        ----------
        start :
        stop :
        direction :
            A negative value reverses the key order before slicing.

        Returns
        -------
        List of ``(key, adapter)`` tuples.
        """
        keys = list(self)
        if direction < 0:
            keys.reverse()
        # Slice the keys first so adapters are only constructed for the
        # requested window, not for every child of the group.
        return [(key, self[key]) for key in keys[start:stop]]

    def inlined_contents_enabled(self, depth: int) -> bool:
        """Return True when ``depth`` does not exceed ``INLINED_DEPTH``."""
        return depth <= INLINED_DEPTH
class ZarrAdapter:
    """Factories that open a Zarr store and wrap it in the matching adapter."""

    @classmethod
    def from_catalog(
        cls,
        data_source: DataSource,
        node: Node,
        /,
        **kwargs: Optional[Any],
    ) -> Union[ZarrGroupAdapter, ArrayAdapter]:
        """
        Build an adapter for a catalog node.

        The store is opened from the first asset of ``data_source``. The
        adapter type follows ``node.structure_family``: a group adapter for
        containers, an array adapter otherwise.
        """
        first_asset_uri = data_source.assets[0].data_uri
        zarr_obj = zarr.open(path_from_uri(first_asset_uri))  # Group or Array
        is_container = node.structure_family == StructureFamily.container
        adapter_cls = ZarrGroupAdapter if is_container else ZarrArrayAdapter
        return adapter_cls(
            zarr_obj,
            structure=data_source.structure,
            metadata=node.metadata_,
            specs=node.specs,
            **kwargs,
        )

    @classmethod
    def from_uris(
        cls, data_uri: str, **kwargs: Optional[Any]
    ) -> Union[ZarrArrayAdapter, ZarrGroupAdapter]:
        """
        Build an adapter directly from a URI.

        The adapter type follows what ``zarr.open`` returns: a group adapter
        for a Zarr group, otherwise an array adapter whose structure is
        derived from the opened array.
        """
        zarr_obj = zarr.open(path_from_uri(data_uri))  # Group or Array
        if isinstance(zarr_obj, zarr.hierarchy.Group):
            return ZarrGroupAdapter(zarr_obj, **kwargs)
        array_structure = ArrayStructure.from_array(zarr_obj)
        return ZarrArrayAdapter(zarr_obj, structure=array_structure, **kwargs)