Source code for tiled.adapters.zarr

import builtins
import copy
import os
from collections.abc import Mapping
from typing import Any, Iterator, List, Optional, Tuple, Union, cast
from urllib.parse import quote_plus

import zarr.core
import zarr.hierarchy
import zarr.storage
from numpy._typing import NDArray

from ..adapters.utils import IndexersMixin
from ..catalog.orm import Node
from ..iterviews import ItemsView, KeysView, ValuesView
from ..structures.array import ArrayStructure
from ..structures.core import Spec, StructureFamily
from ..structures.data_source import Asset, DataSource, Storage
from ..type_aliases import JSON, NDSlice
from ..utils import Conflicts, node_repr, path_from_uri
from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks

# Maximum depth for which ZarrGroupAdapter.inlined_contents_enabled() answers
# True. Read once, at import time, from the environment; defaults to 7.
# NOTE(review): the variable name mentions HDF5 although this is the Zarr
# adapter -- presumably the setting is shared on purpose; confirm.
INLINED_DEPTH = int(os.getenv("TILED_HDF5_INLINED_CONTENTS_MAX_DEPTH", "7"))


class ZarrArrayAdapter(ArrayAdapter):
    """Array adapter backed by a Zarr array held in ``self._array``.

    Storage created by ``init_storage`` is sized to a whole number of
    equally-sized chunks per dimension, so the stored array may be larger
    than the declared structure. Reads and whole-array writes go through
    ``_stencil`` to trim that overflow.
    """

    @classmethod
    def init_storage(
        cls,
        storage: Storage,
        data_source: DataSource[ArrayStructure],
        path_parts: List[str],
    ) -> DataSource[ArrayStructure]:
        """Create an empty Zarr directory store for ``data_source``.

        Parameters
        ----------
        storage :
            Storage description; its ``"filesystem"`` entry is used as the
            base URI.
        data_source :
            Describes the array structure (chunks and data type) to allocate.
            It is deep-copied, not mutated.
        path_parts :
            Path segments appended (URL-quoted) to the base URI.

        Returns
        -------
        A copy of ``data_source`` with one directory ``Asset`` appended that
        points at the newly initialized store.
        """
        data_source = copy.deepcopy(data_source)  # Do not mutate caller input.
        data_uri = storage.get("filesystem") + "".join(
            f"/{quote_plus(segment)}" for segment in path_parts
        )
        # Zarr requires evenly-sized chunks within each dimension.
        # Use the first chunk along each dimension.
        zarr_chunks = tuple(dim[0] for dim in data_source.structure.chunks)
        # Allocate a whole number of such chunks per dimension; this may
        # exceed the structure's shape, which is why reads are trimmed.
        shape = tuple(dim[0] * len(dim) for dim in data_source.structure.chunks)
        directory = path_from_uri(data_uri)
        directory.mkdir(parents=True, exist_ok=True)
        store = zarr.storage.DirectoryStore(str(directory))
        zarr.storage.init_array(
            store,
            shape=shape,
            chunks=zarr_chunks,
            dtype=data_source.structure.data_type.to_numpy_dtype(),
        )
        data_source.assets.append(
            Asset(
                data_uri=data_uri,
                is_directory=True,
                parameter="data_uri",
            )
        )
        return data_source

    def _stencil(self) -> Tuple[slice, ...]:
        """Trim overflow because Zarr always has equal-sized chunks."""
        # ``builtins.slice`` is spelled out because several methods of this
        # class take a parameter named ``slice``.
        return tuple(builtins.slice(0, dim) for dim in self.structure().shape)

    def read(
        self,
        slice: NDSlice = ...,
    ) -> NDArray[Any]:
        """Read the array, trimmed to the structure's shape.

        Parameters
        ----------
        slice :
            Optional sub-selection applied after trimming.

        Returns
        -------
        The selected data.
        """
        return self._array[self._stencil()][slice]

    def read_block(
        self,
        block: Tuple[int, ...],
        slice: NDSlice = ...,
    ) -> NDArray[Any]:
        """Read one chunk-aligned block of the array.

        Parameters
        ----------
        block :
            Block index along each dimension, resolved against the
            structure's chunks.
        slice :
            Optional sub-selection applied within the block.

        Returns
        -------
        The selected data.
        """
        block_slice, _ = slice_and_shape_from_block_and_chunks(
            block, self.structure().chunks
        )
        # Slice the block out of the whole array,
        # and optionally a sub-slice therein.
        return self._array[self._stencil()][block_slice][slice]

    def write(
        self,
        data: NDArray[Any],
        slice: NDSlice = ...,
    ) -> None:
        """Overwrite the whole (trimmed) array with ``data``.

        Parameters
        ----------
        data :
            Values to store.
        slice :
            Must be left as ``...``; partial writes are not implemented.

        Raises
        ------
        NotImplementedError
            If ``slice`` is anything other than ``...``.
        """
        if slice is not ...:
            raise NotImplementedError
        self._array[self._stencil()] = data

    async def write_block(
        self,
        data: NDArray[Any],
        block: Tuple[int, ...],
    ) -> None:
        """Write ``data`` into one chunk-aligned block of the array.

        Parameters
        ----------
        data :
            Values to store in the block.
        block :
            Block index along each dimension, resolved against the
            structure's chunks.
        """
        block_slice, _ = slice_and_shape_from_block_and_chunks(
            block, self.structure().chunks
        )
        self._array[block_slice] = data

    async def patch(
        self,
        data: NDArray[Any],
        offset: Tuple[int, ...],
        extend: bool = False,
    ) -> Tuple[Tuple[int, ...], Tuple[Tuple[int, ...], ...]]:
        """
        Write data into a slice of the array, maybe extending it.

        If the specified slice does not fit into the array, and extend=True, the
        array will be resized (expanded, never shrunk) to fit it.

        Parameters
        ----------
        data : array-like
        offset : tuple[int]
            Where to place the new data. Dimensions not covered by
            ``offset`` start at 0.
        extend : bool
            If slice does not fit wholly within the shape of the existing array,
            reshape (expand) it to fit if this is True.

        Returns
        -------
        The array shape after the write and the matching per-dimension chunk
        sizes.

        Raises
        ------
        Conflicts :
            If slice does not fit wholly with the shape of the existing array
            and extend is False
        """
        current_shape = self._array.shape
        # Pad the offset with zeros for any trailing dimensions not given.
        normalized_offset = [0] * len(current_shape)
        normalized_offset[: len(offset)] = list(offset)
        new_shape = []
        slice_ = []
        for data_dim, offset_dim, current_dim in zip(
            data.shape, normalized_offset, current_shape
        ):
            new_shape.append(max(current_dim, data_dim + offset_dim))
            slice_.append(builtins.slice(offset_dim, offset_dim + data_dim))
        new_shape_tuple = tuple(new_shape)
        target = tuple(slice_)
        if new_shape_tuple != current_shape:
            if extend:
                # Resize the Zarr array to accommodate new data
                self._array.resize(new_shape_tuple)
            else:
                # Report the computed target slice (not the builtin type).
                raise Conflicts(
                    f"Slice {target} does not fit into array shape {current_shape}. "
                    "Use ?extend=true to extend array dimension to fit."
                )
        self._array[target] = data
        new_chunks = []
        # Zarr has regularly-sized chunks, so no user input is required to
        # simply extend the existing pattern.
        for chunk_size, size in zip(self._array.chunks, new_shape_tuple):
            dim = [chunk_size] * (size // chunk_size)
            if size % chunk_size:
                # Trailing partial chunk.
                dim.append(size % chunk_size)
            new_chunks.append(tuple(dim))
        new_chunks_tuple = tuple(new_chunks)
        return new_shape_tuple, new_chunks_tuple
class ZarrGroupAdapter(
    Mapping[str, Union["ArrayAdapter", "ZarrGroupAdapter"]],
    IndexersMixin,
):
    """Read-only mapping over the children of a Zarr group.

    Child groups are wrapped in ``ZarrGroupAdapter`` and every other child
    in ``ZarrArrayAdapter`` at lookup time.
    """

    structure_family = StructureFamily.container

    def __init__(
        self,
        node: Any,
        *,
        structure: Optional[ArrayStructure] = None,
        metadata: Optional[JSON] = None,
        specs: Optional[List[Spec]] = None,
    ) -> None:
        """
        Parameters
        ----------
        node :
            The underlying group object; it is iterated, indexed by key and
            asked for ``attrs``.
        structure :
            Must be None; containers have no structure.
        metadata :
            Stored as ``_provided_metadata``. Note that ``metadata()``
            returns the node's own ``attrs``, not this value.
        specs :
            Specs attached to this node; defaults to an empty list.

        Raises
        ------
        ValueError
            If ``structure`` is not None.
        """
        if structure is not None:
            raise ValueError(
                f"structure is expected to be None for containers, not {structure}"
            )
        self._node = node
        self.specs = specs or []
        self._provided_metadata = metadata or {}
        super().__init__()

    def __repr__(self) -> str:
        return node_repr(self, list(self))

    def metadata(self) -> Any:
        """Return the attributes of the underlying node."""
        return self._node.attrs

    def structure(self) -> None:
        """Containers have no structure; always returns None."""
        return None

    def __iter__(self) -> Iterator[Any]:
        yield from self._node

    def __getitem__(self, key: str) -> Union[ArrayAdapter, "ZarrGroupAdapter"]:
        value = self._node[key]
        if isinstance(value, zarr.hierarchy.Group):
            return ZarrGroupAdapter(value)
        else:
            return ZarrArrayAdapter.from_array(value)

    def __len__(self) -> int:
        return len(self._node)

    def keys(self) -> KeysView:  # type: ignore
        return KeysView(lambda: len(self), self._keys_slice)

    def values(self) -> ValuesView:  # type: ignore
        return ValuesView(lambda: len(self), self._items_slice)

    def items(self) -> ItemsView:  # type: ignore
        return ItemsView(lambda: len(self), self._items_slice)

    def search(self, query: Any) -> None:
        """Searching is not supported for Zarr groups.

        Parameters
        ----------
        query :

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError

    def read(self, fields: Optional[str]) -> "ZarrGroupAdapter":
        """Return this adapter itself.

        Parameters
        ----------
        fields :
            Must be None; selecting a subset of fields is not implemented.

        Raises
        ------
        NotImplementedError
            If ``fields`` is not None.
        """
        if fields is not None:
            raise NotImplementedError
        return self

    # The following two methods are used by keys(), values(), items().

    def _keys_slice(self, start: int, stop: int, direction: int) -> List[Any]:
        """Return the child keys in ``[start:stop]``.

        Parameters
        ----------
        start :
        stop :
        direction :
            A negative value reverses the key order before slicing.

        Returns
        -------
        A list of keys.
        """
        keys = list(self._node)
        if direction < 0:
            keys = list(reversed(keys))
        return keys[start:stop]

    def _items_slice(self, start: int, stop: int, direction: int) -> List[Any]:
        """Return ``(key, adapter)`` pairs for the keys in ``[start:stop]``.

        Parameters
        ----------
        start :
        stop :
        direction :
            A negative value reverses the key order before slicing.

        Returns
        -------
        A list of ``(key, adapter)`` tuples.
        """
        # Slice the keys first so adapters are only constructed for the
        # requested window rather than for every child of the group.
        return [
            (key, self[key]) for key in self._keys_slice(start, stop, direction)
        ]

    def inlined_contents_enabled(self, depth: int) -> bool:
        """Report whether ``depth`` is within the configured INLINED_DEPTH."""
        return depth <= INLINED_DEPTH
class ZarrAdapter:
    """Factory that opens a Zarr location and wraps it in the right adapter."""

    @classmethod
    def from_catalog(
        cls,
        # An Zarr node may reference an array or group (container).
        data_source: DataSource[Union[ArrayStructure, None]],
        node: Node,
        /,
        **kwargs: Optional[Any],
    ) -> Union[ZarrGroupAdapter, ArrayAdapter]:
        """Build an adapter for a catalog node.

        The first asset of ``data_source`` is opened with ``zarr.open``. The
        node's ``structure_family`` decides whether the result is wrapped as
        a group (container) or as an array. The node's metadata and specs,
        plus any extra ``kwargs``, are forwarded to the adapter.
        """
        location = path_from_uri(data_source.assets[0].data_uri)
        opened = zarr.open(location)  # Group or Array

        if node.structure_family == StructureFamily.container:
            return ZarrGroupAdapter(
                opened,
                structure=data_source.structure,
                metadata=node.metadata_,
                specs=node.specs,
                **kwargs,
            )

        return ZarrArrayAdapter(
            opened,
            structure=cast(ArrayStructure, data_source.structure),
            metadata=node.metadata_,
            specs=node.specs,
            **kwargs,
        )

    @classmethod
    def from_uris(
        cls, data_uri: str, **kwargs: Optional[Any]
    ) -> Union[ZarrArrayAdapter, ZarrGroupAdapter]:
        """Build an adapter directly from a URI.

        Whatever ``zarr.open`` returns decides the wrapper: a
        ``zarr.hierarchy.Group`` becomes a group adapter; anything else is
        treated as an array whose structure is derived from the object.
        """
        opened = zarr.open(path_from_uri(data_uri))  # Group or Array

        if not isinstance(opened, zarr.hierarchy.Group):
            array_structure = ArrayStructure.from_array(opened)
            return ZarrArrayAdapter(opened, structure=array_structure, **kwargs)

        return ZarrGroupAdapter(opened, **kwargs)