Source code for tiled.client.sync

import itertools
import warnings

import httpx

from ..structures.core import StructureFamily
from ..structures.data_source import DataSource, Management
from .base import BaseClient
from .utils import ClientError


[docs] def copy( source: BaseClient, dest: BaseClient, on_conflict: str = "error", ): """ Copy data from one Tiled instance to another. Parameters ---------- source : tiled node dest : tiled node on_conflict : str, default 'error', other options 'warn', 'skip' Examples -------- Connect to two instances and copy data. >>> from tiled.client import from_uri >>> from tiled.client.sync import copy >>> a = from_uri("http://localhost:8000", api_key="secret") >>> b = from_uri("http://localhost:9000", api_key="secret") >>> copy(a, b) Copy select data. >>> copy(a.items().head(), b) >>> copy(a.search(...), b) Copy and ignore duplicates. >>> copy(a, b, on_conflict = 'skip') """ if hasattr(source, "structure_family"): # looks like a client object _DISPATCH[source.structure_family]( source.include_data_sources(), dest, on_conflict ) else: _DISPATCH[StructureFamily.container](dict(source), dest, on_conflict)
def _copy_array(source, dest, on_conflict): num_blocks = (range(len(n)) for n in source.chunks) # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... for block in itertools.product(*num_blocks): array = source.read_block(block) dest.write_block(array, block) def _copy_awkward(source, dest, on_conflict): import awkward array = source.read() _form, _length, container = awkward.to_buffers(array) dest.write(container) def _copy_sparse(source, dest, on_conflict): num_blocks = (range(len(n)) for n in source.chunks) # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... for block in itertools.product(*num_blocks): array = source.read_block(block) dest.write_block(array.coords, array.data, block) def _copy_table(source, dest, on_conflict): for partition in range(source.structure().npartitions): df = source.read_partition(partition) dest.write_partition(partition, df) def _copy_container(source, dest, on_conflict): for key, child_node in source.items(): original_data_sources = child_node.include_data_sources().data_sources() num_data_sources = len(original_data_sources) if num_data_sources == 0: # A container with no data sources is just an organizational # entity in the database. if child_node.structure_family == StructureFamily.container: data_sources = [] else: raise ValueError( f"Unable to copy {child_node} which is a " f"{child_node.structure_family} but has no data sources." ) elif num_data_sources == 1: (original_data_source,) = original_data_sources if original_data_source.management == Management.external: data_sources = [original_data_source] else: if child_node.structure_family == StructureFamily.container: data_sources = [] else: data_sources = [ DataSource( management=original_data_source.management, mimetype=original_data_source.mimetype, structure_family=original_data_source.structure_family, structure=original_data_source.structure, ) ] else: # As of this writing this is impossible, but we anticipate that # it may be added someday. raise NotImplementedError( "Multiple Data Sources in one Node is not supported." ) try: node = dest.new( key=key, structure_family=child_node.structure_family, data_sources=data_sources, metadata=dict(child_node.metadata), specs=child_node.specs, ) except ClientError as err: if ( on_conflict == "skip" or on_conflict == "warn" ) and err.response.status_code == httpx.codes.CONFLICT: if on_conflict == "warn": warnings.warn("Skipped existing entry") continue else: raise err if ( original_data_sources and (original_data_sources[0].management != Management.external) ) or ( (child_node.structure_family == StructureFamily.container) and (not original_data_sources) ): _DISPATCH[child_node.structure_family]( child_node, node, on_conflict=on_conflict ) _DISPATCH = { StructureFamily.array: _copy_array, StructureFamily.awkward: _copy_awkward, StructureFamily.container: _copy_container, StructureFamily.sparse: _copy_sparse, StructureFamily.table: _copy_table, }