Source code for tiled.client.constructors

import collections
import collections.abc

import httpx

from ..utils import import_object, prepend_to_sys_path
from .container import DEFAULT_STRUCTURE_CLIENT_DISPATCH, Container
from .context import DEFAULT_TIMEOUT_PARAMS, UNSET, Context
from .utils import MSGPACK_MIME_TYPE, ClientError, client_for_item, handle_error


[docs]def from_uri( uri, structure_clients="numpy", *, cache=UNSET, username=UNSET, auth_provider=UNSET, api_key=None, verify=True, prompt_for_reauthentication=UNSET, headers=None, timeout=None, include_data_sources=False, ): """ Connect to a Node on a local or remote server. Parameters ---------- uri : str e.g. "http://localhost:8000" structure_clients : str or dict, optional Use "dask" for delayed data loading and "numpy" for immediate in-memory structures (e.g. normal numpy arrays, pandas DataFrames). For advanced use, provide dict mapping a structure_family or a spec to a client object. cache : Cache, optional username : str, optional Username for authenticated access. If UNSET, use default if available (typically, the most recently used). auth_provider : str, optional Name of an authentication provider. IF UNSET, use default if available (typically, the most recently used). If None and the server supports multiple provides, the user will be interactively prompted to choose from a list. api_key : str, optional API key based authentication. Cannot mix with username/auth_provider. verify : bool, optional Verify SSL certifications. True by default. False is insecure, intended for development and testing only. prompt_for_reauthentication : bool, optional If True, prompt interactively for credentials if needed. If False, raise an error. By default, attempt to detect whether terminal is interactive (is a TTY). headers : dict, optional Extra HTTP headers. timeout : httpx.Timeout, optional If None, use Tiled default settings. (To disable timeouts, use httpx.Timeout(None)). include_data_sources : bool, optional Default False. If True, fetch information about underlying data sources. """ context, node_path_parts = Context.from_any_uri( uri, api_key=api_key, cache=cache, headers=headers, timeout=timeout, verify=verify, ) return from_context( context, structure_clients=structure_clients, prompt_for_reauthentication=prompt_for_reauthentication, username=username, auth_provider=auth_provider, node_path_parts=node_path_parts, include_data_sources=include_data_sources, )
def from_context( context, structure_clients="numpy", prompt_for_reauthentication=UNSET, username=UNSET, auth_provider=UNSET, node_path_parts=None, include_data_sources=False, ): """ Advanced: Connect to a Node using a custom instance of httpx.Client or httpx.AsyncClient. Parameters ---------- context : tiled.client.context.Context structure_clients : str or dict, optional Use "dask" for delayed data loading and "numpy" for immediate in-memory structures (e.g. normal numpy arrays, pandas DataFrames). For advanced use, provide dict mapping a structure_family or a spec to a client object. prompt_for_reauthentication : bool, optional If True, prompt interactively for credentials if needed. If False, raise an error. By default, attempt to detect whether terminal is interactive (is a TTY). """ if (username is not UNSET) or (auth_provider is not UNSET): if context.api_key is not None: raise ValueError("Use api_key or username/auth_provider, not both.") node_path_parts = node_path_parts or [] # Do entrypoint discovery if it hasn't yet been done. if Container.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is None: Container.discover_clients_from_entrypoints() # Interpret structure_clients="numpy" and structure_clients="dask" shortcuts. if isinstance(structure_clients, str): structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients] if ( (context.api_key is None) and context.server_info["authentication"]["required"] and (not context.server_info["authentication"]["providers"]) ): raise RuntimeError( """This server requires API key authentication. Set an api_key as in: >>> c = from_uri("...", api_key="...") """ ) if username is not UNSET: context.authenticate( username=username, provider=auth_provider, prompt_for_reauthentication=prompt_for_reauthentication, ) # Context ensures that context.api_uri has a trailing slash. item_uri = f"{context.api_uri}metadata/{'/'.join(node_path_parts)}" try: content = handle_error( context.http_client.get( item_uri, headers={"Accept": MSGPACK_MIME_TYPE}, params={"include_data_sources": include_data_sources}, ) ).json() except ClientError as err: if ( (err.response.status_code == httpx.codes.UNAUTHORIZED) and (context.api_key is None) and (context.http_client.auth is None) ): context.authenticate() params = {} if include_data_sources: params["include_data_sources"] = True content = handle_error( context.http_client.get( item_uri, headers={"Accept": MSGPACK_MIME_TYPE}, params=params, ) ).json() else: raise item = content["data"] return client_for_item( context, structure_clients, item, include_data_sources=include_data_sources )
[docs]def from_profile(name, structure_clients=None, **kwargs): """ Build a Node based a 'profile' (a named configuration). List available profiles and the source filepaths from Python like: >>> from tiled.profiles import list_profiles >>> list_profiles() or from a CLI like: $ tiled profile list Or show the file contents like: >>> from tiled.profiles import load_profiles >>> load_profiles() or from a CLI like: $ tiled profile show PROFILE_NAME Any additional parameters override profile content. See from_uri for details. """ # We accept structure_clients as a separate parameter so that it # may be invoked positionally, as in from_profile("...", "dask"). from ..profiles import ProfileNotFound, load_profiles, paths profiles = load_profiles() try: filepath, profile_content = profiles[name] except KeyError as err: raise ProfileNotFound( f"Profile {name!r} not found. Found profiles {list(profiles)} " f"from directories {paths}." ) from err merged = {**profile_content, **kwargs} if structure_clients is not None: merged["structure_clients"] = structure_clients cache_config = merged.pop("cache", None) if cache_config is not None: from tiled.client.cache import Cache if isinstance(cache_config, collections.abc.Mapping): # All necessary validation has already been performed # in load_profiles(). cache = Cache(**cache_config) else: # Interpret this as a Cache object passed in directly. cache = cache_config merged["cache"] = cache timeout_config = merged.pop("timeout", None) if timeout_config is not None: if isinstance(timeout_config, httpx.Timeout): timeout = timeout_config else: timeout_params = DEFAULT_TIMEOUT_PARAMS.copy() timeout_params.update(timeout_config) timeout = httpx.Timeout(**timeout_params) merged["timeout"] = timeout # Below, we may convert importable strings like # "package.module:obj" to live objects. Include the profile's # source directory in the import path, temporarily. with prepend_to_sys_path(filepath.parent): structure_clients_ = merged.pop("structure_clients", None) if structure_clients_ is not None: if isinstance(structure_clients_, str): # Nothing to do. merged["structure_clients"] = structure_clients_ else: # This is a dict mapping structure families like "array" and "dataframe" # to values. The values may be client objects or importable strings. result = {} for key, value in structure_clients_.items(): if isinstance(value, str): class_ = import_object(value, accept_live_object=True) else: class_ = value result[key] = class_ merged["structure_clients"] = result if "direct" in merged: # The profile specifies the server in-line. # Create an app and use it directly via ASGI. import jsonschema from ..config import ConfigError, schema from ..server.app import build_app_from_config config = merged.pop("direct", None) try: jsonschema.validate(instance=config, schema=schema()) except jsonschema.ValidationError as err: msg = err.args[0] raise ConfigError( f"ValidationError while parsing configuration file {filepath}: {msg}" ) from err context = Context.from_app( build_app_from_config(config, source_filepath=filepath), ) return from_context(context, **merged) else: return from_uri(**merged)