Source code for tiled.readers.xarray

import dask.array
import numpy

from ..readers.array import ArrayAdapter
from ..structures.xarray import (
    ArrayStructure,
    DataArrayMacroStructure,
    DataArrayStructure,
    DatasetMacroStructure,
    VariableMacroStructure,
    VariableStructure,
)
from ..utils import DictView


[docs]class VariableAdapter: """ Wrap an xarray.Variable """ structure_family = "variable"
[docs] def __init__(self, variable, metadata=None): self._variable = variable self._metadata = metadata or {}
def __repr__(self): return f"{type(self).__name__}({self._variable!r})" @property def metadata(self): return DictView(self._metadata) def macrostructure(self): # Coordinates are greedily loaded into numpy, so we cannot insist or # assume that these are dask-backed the way that we do in the other # adapters. if isinstance(self._variable.data, dask.array.Array): array_reader = ArrayAdapter(self._variable.data) else: array_reader = ArrayAdapter.from_array(self._variable.data) return VariableMacroStructure( dims=self._variable.dims, data=ArrayStructure( macro=array_reader.macrostructure(), micro=array_reader.microstructure() ), attrs=self._variable.attrs, ) def microstructure(self): return None def read(self): return self._variable def read_block(self, block, slice=None): data = self._variable.data if isinstance(data, numpy.ndarray): if block != (0,): raise NotImplementedError return data dask_array = data.blocks[block] if slice is not None: dask_array = dask_array[slice] return dask_array.compute()
[docs]class DataArrayAdapter: """ Wrap an xarray.DataArray """ structure_family = "data_array"
[docs] def __init__(self, data_array, metadata=None): self._data_array = data_array self._metadata = metadata or {}
def __repr__(self): return f"{type(self).__name__}({self._data_array!r})" @property def metadata(self): return DictView(self._metadata) def macrostructure(self): return self._recursive_macrostructure() def _recursive_macrostructure(self, depth=0): variable_reader = VariableAdapter(self._data_array.variable) variable_structure = VariableStructure( macro=variable_reader.macrostructure(), micro=None ) coords = {} if depth == 0: for k, v in self._data_array.coords.items(): coord_reader = DataArrayAdapter(v) coord_structure = DataArrayStructure( macro=coord_reader._recursive_macrostructure(depth=1 + depth), micro=None, ) coords[k] = coord_structure return DataArrayMacroStructure( variable=variable_structure, coords=coords, name=self._data_array.name ) def microstructure(self): return None def read(self, slice=None): data_array = self._data_array if slice is not None: data_array = data_array[slice] return data_array.compute() def read_block(self, block, coord=None, slice=None): if coord is None: adapter = VariableAdapter(self._data_array.variable) else: adapter = DataArrayAdapter(self._data_array.coords[coord]) return adapter.read_block(block, slice=slice)
[docs]class DatasetAdapter: """ Wrap an xarray.Dataset """ structure_family = "dataset"
[docs] def __init__(self, dataset, metadata=None): self._dataset = dataset self._metadata = metadata or {}
def __repr__(self): return f"{type(self).__name__}({self._dataset!r})" @property def metadata(self): return DictView(self._metadata) def macrostructure(self): data_vars = {} for k, v in self._dataset.data_vars.items(): data_array_reader = DataArrayAdapter(v) data_array_structure = DataArrayStructure( macro=data_array_reader.macrostructure(), micro=None ) data_vars[k] = data_array_structure coords = {} for k, v in self._dataset.coords.items(): coord_reader = DataArrayAdapter(v) coord_structure = DataArrayStructure( macro=coord_reader.macrostructure(), micro=None ) coords[k] = coord_structure return DatasetMacroStructure( data_vars=data_vars, coords=coords, attrs=self._dataset.attrs ) def microstructure(self): return None def read(self, variables=None): ds = self._dataset if variables is not None: ds = ds[variables] return ds.compute() def read_variable(self, variable): return self._dataset[variable].compute() def read_block(self, variable, block, coord=None, slice=None): if variable is None: return DataArrayAdapter(self._dataset.coords[coord]).read_block( block, slice=slice ) else: return DataArrayAdapter(self._dataset[variable]).read_block( block, coord, slice=slice )