import collections
import collections.abc
import warnings
from urllib.parse import parse_qs, urlparse
import httpx
from ..utils import import_object, prepend_to_sys_path
from .container import DEFAULT_STRUCTURE_CLIENT_DISPATCH, Container
from .context import DEFAULT_TIMEOUT_PARAMS, UNSET, Context
from .utils import MSGPACK_MIME_TYPE, client_for_item, handle_error, retry_context
[docs]
def from_uri(
uri,
structure_clients="numpy",
*,
cache=UNSET,
remember_me=True,
username=None,
auth_provider=None,
api_key=None,
verify=True,
prompt_for_reauthentication=None,
headers=None,
timeout=None,
include_data_sources=False,
):
"""
Connect to a Node on a local or remote server.
Parameters
----------
uri : str
e.g. "http://localhost:8000"
structure_clients : str or dict, optional
Use "dask" for delayed data loading and "numpy" for immediate
in-memory structures (e.g. normal numpy arrays, pandas
DataFrames). For advanced use, provide dict mapping a
structure_family or a spec to a client object.
cache : Cache, optional
remember_me : bool
Next time, try to automatically authenticate using this session.
username : str, optional
DEPRECATED. Ignored, and issues a warning if passed.
auth_provider : str, optional
DEPRECATED. Ignored, and issues a warning if passed.
api_key : str, optional
API key based authentication.
verify : bool, optional
Verify SSL certifications. True by default. False is insecure,
intended for development and testing only.
prompt_for_reauthentication : bool, optional
DEPRECATED. Ignored, and issue a warning if passed.
headers : dict, optional
Extra HTTP headers.
timeout : httpx.Timeout, optional
If None, use Tiled default settings.
(To disable timeouts, use httpx.Timeout(None)).
include_data_sources : bool, optional
Default False. If True, fetch information about underlying data sources.
"""
EXPLAIN_LOGIN = """
The user will be prompted for credentials if login is required.
Or, call login() to manually login.
For non-interactive authentication, use an API key.
"""
if username is not None:
warnings.warn("Tiled no longer accepts 'username' parameter. " + EXPLAIN_LOGIN)
if auth_provider is not None:
warnings.warn(
"Tiled no longer accepts 'auth_provider' parameter. " + EXPLAIN_LOGIN
)
if auth_provider is not None:
warnings.warn(
"Tiled no longer accepts 'prompt_for_reauthentication' parameter. "
+ EXPLAIN_LOGIN
)
context, node_path_parts = Context.from_any_uri(
uri,
api_key=api_key,
cache=cache,
headers=headers,
timeout=timeout,
verify=verify,
)
return from_context(
context,
structure_clients=structure_clients,
node_path_parts=node_path_parts,
include_data_sources=include_data_sources,
remember_me=remember_me,
)
def from_context(
context: Context,
structure_clients="numpy",
node_path_parts=None,
include_data_sources=False,
remember_me=True,
):
"""
Advanced: Connect to a Node using a custom instance of httpx.Client or httpx.AsyncClient.
Parameters
----------
context : tiled.client.context.Context
structure_clients : str or dict, optional
Use "dask" for delayed data loading and "numpy" for immediate
in-memory structures (e.g. normal numpy arrays, pandas
DataFrames). For advanced use, provide dict mapping a
structure_family or a spec to a client object.
"""
node_path_parts = node_path_parts or []
# Do entrypoint discovery if it hasn't yet been done.
if Container.STRUCTURE_CLIENTS_FROM_ENTRYPOINTS is None:
Container.discover_clients_from_entrypoints()
# Interpret structure_clients="numpy" and structure_clients="dask" shortcuts.
if isinstance(structure_clients, str):
structure_clients = DEFAULT_STRUCTURE_CLIENT_DISPATCH[structure_clients]
# To construct a user-facing client object, we may be required to authenticate.
# 1. If any API key set, we are already authenticated and there is nothing to do.
# 2. If there are cached valid credentials for this server, use them.
# 3. If not, and the server requires authentication, prompt for authentication.
if context.api_key is None:
auth_is_required = context.server_info.authentication.required
has_providers = len(context.server_info.authentication.providers) > 0
if auth_is_required and not has_providers:
raise RuntimeError(
"""This server requires API key authentication.
Set an api_key as in:
>>> c = from_uri("...", api_key="...")
"""
)
if has_providers:
found_valid_tokens = remember_me and context.use_cached_tokens()
if (not found_valid_tokens) and auth_is_required:
context.authenticate(remember_me=remember_me)
# Context ensures that context.api_uri has a trailing slash.
item_uri = f"{context.api_uri}metadata/{'/'.join(node_path_parts)}"
params = parse_qs(urlparse(item_uri).query)
if include_data_sources:
params["include_data_sources"] = include_data_sources
for attempt in retry_context():
with attempt:
content = handle_error(
context.http_client.get(
item_uri,
headers={"Accept": MSGPACK_MIME_TYPE},
)
).json()
item = content["data"]
return client_for_item(
context, structure_clients, item, include_data_sources=include_data_sources
)
[docs]
def from_profile(name, structure_clients=None, **kwargs):
"""
Build a Node based a 'profile' (a named configuration).
List available profiles and the source filepaths from Python like:
>>> from tiled.profiles import list_profiles
>>> list_profiles()
or from a CLI like:
$ tiled profile list
Or show the file contents like:
>>> from tiled.profiles import load_profiles
>>> load_profiles()
or from a CLI like:
$ tiled profile show PROFILE_NAME
Any additional parameters override profile content. See from_uri for details.
"""
# We accept structure_clients as a separate parameter so that it
# may be invoked positionally, as in from_profile("...", "dask").
from ..profiles import ProfileNotFound, load_profiles, paths
profiles = load_profiles()
try:
filepath, profile_content = profiles[name]
except KeyError as err:
raise ProfileNotFound(
f"Profile {name!r} not found. Found profiles {list(profiles)} "
f"from directories {paths}."
) from err
merged = {**profile_content, **kwargs}
if structure_clients is not None:
merged["structure_clients"] = structure_clients
cache_config = merged.pop("cache", None)
if cache_config is not None:
from tiled.client.cache import Cache
if isinstance(cache_config, collections.abc.Mapping):
# All necessary validation has already been performed
# in load_profiles().
cache = Cache(**cache_config)
else:
# Interpret this as a Cache object passed in directly.
cache = cache_config
merged["cache"] = cache
timeout_config = merged.pop("timeout", None)
if timeout_config is not None:
if isinstance(timeout_config, httpx.Timeout):
timeout = timeout_config
else:
timeout_params = DEFAULT_TIMEOUT_PARAMS.copy()
timeout_params.update(timeout_config)
timeout = httpx.Timeout(**timeout_params)
merged["timeout"] = timeout
# Below, we may convert importable strings like
# "package.module:obj" to live objects. Include the profile's
# source directory in the import path, temporarily.
with prepend_to_sys_path(filepath.parent):
structure_clients_ = merged.pop("structure_clients", None)
if structure_clients_ is not None:
if isinstance(structure_clients_, str):
# Nothing to do.
merged["structure_clients"] = structure_clients_
else:
# This is a dict mapping structure families like "array" and "dataframe"
# to values. The values may be client objects or importable strings.
result = {}
for key, value in structure_clients_.items():
if isinstance(value, str):
class_ = import_object(value, accept_live_object=True)
else:
class_ = value
result[key] = class_
merged["structure_clients"] = result
if "direct" in merged:
# The profile specifies the server in-line.
# Create an app and use it directly via ASGI.
import jsonschema
from ..config import ConfigError, schema
from ..server.app import build_app_from_config
config = merged.pop("direct", None)
try:
jsonschema.validate(instance=config, schema=schema())
except jsonschema.ValidationError as err:
msg = err.args[0]
raise ConfigError(
f"ValidationError while parsing configuration file {filepath}: {msg}"
) from err
context = Context.from_app(
build_app_from_config(config, source_filepath=filepath),
)
return from_context(context, **merged)
else:
return from_uri(**merged)