import collections
import collections.abc
import urllib.parse
import httpx
from ..utils import import_object, prepend_to_sys_path
from .context import (
DEFAULT_TOKEN_CACHE,
Context,
PromptForReauthentication,
context_from_tree,
)
from .node import Node
from .utils import DEFAULT_ACCEPTED_ENCODINGS, EVENT_HOOKS
def from_uri(
    uri,
    structure_clients="numpy",
    *,
    cache=None,
    offline=False,
    username=None,
    auth_provider=None,
    api_key=None,
    token_cache=DEFAULT_TOKEN_CACHE,
    verify=True,
    prompt_for_reauthentication=PromptForReauthentication.AT_INIT,
    headers=None,
):
    """
    Connect to a Node on a local or remote server.

    Parameters
    ----------
    uri : str
        e.g. "http://localhost:8000/api"
        An ``api_key`` query parameter, if present, is removed from the URL
        and sent as the ``X-TILED-API-KEY`` header instead. Any other query
        parameters are sent with every request made by the client.
    structure_clients : str or dict, optional
        Use "dask" for delayed data loading and "numpy" for immediate
        in-memory structures (e.g. normal numpy arrays, pandas
        DataFrames). For advanced use, provide dict mapping a
        structure_family or a spec to a client object.
    cache : Cache, optional
        Passed through to the Context.
    offline : bool, optional
        False by default. If True, rely on cache only.
    username : str, optional
        Username for authenticated access.
    auth_provider : str, optional
        Name of an authentication provider. If None and the server supports
        multiple providers, the user will be interactively prompted to
        choose from a list.
    api_key : str, optional
        API key based authentication. Cannot mix with username/auth_provider.
        An ``api_key`` given in the query string of ``uri`` takes precedence
        over this argument.
    token_cache : str, optional
        Path to directory for storing refresh tokens.
    verify : bool, optional
        Verify SSL certificates. True by default. False is insecure,
        intended for development and testing only.
    prompt_for_reauthentication : {"at_init", "always", "never"}
        Passed through to the Context.
    headers : dict, optional
        Extra HTTP headers. The mapping is copied; the caller's object is
        not modified.

    Returns
    -------
    The client object produced by ``from_context`` for the node at ``uri``.

    Raises
    ------
    ValueError
        If ``uri`` contains more than one ``api_key`` query parameter.
    """
    # The uri is expected to reach the root or /node/metadata/[...] route.
    url = httpx.URL(uri)
    # Copy so that the defaults and the API key added below never leak into
    # a dict owned by the caller.
    headers = dict(headers or {})
    headers.setdefault("accept-encoding", ",".join(DEFAULT_ACCEPTED_ENCODINGS))
    # If ?api_key=... is present, move it from the query into a header.
    # The server would accept it in the query parameter, but using
    # a header is a little more secure (e.g. not logged) and makes
    # it simpler to manage the client.base_url.
    parsed_query = urllib.parse.parse_qs(url.query.decode())
    api_key_list = parsed_query.pop("api_key", None)
    if api_key_list is not None:
        if len(api_key_list) != 1:
            raise ValueError("Cannot handle two api_key query parameters")
        (api_key,) = api_key_list
        headers["X-TILED-API-KEY"] = api_key
    # parse_qs() yields {name: [value, ...]}; httpx accepts that mapping
    # directly, so the remaining query parameters are forwarded as they are.
    params = dict(parsed_query)
    # Construct the URL *without* the params, which we will pass in separately.
    base_uri = urllib.parse.urlunsplit(
        (url.scheme, url.netloc.decode(), url.path, "", url.fragment)
    )
    client = httpx.Client(
        base_url=base_uri,
        verify=verify,
        event_hooks=EVENT_HOOKS,
        timeout=httpx.Timeout(5.0, read=20.0),
        headers=headers,
        params=params,
    )
    context = Context(
        client,
        username=username,
        auth_provider=auth_provider,
        api_key=api_key,
        cache=cache,
        offline=offline,
        token_cache=token_cache,
        prompt_for_reauthentication=prompt_for_reauthentication,
    )
    return from_context(context, structure_clients=structure_clients)
def from_tree(
    tree,
    structure_clients="numpy",
    *,
    authentication=None,
    server_settings=None,
    query_registry=None,
    serialization_registry=None,
    compression_registry=None,
    cache=None,
    offline=False,
    username=None,
    auth_provider=None,
    api_key=None,
    token_cache=DEFAULT_TOKEN_CACHE,
    headers=None,
    prompt_for_reauthentication=None,
):
    """
    Connect to a Node directly, running the app in this same process.

    NOTE: This is experimental. It may need to be re-designed or even removed.

    In this configuration, we are using the server, but we are communicating
    with it directly within this process, not over a local network. It is
    generally faster.

    Specifically, we are using HTTP over ASGI rather than HTTP over TCP.
    There are no sockets or network-related syscalls.

    Parameters
    ----------
    tree : Node
    structure_clients : str or dict, optional
        Use "dask" for delayed data loading and "numpy" for immediate
        in-memory structures (e.g. normal numpy arrays, pandas
        DataFrames). For advanced use, provide dict mapping a
        structure_family or a spec to a client object.
    authentication : dict, optional
        Dict of authentication configuration.
    server_settings : optional
        Forwarded unchanged to ``context_from_tree``.
    query_registry : optional
        Forwarded unchanged to ``context_from_tree``.
    serialization_registry : optional
        Forwarded unchanged to ``context_from_tree``.
    compression_registry : optional
        Forwarded unchanged to ``context_from_tree``.
    username : str, optional
        Username for authenticated access.
    auth_provider : str, optional
        Name of an authentication provider. If None and the server supports
        multiple providers, the user will be interactively prompted to
        choose from a list.
    api_key : str, optional
        API key based authentication. Cannot mix with username/auth_provider.
    cache : Cache, optional
    offline : bool, optional
        False by default. If True, rely on cache only.
    token_cache : str, optional
        Path to directory for storing refresh tokens.
    headers : dict, optional
        Extra HTTP headers. Forwarded unchanged to ``context_from_tree``.
    prompt_for_reauthentication : {"at_init", "always", "never"}, optional
        Forwarded to ``context_from_tree`` only when given. If None (the
        default), the argument is omitted and ``context_from_tree`` applies
        its own default.

    Returns
    -------
    The client object produced by ``from_context`` for ``tree``.
    """
    # Only forward prompt_for_reauthentication when the caller chose a value,
    # so that calls that omit it behave exactly as they did before the
    # parameter existed.
    optional_kwargs = {}
    if prompt_for_reauthentication is not None:
        optional_kwargs["prompt_for_reauthentication"] = prompt_for_reauthentication
    context = context_from_tree(
        tree=tree,
        authentication=authentication,
        server_settings=server_settings,
        query_registry=query_registry,
        serialization_registry=serialization_registry,
        compression_registry=compression_registry,
        # The cache and "offline" mode do not make much sense when we have an
        # in-process connection, but we support it for the sake of testing and
        # making direct access a drop in replacement for the normal service.
        cache=cache,
        offline=offline,
        token_cache=token_cache,
        username=username,
        auth_provider=auth_provider,
        api_key=api_key,
        headers=headers,
        **optional_kwargs,
    )
    return from_context(context, structure_clients=structure_clients)
def from_context(context, structure_clients="numpy", *, path=None):
    """
    Advanced: Connect to a Node using a custom instance of httpx.Client or httpx.AsyncClient.

    Parameters
    ----------
    context : tiled.client.context.Context
    structure_clients : str or dict, optional
        Use "dask" for delayed data loading and "numpy" for immediate
        in-memory structures (e.g. normal numpy arrays, pandas
        DataFrames). For advanced use, provide dict mapping a
        structure_family or a spec to a client object.
    path : list, optional
        Path handed to the Node and to ``client_for_item``. Defaults to an
        empty list. Note that the metadata request itself is built from
        ``context.path_parts``, not from this argument.

    Returns
    -------
    The result of ``Node.client_for_item`` for the fetched metadata item.
    """
    # Entrypoint discovery is done lazily, the first time a client is built.
    if Node.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is None:
        Node.discover_clients_from_entrypoints()

    # Expand the "numpy" / "dask" string shortcuts into a dispatch mapping.
    if isinstance(structure_clients, str):
        structure_clients = Node.DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]

    node_path = path if path else []

    # Fetch the metadata describing the node that the context points at.
    metadata_route = "/node/metadata/" + "/".join(context.path_parts)
    item = context.get_json(metadata_route)["data"]

    root = Node(
        context,
        item=item,
        path=node_path,
        structure_clients=structure_clients,
    )
    return root.client_for_item(item, path=node_path)
def _cache_from_profile_config(cache_config):
    """Turn the 'cache' entry of a profile into a Cache (or pass a Cache through)."""
    from tiled.client.cache import Cache

    if not isinstance(cache_config, collections.abc.Mapping):
        # Interpret this as a Cache object passed in directly.
        return cache_config

    # Profile files are validated in load_profiles(), but a 'cache' mapping
    # supplied as a keyword override is not, so check the shape here too.
    if len(cache_config) != 1:
        raise ValueError(
            "The cache configuration must contain exactly one key, "
            f"'memory' or 'disk'. Found keys {list(cache_config)}."
        )
    ((kind, options),) = cache_config.items()
    # Copy so that the rename below does not modify the loaded profile.
    options = dict(options)
    # For back-compat, rename "available_bytes" to "capacity".
    available_bytes = options.pop("available_bytes", None)
    if available_bytes is not None:
        if "capacity" in options:
            raise ValueError(
                "Cannot specify both 'capacity' and its deprecated alias 'available_bytes'."
            )
        options["capacity"] = available_bytes
        import warnings

        # stacklevel=3 attributes the warning to the caller of from_profile().
        warnings.warn(
            "Profile specifies 'available_bytes'. Use new name 'capacity' instead. "
            "Support for the old name, 'available_bytes', will be removed in the future.",
            stacklevel=3,
        )
    if kind == "memory":
        return Cache.in_memory(**options)
    if kind == "disk":
        return Cache.on_disk(**options)
    raise ValueError(
        f"Unrecognized cache type {kind!r}. Expected 'memory' or 'disk'."
    )


def _import_structure_clients(structure_clients):
    """Replace importable strings in a structure_clients mapping with live objects."""
    if isinstance(structure_clients, str):
        # A shortcut name such as "numpy" or "dask". Nothing to do.
        return structure_clients
    # This is a dict mapping structure families like "array" and "dataframe"
    # to values. The values may be client objects or importable strings.
    return {
        family: (
            import_object(target, accept_live_object=True)
            if isinstance(target, str)
            else target
        )
        for family, target in structure_clients.items()
    }


def from_profile(name, structure_clients=None, **kwargs):
    """
    Build a Node based on a 'profile' (a named configuration).

    List available profiles and the source filepaths from Python like:

    >>> from tiled.profiles import list_profiles
    >>> list_profiles()

    or from a CLI like:

    $ tiled profile list

    Or show the file contents like:

    >>> from tiled.profiles import load_profiles
    >>> load_profiles()

    or from a CLI like:

    $ tiled profile show PROFILE_NAME

    Any additional parameters override profile content. See from_uri for details.

    Parameters
    ----------
    name : str
        Name of the profile to look up in ``load_profiles()``.
    structure_clients : str or dict, optional
        If given, overrides the profile's ``structure_clients``. Dict values
        may be client objects or importable strings such as
        "package.module:obj".
    **kwargs
        Override entries of the profile. The merged result is passed to
        ``from_tree`` if it contains a "direct" entry, otherwise to
        ``from_uri``.

    Raises
    ------
    ProfileNotFound
        If no profile called ``name`` exists.
    ValueError
        If the cache configuration is malformed, names an unknown cache type,
        or gives both 'capacity' and its deprecated alias 'available_bytes'.
    """
    # We accept structure_clients as a separate parameter so that it
    # may be invoked positionally, as in from_profile("...", "dask").
    from ..profiles import ProfileNotFound, load_profiles, paths

    profiles = load_profiles()
    try:
        filepath, profile_content = profiles[name]
    except KeyError as err:
        raise ProfileNotFound(
            f"Profile {name!r} not found. Found profiles {list(profiles)} "
            f"from directories {paths}."
        ) from err
    merged = {**profile_content, **kwargs}
    if structure_clients is not None:
        merged["structure_clients"] = structure_clients
    cache_config = merged.pop("cache", None)
    if cache_config is not None:
        merged["cache"] = _cache_from_profile_config(cache_config)
    # Below, we may convert importable strings like
    # "package.module:obj" to live objects. Include the profile's
    # source directory in the import path, temporarily.
    with prepend_to_sys_path(filepath.parent):
        structure_clients_ = merged.pop("structure_clients", None)
        if structure_clients_ is not None:
            merged["structure_clients"] = _import_structure_clients(
                structure_clients_
            )
    if "direct" in merged:
        # The profile specifies that there is no server. We should create
        # an app ourselves and use it directly via ASGI.
        from ..config import construct_build_app_kwargs

        build_app_kwargs = construct_build_app_kwargs(
            merged.pop("direct", None), source_filepath=filepath
        )
        return from_tree(**build_app_kwargs, **merged)
    return from_uri(**merged)
def from_config(
    config,
    *,
    username=None,
    auth_provider=None,
    api_key=None,
    cache=None,
    offline=False,
    token_cache=DEFAULT_TOKEN_CACHE,
    prompt_for_reauthentication=PromptForReauthentication.AT_INIT,
    **kwargs,
):
    """
    Build Nodes directly, running the app in this same process.

    NOTE: This is experimental. It may need to be re-designed or even removed.

    Parameters
    ----------
    config : str or dict
        May be:

        * Path to config file
        * Path to directory of config files
        * Dict of config
    username, auth_provider, api_key, cache, offline, token_cache, prompt_for_reauthentication
        Forwarded unchanged to ``context_from_tree``, together with the
        keyword arguments that ``construct_build_app_kwargs`` derives from
        ``config``.
    **kwargs
        Forwarded to ``from_context`` (e.g. ``structure_clients``).

    Examples
    --------

    From config file:

    >>> from_config("path/to/file.yml")

    From directory of config files:

    >>> from_config("path/to/directory")

    From configuration given directly, as dict:

    >>> from_config(
            {
                "trees":
                    [
                        {
                            "path": "/",
                            "tree": "tiled.files.Node.from_files",
                            "args": {"directory": "path/to/files"}
                        }
                    ]
            }
        )
    """
    from ..config import construct_build_app_kwargs

    # Settings for the in-process app, derived from the given configuration.
    app_options = construct_build_app_kwargs(config)
    # Client-side settings supplied by the caller.
    client_options = {
        "username": username,
        "auth_provider": auth_provider,
        "api_key": api_key,
        "cache": cache,
        "offline": offline,
        "token_cache": token_cache,
        "prompt_for_reauthentication": prompt_for_reauthentication,
    }
    context = context_from_tree(**client_options, **app_options)
    return from_context(context, **kwargs)