import contextlib
import getpass
import os
import re
import sys
import time
import urllib.parse
import warnings
from pathlib import Path
import httpx
import msgpack
from .._version import get_versions
from ..utils import DictView, Sentinel
from .auth import (
DEFAULT_TOKEN_CACHE,
CannotRefreshAuthentication,
TiledAuth,
build_refresh_request,
)
from .cache import Revalidate
from .utils import (
DEFAULT_ACCEPTED_ENCODINGS,
DEFAULT_TIMEOUT_PARAMS,
EVENT_HOOKS,
NotAvailableOffline,
handle_error,
)
# Version string of the installed package, as reported by get_versions().
tiled_version = get_versions()["version"]
# Default User-Agent header value; Context sends it so the server can tell
# which client version it is talking to.
USER_AGENT = f"python-tiled/{tiled_version}"
# Matches an Authorization header of the form "Apikey <key>". Group 1 is the
# key, which must consist solely of word characters.
API_KEY_AUTH_HEADER_PATTERN = re.compile(r"^Apikey (\w+)$")
# Sentinel meaning "argument not supplied", distinguishable from an explicit None.
UNSET = Sentinel("UNSET")
# Module-wide default used by Context.authenticate() when its
# prompt_for_reauthentication argument is left UNSET. None means: decide at
# call time by checking whether stdin is an open, interactive terminal.
PROMPT_FOR_REAUTHENTICATION = None
class Context:
    """
    Wrap an httpx.Client with an optional cache and authentication functionality.

    A Context owns the HTTP client (``http_client``), the root API URL
    (``api_uri``), the ``server_info`` document fetched from that URL, and the
    client-side caching / authentication state used by the request helpers
    defined below. It can be used as a context manager; leaving the context
    closes the underlying HTTP client.
    """

    # Raised (as RuntimeError) whenever offline mode is requested without a cache.
    _OFFLINE_WITHOUT_CACHE_MESSAGE = (
        "To use offline mode, Tiled must be configured with a Cache, as in\n"
        ">>> from tiled.client import from_uri\n"
        ">>> from tiled.client.cache import Cache\n"
        '>>> client = from_uri("...", cache=Cache.on_disk("my_cache"))\n'
    )

    def __init__(
        self,
        uri,
        *,
        headers=None,
        api_key=None,
        cache=None,
        offline=False,
        timeout=None,
        verify=True,
        token_cache=None,
        app=None,
        raise_server_exceptions=True,
    ):
        """
        Parameters
        ----------
        uri : str or httpx.URL
            URL of the root API route. An ``api_key`` query parameter, if
            present, is moved into the Authorization header; any other query
            parameters and fragments are dropped.
        headers : mapping, optional
            Extra default headers. The mapping is copied, never modified.
        api_key : str, optional
            API key. Falls back to the ``TILED_API_KEY`` environment variable.
        cache : optional
            Cache object consulted by ``get_content``. Required if ``offline``.
        offline : bool
            If True, serve requests from the cache only.
        timeout : httpx.Timeout, optional
            Defaults to ``httpx.Timeout(**DEFAULT_TIMEOUT_PARAMS)``.
        verify : bool
            Passed to ``httpx.Client``. Ignored when ``app`` is given.
        token_cache : path-like, optional
            Directory for stashed tokens. Defaults to ``DEFAULT_TOKEN_CACHE``.
        app : optional
            ASGI application. When given, requests are routed to it in-process
            through ``TestClient`` instead of over the network.
        raise_server_exceptions : bool
            Passed to ``TestClient``. Only used when ``app`` is given.

        Raises
        ------
        ValueError
            If the API key is given both in the URI and as a keyword, or more
            than once in the URI.
        RuntimeError
            If ``offline`` is requested without a ``cache``.
        """
        if offline and (cache is None):
            # Fail early and informatively, exactly like the `offline` setter,
            # rather than with an AttributeError on None during the first request.
            raise RuntimeError(self._OFFLINE_WITHOUT_CACHE_MESSAGE)
        # The uri is expected to reach the root API route.
        uri = httpx.URL(uri)
        # Copy, so that the defaults added below never leak into the caller's object.
        headers = dict(headers or {})
        headers.setdefault("accept-encoding", ",".join(DEFAULT_ACCEPTED_ENCODINGS))
        # Set the User Agent to help the server fail informatively if the client
        # version is too old.
        headers.setdefault("user-agent", USER_AGENT)
        if token_cache is None:
            token_cache = DEFAULT_TOKEN_CACHE

        # If ?api_key=... is present, move it from the query into a header.
        # The server would accept it in the query parameter, but using
        # a header is a little more secure (e.g. not logged).
        parsed_params = urllib.parse.parse_qs(uri.query.decode())
        api_key_list = parsed_params.pop("api_key", None)
        if api_key_list is not None:
            if api_key is not None:
                raise ValueError(
                    "api_key was provided as query parameter in URI and as keyword argument. Pick one."
                )
            if len(api_key_list) != 1:
                raise ValueError("Cannot handle two api_key query parameters")
            (api_key,) = api_key_list
        if api_key is None:
            # Check for an API key from the environment.
            api_key = os.getenv("TILED_API_KEY")
        # We will set the API key via the `api_key` property below,
        # after constructing the Client object.

        # FastAPI redirects /api -> /api/ so add it here to save a request.
        path = uri.path
        if not path.endswith("/"):
            path = f"{path}/"
        # Construct the uri *without* api_key param.
        # Drop any params/fragments.
        self.api_uri = httpx.URL(
            urllib.parse.urlunsplit((uri.scheme, uri.netloc.decode(), path, "", ""))
        )
        if timeout is None:
            timeout = httpx.Timeout(**DEFAULT_TIMEOUT_PARAMS)
        if app is None:
            client = httpx.Client(
                verify=verify,
                event_hooks=EVENT_HOOKS,
                timeout=timeout,
                headers=headers,
                follow_redirects=True,
            )
        else:
            from ._testclient import TestClient

            # verify parameter is dropped, as there is no SSL in ASGI mode
            client = TestClient(
                app=app,
                event_hooks=EVENT_HOOKS,
                timeout=timeout,
                headers=headers,
                raise_server_exceptions=raise_server_exceptions,
            )
            client.follow_redirects = True
            client.__enter__()
            # The TestClient is meant to be used only as a context manager,
            # where the context starts and stops the wrapped ASGI app.
            # We are abusing it slightly to enable interactive use of the
            # TestClient.
            #
            # Given a blank slate, we may not have chosen to support this at
            # all; we may insist on using this only within a context manager.
            # But for now it is necessary to support a smooth transition for
            # databroker. We may revisit it later.
            if sys.version_info < (3, 9):
                import atexit

                atexit.register(client.__exit__)
            else:
                import threading

                # The threading module has its own (internal) atexit
                # mechanism that runs at thread shutdown, prior to the atexit
                # mechanism that runs at interpreter shutdown.
                # We need to intervene at that layer to close the portal, or else
                # we will wait forever for a thread run by the portal to join().
                threading._register_atexit(client.__exit__)

        self.http_client = client
        self._verify = verify
        self._cache = cache
        self._revalidate = Revalidate.IF_WE_MUST
        self._offline = offline
        self._token_cache = Path(token_cache)
        # Flags toggled by disable_cache(); initialised here so that
        # get_content() can always read them.
        self._disable_cache_read = False
        self._disable_cache_write = False

        # Make an initial "safe" request to:
        # (1) Get the server_info.
        # (2) Let the server set the CSRF cookie.
        # No authentication has been set up yet, so these requests will be unauthenticated.
        # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
        if offline:
            self.server_info = self.get_json(self.api_uri)
        else:
            # We need a CSRF token.
            with self.disable_cache(allow_read=False, allow_write=True):
                self.server_info = self.get_json(self.api_uri)
        self.api_key = api_key  # property setter sets Authorization header
        self.admin = Admin(self)  # accessor for admin-related requests

    def __enter__(self):
        "Return this Context, so that it can be used in a ``with`` statement."
        return self

    def __exit__(self, *args):
        "Close the underlying HTTP client on leaving the ``with`` block."
        self.close()

    def close(self):
        "Release the underlying HTTP client by calling its ``__exit__``."
        self.http_client.__exit__()

    def __getstate__(self):
        """
        Return the picklable state as a tuple (unpacked by ``__setstate__``).

        Raises
        ------
        TypeError
            If the Context wraps an in-process ASGI app.
        """
        if getattr(self.http_client, "app", None):
            raise TypeError(
                "Cannot pickle a Tiled Context built around an ASGI. "
                "Only Tiled Context connected to remote servers can be pickled."
            )
        return (
            tiled_version,
            self.api_uri,
            self.http_client.headers,
            list(self.http_client.cookies.jar),
            self.http_client.timeout,
            self.http_client.auth,
            self._verify,
            self.offline,
            self._revalidate,
            self._token_cache,
            self.server_info,
            self.cache,
        )

    def __setstate__(self, state):
        """
        Rebuild the Context, including a fresh ``httpx.Client``, from ``state``.

        Raises
        ------
        TypeError
            If the state was produced by a different version of tiled.
        """
        (
            state_tiled_version,
            api_uri,
            headers,
            cookies_list,
            timeout,
            auth,
            verify,
            offline,
            revalidate,
            token_cache,
            server_info,
            cache,
        ) = state
        if state_tiled_version != tiled_version:
            raise TypeError(
                f"Cannot unpickle {type(self).__name__} from tiled version {state_tiled_version} "
                f"using tiled version {tiled_version}. Pickle should only be used to short-term "
                "transfer between identical versions of tiled."
            )
        self.api_uri = api_uri
        cookies = httpx.Cookies()
        for cookie in cookies_list:
            cookies.set(
                cookie.name, cookie.value, domain=cookie.domain, path=cookie.path
            )
        self.http_client = httpx.Client(
            verify=verify,
            event_hooks=EVENT_HOOKS,
            cookies=cookies,
            timeout=timeout,
            headers=headers,
            follow_redirects=True,
            auth=auth,
        )
        # __getstate__ reads self._verify, so it must be restored for the
        # unpickled object to be picklable again.
        self._verify = verify
        self._revalidate = revalidate
        self._token_cache = token_cache
        self._cache = cache
        self._disable_cache_read = False
        self._disable_cache_write = False
        self.server_info = server_info
        # The property setter refreshes server_info when not offline.
        self.offline = offline
        self.admin = Admin(self)

    @classmethod
    def from_any_uri(
        cls,
        uri,
        *,
        headers=None,
        api_key=None,
        cache=None,
        offline=False,
        timeout=None,
        verify=True,
        token_cache=None,
        app=None,
    ):
        """
        Accept a URI to a specific node.

        For example, given URI "https://example.com/api/v1/node/metadata/a/b/c"
        return a Context connected to "https://example.com/api/v1" and the list
        ["a", "b", "c"].

        Returns
        -------
        context : Context
        node_path_parts : list of str
            Empty unless the URI contained "/node/metadata".
        """
        uri = httpx.URL(uri)
        node_path_parts = []
        if "/node/metadata" in uri.path:
            api_path, _, node_path = uri.path.partition("/node/metadata")
            api_uri = uri.copy_with(path=api_path)
            node_path_parts.extend(
                [segment for segment in node_path.split("/") if segment]
            )
        # Below we handle the case where we are given *less* than the full URI
        # to the root endpoint. Here we are taking some care to plan for the case
        # where tiled is served at a sub-path, even though that is not yet supported
        # on the server side.
        elif "/api" not in uri.path:
            # Looks like we were given the root path (to the HTML landing page).
            path = uri.path
            if path.endswith("/"):
                path = path[:-1]
            api_uri = uri.copy_with(path=f"{path}/api/v1")
        elif "/v1" not in uri.path:
            # Looks like we were given the /api but no version.
            path = uri.path
            if path.endswith("/"):
                path = path[:-1]
            api_uri = uri.copy_with(path=f"{path}/v1")
        else:
            api_uri = uri
        context = cls(
            api_uri,
            headers=headers,
            api_key=api_key,
            cache=cache,
            offline=offline,
            timeout=timeout,
            verify=verify,
            token_cache=token_cache,
            app=app,
        )
        return context, node_path_parts

    @classmethod
    def from_app(
        cls,
        app,
        *,
        cache=None,
        offline=False,
        token_cache=None,
        headers=None,
        timeout=None,
        api_key=UNSET,
        raise_server_exceptions=True,
    ):
        """
        Construct a Context around a FastAPI app. Primarily for testing.

        If ``api_key`` is left UNSET and the server reports no authentication
        providers, the key is taken from the app's settings
        (``single_user_api_key``).
        """
        context = cls(
            uri="http://local-tiled-app/api/v1",
            headers=headers,
            # Never hand the UNSET sentinel to the constructor: it would be
            # formatted into a bogus "Apikey ..." Authorization header.
            api_key=None if api_key is UNSET else api_key,
            cache=cache,
            offline=offline,
            timeout=timeout,
            token_cache=token_cache,
            app=app,
            raise_server_exceptions=raise_server_exceptions,
        )
        if (api_key is UNSET) and (
            not context.server_info["authentication"]["providers"]
        ):
            # Extract the API key from the app and set it.
            from ..server.settings import get_settings

            settings = app.dependency_overrides[get_settings]()
            context.api_key = settings.single_user_api_key or None
        return context

    @property
    def tokens(self):
        "A view of the current access and refresh tokens."
        return DictView(self.http_client.auth.tokens)

    @property
    def api_key(self):
        "The API key in the client's Authorization header, or None if there is none."
        # Extract from header to ensure that there is one "ground truth" here
        # and no possibility of state getting out of sync.
        header = self.http_client.headers.get("Authorization", "")
        match = API_KEY_AUTH_HEADER_PATTERN.match(header)
        if match is not None:
            return match.group(1)
        return None

    @api_key.setter
    def api_key(self, api_key):
        if api_key is None:
            # Only remove the header if it carries an API key; leave any
            # other kind of Authorization header alone.
            if self.http_client.headers.get("Authorization", "").startswith("Apikey "):
                self.http_client.headers.pop("Authorization")
        else:
            self.http_client.headers["Authorization"] = f"Apikey {api_key}"

    @property
    def cache(self):
        "The cache object given at construction, or None."
        return self._cache

    @property
    def offline(self):
        "Whether requests are served from the cache alone."
        return self._offline

    @offline.setter
    def offline(self, value):
        offline = bool(value)
        if offline and (self._cache is None):
            raise RuntimeError(self._OFFLINE_WITHOUT_CACHE_MESSAGE)
        self._offline = offline
        if not self._offline:
            # We need a CSRF token.
            with self.disable_cache(allow_read=False, allow_write=True):
                self.server_info = self.get_json(self.api_uri)

    def which_api_key(self):
        """
        A 'who am I' for API keys

        Raises
        ------
        RuntimeError
            If no API key is set on this Context.
        """
        if not self.api_key:
            raise RuntimeError("Not API key is configured for the client.")
        return self.get_json(self.server_info["authentication"]["links"]["apikey"])

    def create_api_key(self, scopes=None, expires_in=None, note=None):
        """
        Generate a new API key for the currently-authenticated user.

        Parameters
        ----------
        scopes : Optional[List[str]]
            Restrict the access available to the API key by listing specific scopes.
            By default, this will have the same access as the user.
        expires_in : Optional[int]
            Number of seconds until API key expires. If None,
            it will never expire or it will have the maximum lifetime
            allowed by the server.
        note : Optional[str]
            Description (for humans).
        """
        return self.post_json(
            self.server_info["authentication"]["links"]["apikey"],
            {"scopes": scopes, "expires_in": expires_in, "note": note},
        )

    def revoke_api_key(self, first_eight):
        """
        Send a DELETE request for the API key identified by ``first_eight``.

        ``first_eight`` is sent as the query parameter of that name.
        """
        request = self.http_client.build_request(
            "DELETE",
            self.server_info["authentication"]["links"]["apikey"],
            headers={"x-csrf": self.http_client.cookies["tiled_csrf"]},
            params={"first_eight": first_eight},
        )
        response = self.http_client.send(request)
        handle_error(response)

    @property
    def app(self):
        "Deprecated alias for ``Context.http_client.app``."
        warnings.warn(
            "The 'app' accessor on Context is deprecated. Use Context.http_client.app.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.http_client.app

    @property
    def base_url(self):
        "Deprecated alias for ``Context.http_client.base_url``."
        warnings.warn(
            "The 'base_url' accessor on Context is deprecated. Use Context.http_client.base_url.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.http_client.base_url

    @property
    def event_hooks(self):
        "httpx.Client event hooks. This is exposed for testing."
        warnings.warn(
            "The 'event_hooks' accessor on Context is deprecated. Use Context.http_client.event_hooks.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.http_client.event_hooks

    @property
    def revalidate(self):
        """
        This controls how aggressively to check whether cache entries are out of date.

        - FORCE: Always revalidate (generally too aggressive and expensive)
        - IF_EXPIRED: Revalidate if the "Expire" date provided by the server has passed
        - IF_WE_MUST: Only revalidate if the server indicated that it is a
          particularly volatile entry, such as a search result to a dynamic query.
        """
        return self._revalidate

    @revalidate.setter
    def revalidate(self, value):
        self._revalidate = Revalidate(value)

    @contextlib.contextmanager
    def revalidation(self, revalidate):
        """
        Temporarily change the 'revalidate' property in a context.

        The previous value is restored on exit, even if the body raises.

        Parameters
        ----------
        revalidate: string or tiled.client.cache.Revalidate enum member
        """
        try:
            member = Revalidate(revalidate)
        except ValueError as err:
            # This raises a more helpful error that lists the valid options.
            raise ValueError(
                f"Revalidation {revalidate} not recognized. Must be one of {set(Revalidate.__members__)}"
            ) from err
        original = self.revalidate
        self.revalidate = member
        try:
            yield
        finally:
            # Upon leaving context, set it back.
            self.revalidate = original

    @contextlib.contextmanager
    def disable_cache(self, allow_read=False, allow_write=False):
        """
        Temporarily disable reading from and/or writing to the cache.

        Parameters
        ----------
        allow_read : bool
            If False (default), get_content() will not return cached content.
        allow_write : bool
            If False (default), get_content() will not store responses.

        The previous flag values are restored on exit, even if the body raises.
        """
        previous_read = getattr(self, "_disable_cache_read", False)
        previous_write = getattr(self, "_disable_cache_write", False)
        self._disable_cache_read = not allow_read
        self._disable_cache_write = not allow_write
        try:
            yield
        finally:
            self._disable_cache_read = previous_read
            self._disable_cache_write = previous_write

    def get_content(self, path, accept=None, stream=False, revalidate=None, **kwargs):
        """
        GET ``path`` and return the response body as bytes, using the cache if any.

        Parameters
        ----------
        path : str or httpx.URL
        accept : str, optional
            Value for the Accept header.
        stream : bool
            Passed to ``httpx.Client.send``.
        revalidate : Revalidate, optional
            Overrides the Context-wide ``revalidate`` setting for this request.
        **kwargs
            ``auth`` is passed to ``httpx.Client.send``; everything else is
            passed to ``httpx.Client.build_request``.

        Raises
        ------
        NotAvailableOffline
            In offline mode, if the content is not in the cache.
        """
        send_kwargs = {}
        if "auth" in kwargs:
            send_kwargs["auth"] = kwargs.pop("auth")
        if revalidate is None:
            # Fallback to context default.
            revalidate = self.revalidate
        request = self.http_client.build_request("GET", path, **kwargs)
        if accept:
            request.headers["Accept"] = accept
        url = request.url
        if self._offline:
            # We must rely on the cache alone.

            # The role of a 'reservation' is to ensure that the content
            # of interest is not evicted from the cache between the moment
            # that we start verifying its validity and the moment that
            # we actually read the content. It is used more extensively
            # below.
            reservation = self._cache.get_reservation(url)
            if reservation is None:
                raise NotAvailableOffline(url)
            content = reservation.load_content()
            if content is None:
                # TODO Do we ever get here?
                raise NotAvailableOffline(url)
            return content
        if self._cache is None:
            # No cache, so we can use the client straightforwardly.
            # Forward send_kwargs so a caller-supplied `auth` is honored here
            # just as it is on the cached path below.
            response = self.http_client.send(request, stream=stream, **send_kwargs)
            handle_error(response)
            if response.headers.get("content-encoding") == "blosc":
                import blosc

                return blosc.decompress(response.content)
            return response.content
        # If we get this far, we have an online client and a cache.
        # Parse Cache-Control header directives.
        cache_control = {
            directive.lstrip(" ")
            for directive in request.headers.get("Cache-Control", "").split(",")
        }
        if "no-cache" in cache_control:
            reservation = None
        else:
            reservation = self._cache.get_reservation(url)
        try:
            if reservation is not None:
                is_stale = reservation.is_stale()
                if not (
                    # This condition means "client user wants us to unconditionally revalidate"
                    (revalidate == Revalidate.FORCE)
                    or
                    # This condition means "client user wants us to revalidate if expired"
                    (is_stale and (revalidate == Revalidate.IF_EXPIRED))
                    or
                    # This condition means "server really wants us to revalidate"
                    (is_stale and reservation.item.must_revalidate)
                    or self._disable_cache_read
                ):
                    # Short-circuit. Do not even bother consulting the server.
                    return reservation.load_content()
                if not self._disable_cache_read:
                    request.headers["If-None-Match"] = reservation.item.etag
            response = self.http_client.send(request, stream=stream, **send_kwargs)
            handle_error(response)
            if response.status_code == 304:  # HTTP 304 Not Modified
                # Update the expiration time.
                reservation.renew(response.headers.get("expires"))
                # Read from the cache
                return reservation.load_content()
            elif not response.is_error:
                etag = response.headers.get("ETag")
                encoding = response.headers.get("Content-Encoding")
                content = response.content
                # httpx handles standard HTTP encodings transparently, but we have to
                # handle "blosc" manually.
                if encoding == "blosc":
                    import blosc

                    content = blosc.decompress(content)
                if (
                    ("no-store" not in cache_control)
                    and (etag is not None)
                    and (not self._disable_cache_write)
                ):
                    # Write to cache.
                    self._cache.put(
                        url,
                        response.headers,
                        content,
                    )
                return content
            else:
                raise NotImplementedError(
                    f"Unexpected status_code {response.status_code}"
                )
        finally:
            if reservation is not None:
                reservation.ensure_released()

    def get_json(self, path, stream=False, **kwargs):
        """
        GET ``path`` requesting msgpack and return the decoded object.

        Extra keyword arguments are passed to ``get_content``.
        """
        return msgpack.unpackb(
            self.get_content(
                path, accept="application/x-msgpack", stream=stream, **kwargs
            ),
            timestamp=3,  # Decode msgpack Timestamp as datetime.datetime object.
        )

    def post_json(self, path, content):
        "POST ``content`` as JSON to ``path``; return the msgpack-decoded response."
        request = self.http_client.build_request(
            "POST",
            path,
            json=content,
            # Submit CSRF token in both header and cookie.
            # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
            headers={
                "x-csrf": self.http_client.cookies["tiled_csrf"],
                "accept": "application/x-msgpack",
            },
        )
        response = self.http_client.send(request)
        handle_error(response)
        return msgpack.unpackb(
            response.content,
            timestamp=3,  # Decode msgpack Timestamp as datetime.datetime object.
        )

    def put_json(self, path, content):
        "PUT ``content`` as JSON to ``path``; return the msgpack-decoded response."
        request = self.http_client.build_request(
            "PUT",
            path,
            json=content,
            # Submit CSRF token in both header and cookie.
            # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
            headers={
                "x-csrf": self.http_client.cookies["tiled_csrf"],
                "accept": "application/x-msgpack",
            },
        )
        response = self.http_client.send(request)
        handle_error(response)
        return msgpack.unpackb(
            response.content,
            timestamp=3,  # Decode msgpack Timestamp as datetime.datetime object.
        )

    def put_content(self, path, content, headers=None, params=None):
        """
        PUT raw ``content`` to ``path``; return the msgpack-decoded response.

        ``headers`` may override the default "x-csrf" and "accept" headers.
        The mapping passed in is copied, never modified.
        """
        # Submit CSRF token in both header and cookie.
        # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
        headers = dict(headers or {})
        headers.setdefault("x-csrf", self.http_client.cookies["tiled_csrf"])
        headers.setdefault("accept", "application/x-msgpack")
        request = self.http_client.build_request(
            "PUT",
            path,
            content=content,
            headers=headers,
            params=params,
        )
        response = self.http_client.send(request)
        handle_error(response)
        return msgpack.unpackb(
            response.content,
            timestamp=3,  # Decode msgpack Timestamp as datetime.datetime object.
        )

    def delete_content(self, path, content, headers=None, params=None):
        """
        Send DELETE to ``path``; return the msgpack-decoded response.

        ``content`` is accepted for signature parity with ``put_content`` but
        is not sent: the request is always built with no body.
        ``headers`` may override the default "x-csrf" and "accept" headers.
        The mapping passed in is copied, never modified.
        """
        # Submit CSRF token in both header and cookie.
        # https://cheatsheetseries.owasp.org/cheatsheets/Cross-Site_Request_Forgery_Prevention_Cheat_Sheet.html#double-submit-cookie
        headers = dict(headers or {})
        headers.setdefault("x-csrf", self.http_client.cookies["tiled_csrf"])
        headers.setdefault("accept", "application/x-msgpack")
        request = self.http_client.build_request(
            "DELETE", path, content=None, headers=headers, params=params
        )
        response = self.http_client.send(request)
        handle_error(response)
        return msgpack.unpackb(
            response.content,
            timestamp=3,  # Decode msgpack Timestamp as datetime.datetime object.
        )

    def authenticate(
        self, username=None, provider=None, prompt_for_reauthentication=UNSET
    ):
        """
        See login. This is for programmatic use.

        Parameters
        ----------
        username : str, optional
            If given, stashed tokens for this user are tried before prompting.
        provider : str, optional
            Name of the identity provider. If omitted, the provider is chosen
            automatically (one provider) or interactively (several).
        prompt_for_reauthentication : bool or None, optional
            Whether prompting for credentials is allowed. If left UNSET, the
            module-level PROMPT_FOR_REAUTHENTICATION is used. None means:
            prompt only if stdin is an open, interactive terminal.

        Returns
        -------
        None
            If an existing API key or stashed session was sufficient.
        (spec, username)
            After a fresh log in: the provider spec and the identity id.

        Raises
        ------
        RuntimeError
            If an API key is set but does not belong to ``username``/``provider``.
        CannotRefreshAuthentication
            If stashed tokens cannot be refreshed and prompting is not allowed.
        CannotPrompt
            If a log in is needed but prompting is not allowed.
        """
        if prompt_for_reauthentication is UNSET:
            prompt_for_reauthentication = PROMPT_FOR_REAUTHENTICATION
        if prompt_for_reauthentication is None:
            # sys.__stdin__ can be None (e.g. when the process has no console),
            # in which case prompting is impossible.
            stdin = sys.__stdin__
            prompt_for_reauthentication = (
                (stdin is not None) and (not stdin.closed) and stdin.isatty()
            )
        providers = self.server_info["authentication"]["providers"]
        spec = _choose_identity_provider(providers, provider)
        provider = spec["provider"]
        if self.api_key is not None:
            # Check that API key authenticates us as this user,
            # and then either return or raise.
            identities = self.whoami()["identities"]
            for identity in identities:
                if (identity["provider"] == provider) and (identity["id"] == username):
                    return
            raise RuntimeError(
                "An API key is set, and it is not associated with the username/provider "
                f"{username}/{provider}. Unset the API key first."
            )
        refresh_url = self.server_info["authentication"]["links"]["refresh_session"]
        csrf_token = self.http_client.cookies["tiled_csrf"]

        # If we are passed a username, we can check whether we already have
        # tokens stashed.
        if username is not None:
            token_directory = self._token_directory(provider, username)
            self.http_client.auth = TiledAuth(refresh_url, csrf_token, token_directory)
            # This will either:
            # * Use an access_token and succeed.
            # * Use a refresh_token to attempt refresh flow and succeed.
            # * Use a refresh_token to attempt refresh flow and fail, raise.
            # * Find no tokens and raise.
            try:
                self.whoami()
            except CannotRefreshAuthentication:
                # Continue below, where we will prompt for log in.
                self.http_client.auth = None
                if not prompt_for_reauthentication:
                    raise
            else:
                # We have a live session for the specified provider and username already.
                # No need to log in again.
                return

        if not prompt_for_reauthentication:
            raise CannotPrompt(
                "Authentication is needed but the client cannot prompt for it."
            )
        self.http_client.auth = None
        mode = spec["mode"]
        auth_endpoint = spec["links"]["auth_endpoint"]
        if mode == "password":
            if username:
                print(f"Username {username}")
            else:
                username = input("Username: ")
            password = getpass.getpass()
            form_data = {
                "grant_type": "password",
                "username": username,
                "password": password,
            }
            token_response = self.http_client.post(
                auth_endpoint, data=form_data, auth=None
            )
            handle_error(token_response)
            tokens = token_response.json()
        elif mode == "external":
            verification_response = self.http_client.post(
                auth_endpoint, json={}, auth=None
            )
            handle_error(verification_response)
            verification = verification_response.json()
            authorization_uri = verification["authorization_uri"]
            print(
                "\n"
                f"You have {int(verification['expires_in']) // 60} minutes visit this URL\n"
                f"{authorization_uri}\n"
                f"and enter the code: {verification['user_code']}\n"
            )
            import webbrowser

            webbrowser.open(authorization_uri)
            deadline = verification["expires_in"] + time.monotonic()
            print("Waiting...", end="", flush=True)
            # Poll until the user has authorized the device code, or the
            # code expires.
            while True:
                time.sleep(verification["interval"])
                if time.monotonic() > deadline:
                    raise Exception("Deadline expired.")
                access_response = self.http_client.post(
                    verification["verification_uri"],
                    json={
                        "device_code": verification["device_code"],
                        "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
                    },
                    auth=None,
                )
                if (access_response.status_code == 400) and (
                    access_response.json()["detail"]["error"] == "authorization_pending"
                ):
                    print(".", end="", flush=True)
                    continue
                handle_error(access_response)
                print("")
                break
            tokens = access_response.json()
        else:
            raise ValueError(f"Server has unknown authentication mechanism {mode!r}")
        username = tokens["identity"]["id"]
        token_directory = self._token_directory(provider, username)
        auth = TiledAuth(refresh_url, csrf_token, token_directory)
        auth.sync_set_token("access_token", tokens["access_token"])
        auth.sync_set_token("refresh_token", tokens["refresh_token"])
        self.http_client.auth = auth
        confirmation_message = spec.get("confirmation_message")
        if confirmation_message:
            print(confirmation_message.format(id=username))
        return spec, username

    def login(self, username=None, provider=None):
        """
        Depending on the server's authentication method, this will prompt for username/password:

        >>> c.login()
        Username: USERNAME
        Password: <input is hidden>

        or prompt you to open a link in a web browser to login with a third party:

        >>> c.login()
        You have ... minutes visit this URL

        https://...

        and enter the code: XXXX-XXXX
        """
        self.authenticate(username, provider)
        # For programmatic access to the return values, use authenticate().
        # This returns None in order to provide a clean UX in an interpreter.
        return None

    def _token_directory(self, provider, username):
        "Return the directory in which tokens for this server/provider/user are stashed."
        # {token_cache}/{api_uri}/{provider}/{username}
        # with each templated element URL-encoded so it is a valid filename.
        return Path(
            self._token_cache,
            urllib.parse.quote_plus(str(self.api_uri)),
            urllib.parse.quote_plus(provider),
            urllib.parse.quote_plus(username),
        )

    def force_auth_refresh(self):
        """
        Execute refresh flow.

        This method is exposed for testing and debugging uses.

        It should never be necessary for the user to call. Refresh flow is
        automatically executed by tiled.client.auth.TiledAuth when the current
        access_token expires.

        Returns
        -------
        dict
            The decoded token response, after storing its access and refresh tokens.

        Raises
        ------
        RuntimeError
            If no authentication has been set up on the client.
        CannotRefreshAuthentication
            If there is no refresh token, or the server answers 401.
        """
        if self.http_client.auth is None:
            raise RuntimeError(
                "No authentication has been set up. Cannot reauthenticate."
            )
        refresh_token = self.http_client.auth.sync_get_token(
            "refresh_token", reload_from_disk=True
        )
        if refresh_token is None:
            raise CannotRefreshAuthentication("There is no refresh_token.")
        csrf_token = self.http_client.cookies["tiled_csrf"]
        refresh_request = build_refresh_request(
            self.http_client.auth.refresh_url,
            refresh_token,
            csrf_token,
        )
        token_response = self.http_client.send(refresh_request, auth=None)
        if token_response.status_code == 401:
            raise CannotRefreshAuthentication(
                "Session cannot be refreshed. Log in again."
            )
        handle_error(token_response)
        tokens = token_response.json()
        self.http_client.auth.sync_set_token("access_token", tokens["access_token"])
        self.http_client.auth.sync_set_token("refresh_token", tokens["refresh_token"])
        return tokens

    def whoami(self):
        "Return information about the currently-authenticated user or service."
        return self.get_json(self.server_info["authentication"]["links"]["whoami"])

    def logout(self):
        """
        Log out of the current session (if any).

        This method is idempotent.
        """
        if self.http_client.auth is None:
            return

        # Revoke the current session.
        self.http_client.post(f"{self.api_uri}auth/logout")

        # Clear on-disk state.
        self.http_client.auth.sync_clear_token("access_token")
        self.http_client.auth.sync_clear_token("refresh_token")

        # Clear in-memory state.
        self.http_client.headers.pop("Authorization", None)
        self.http_client.auth = None

    def revoke_session(self, session_id):
        """
        Revoke a Session so it cannot be refreshed.

        This may be done to ensure that a possibly-leaked refresh token cannot be used.
        """
        request = self.http_client.build_request(
            "DELETE",
            self.server_info["authentication"]["links"]["revoke_session"].format(
                session_id=session_id
            ),
            headers={"x-csrf": self.http_client.cookies["tiled_csrf"]},
        )
        response = self.http_client.send(request)
        handle_error(response)
def _choose_identity_provider(providers, provider=None):
if provider is not None:
for spec in providers:
if spec["provider"] == provider:
break
else:
raise ValueError(
f"No such provider {provider}. Choices are {[spec['provider'] for spec in providers]}"
)
else:
if len(providers) == 1:
# There is only one choice, so no need to prompt the user.
(spec,) = providers
else:
while True:
print("Authenticaiton providers:")
for i, spec in enumerate(providers, start=1):
print(f"{i} - {spec['provider']}")
raw_choice = input(
"Choose an authentication provider (or press Enter to escape): "
)
if not raw_choice:
print("No authentication provider chosen. Failed.")
break
try:
choice = int(raw_choice)
except TypeError:
print("Choice must be a number.")
continue
try:
spec = providers[choice - 1]
except IndexError:
print(f"Choice must be a number 1 through {len(providers)}.")
continue
break
return spec
class Admin:
    "Accessor for requests that require administrative privileges."

    def __init__(self, context):
        # Keep a handle on the owning Context and remember the server's own
        # root link, which every admin route below is built from.
        self.context = context
        self.base_url = context.server_info["links"]["self"]

    def _fetch(self, route, **request_kwargs):
        "GET ``{base_url}{route}``, raise on an error response, return decoded JSON."
        reply = self.context.http_client.get(
            f"{self.base_url}{route}", **request_kwargs
        )
        handle_error(reply)
        return reply.json()

    def list_principals(self, offset=0, limit=100):
        "List Principals (users and services) in the authentication database."
        paging = {"offset": offset, "limit": limit}
        return self._fetch("/auth/principal", params=paging)

    def show_principal(self, uuid):
        "Show one Principal (user or service) in the authentication database."
        return self._fetch(f"/auth/principal/{uuid}")
class CannotPrompt(Exception):
    "Raised when authentication is needed but the client cannot prompt for it."