import copy
import getpass
import os
import secrets
import time as ttime
from collections.abc import Iterable, Mapping
from pathlib import Path
from ._defaults import default_user_group, default_user_name
from .comm_base import RequestParameterError
from .item import BItem
class WaitTimeoutError(TimeoutError):
    """Exception type for a ``wait_...`` operation that ended because of a timeout."""
class WaitCancelError(TimeoutError):
    """Exception type for a ``wait_...`` operation that was cancelled (subclass of ``TimeoutError``)."""
class WaitMonitor:
    """
    Creates ``monitor`` object for ``wait_...`` operations, such as ``wait_for_idle``.
    The object may be used to stop the operation from another thread or
    asynchronous task.

    Examples
    --------

    The examples illustrate how to use ``WaitMonitor`` object to cancel
    wait operations. Synchronous code (0MQ or HTTP):

    .. code-block:: python

        # Synchronous code
        from bluesky_queueserver_api import WaitMonitor
        from bluesky_queueserver_api.zmq import REManagerAPI  # Same for HTTP
        import time
        import threading

        RM = REManagerAPI()
        monitor = WaitMonitor()

        def wait_re_manager_idle():
            try:
                print("Waiting ...")
                RM.wait_for_idle(monitor=monitor)
            except RM.WaitCancelError:
                print("Cancelled.")
            except RM.WaitTimeoutError:
                print("Timeout.")
            print("RE Manager is idle")

        # Wait until RE Manager is in 'idle' state in a background thread
        thread = threading.Thread(target=wait_re_manager_idle)
        thread.start()
        # Cancel wait after 2 seconds from main thread
        time.sleep(2)
        monitor.cancel()

        thread.join()
        RM.close()

    Asynchronous code example (0MQ or HTTP):

    .. code-block:: python

        # Asynchronous code
        import asyncio
        from bluesky_queueserver_api import WaitMonitor
        from bluesky_queueserver_api.zmq.aio import REManagerAPI  # Same for HTTP
        import time

        async def testing():

            RM = REManagerAPI()
            monitor = WaitMonitor()

            async def wait_re_manager_idle():
                try:
                    print("Waiting ...")
                    await RM.wait_for_idle(monitor=monitor)
                except RM.WaitCancelError:
                    print("Cancelled.")
                except RM.WaitTimeoutError:
                    print("Timeout.")
                print("RE Manager is idle")

            # Wait until RE Manager is in 'idle' state in a background task
            asyncio.create_task(wait_re_manager_idle())
            # Cancel wait after 2 seconds from main thread
            await asyncio.sleep(2)
            monitor.cancel()
            await asyncio.sleep(0.5)  # Let the task to complete
            await RM.close()

        asyncio.run(testing())
    """

    def __init__(self):
        # Start time and timeout are plain numbers (seconds); both start at zero.
        self._time_start = 0
        self._timeout = 0
        # Callables invoked (once) by 'cancel()'.
        self._cancel_callbacks = []
        self._wait_cancelled = False

    @property
    def time_start(self):
        """
        Time when the operation started (seconds).
        """
        return self._time_start

    @property
    def time_elapsed(self):
        """
        Time since the operation started (seconds).
        """
        now = ttime.time()
        return now - self.time_start

    @property
    def timeout(self):
        """
        Timeout (seconds).
        """
        return self._timeout

    def set_timeout(self, timeout):
        """
        Modify timeout for the current operation (seconds).
        """
        self._timeout = timeout

    def add_cancel_callback(self, cancel_callback):
        """
        Register a callback to be invoked by ``cancel()``. Each callback is called
        only once, before the monitor is marked as cancelled. The callback function
        should accept no parameters.
        """
        self._cancel_callbacks.append(cancel_callback)

    def cancel(self):
        """
        Cancel the currently running operation. A monitor may be cancelled
        only once per lifecycle.
        """
        for callback in self._cancel_callbacks:
            # Best effort: a failing callback must not prevent the remaining
            # callbacks from running or the monitor from being marked as cancelled.
            try:
                callback()
            except Exception:
                continue

        # Drop the callbacks so that they are never called again.
        self._cancel_callbacks = []
        self._wait_cancelled = True

    @property
    def is_cancelled(self):
        """
        Checks if the monitor was cancelled. The operation is either completed
        or about to be completed.
        """
        return self._wait_cancelled
class API_Base:
    """
    Transport-independent state and helpers shared by the RE Manager API classes.

    The class keeps the default user name and user group, client-side caches of the
    most recently loaded server data (queue, history, allowed/existing plans and
    devices, run list, lock info) together with their UIDs, and the lock key state.
    Most methods are private helpers that build the ``request_params`` dictionary for
    an API call (``_prepare_...``), store a successful response in the cache
    (``_process_response_...``) or rebuild a response from the cache
    (``_generate_response_...``).

    Some helpers read ``self._pass_user_info``, which is not assigned in ``__init__``.
    NOTE(review): it is presumably set by the transport-specific subclasses - confirm.
    """

    # Exception types are exposed as class attributes so that they can be referenced
    # via the API object (e.g. ``RM.WaitTimeoutError``).
    WaitTimeoutError = WaitTimeoutError
    WaitCancelError = WaitCancelError

    def __init__(self, *, status_expiration_period, status_polling_period):
        self._status_expiration_period = status_expiration_period  # seconds
        self._status_polling_period = status_polling_period  # seconds

        self._status_timestamp = None
        self._status_current = None
        self._status_exception = None

        self._user = default_user_name  # Meaningful user name should be set in application code.
        self._user_group = default_user_group

        # Cached copies of the data loaded from the server and the respective UIDs.
        self._current_plan_queue = []
        self._current_running_item = {}
        self._current_plan_queue_uid = None

        self._current_plan_history = []
        self._current_plan_history_uid = None

        # Allowed plans and devices are cached per user group (key: user group name).
        self._current_plans_allowed = {}
        self._current_plans_allowed_uid = None
        self._current_devices_allowed = {}
        self._current_devices_allowed_uid = None

        self._current_plans_existing = {}
        self._current_plans_existing_uid = None
        self._current_devices_existing = {}
        self._current_devices_existing_uid = None

        self._current_run_list = []
        self._current_run_list_uid = None

        self._current_lock_info = {}
        self._current_lock_info_uid = None

        # Lock key state: the current key, location of the default key and the flag
        # that enables automatic sending of the current key with API requests.
        self._lock_key = None
        self._default_lock_key_path = os.path.join(Path.home(), ".config", "qserver", "default_lock_key.txt")
        self._enable_locked_api = False

    def _check_name(self, name, name_in_msg):
        """
        Verify that ``name`` is a non-empty string. ``name_in_msg`` is the description
        of the name used in the error message. Raises ``ValueError`` if the check fails.
        """
        if not isinstance(name, str):
            raise ValueError(f"{name_in_msg} {name!r} is not a string: type {type(name)!r}")
        if not name:
            raise ValueError(f"{name_in_msg} is an empty string.")

    @property
    def user(self):
        """
        Get and set the default user name. The default value is used if user name is not passed
        explicitly as an API parameter (for API calls that require user name). User name is ignored
        by the HTTP version of API, since HTTP server is expected to manage user names.
        """
        return self._user

    @user.setter
    def user(self, user):
        self._check_name(user, "User name")
        self._user = user

    @property
    def user_group(self):
        """
        Get and set the default user group name. The default value is used if the group name is
        not passed explicitly as an API parameter (for API calls that require user group name).
        The group name is ignored by the HTTP version of API, since HTTP server is expected to manage
        user information including names of user group.
        """
        return self._user_group

    @user_group.setter
    def user_group(self, user_group):
        self._check_name(user_group, "User group name")
        self._user_group = user_group

    def set_user_name_to_login_name(self):
        """
        Set the default user name to 'login name'. The login name of the current user of
        the workstation is used. User name is ignored by the HTTP version of the API.
        """
        self.user = getpass.getuser()

    def _clear_status_timestamp(self):
        """
        Clearing status timestamp causes status to be reloaded from the server next time it is requested.
        """
        self._status_timestamp = None

    def _get_user_group_for_allowed_plans_devices(self, *, user_group):
        """
        Returns ``user_group`` used by ``plans_allowed`` and ``devices_allowed`` API.

        If user information is passed with requests (``self._pass_user_info`` is true),
        the explicitly passed group or the default group is returned. Otherwise
        a fixed placeholder name is returned.
        """
        if self._pass_user_info:
            user_group = user_group if user_group else self._user_group
        else:
            user_group = "http"  # Arbitrary group names, since it is not sent in the request
        return user_group

    def _request_params_add_user_info(self, request_params, *, user, user_group):
        """
        Add ``user`` and ``user_group`` to ``request_params`` (in place) if user information
        is passed with requests (``self._pass_user_info`` is true). The default user name
        and/or user group are used in place of the parameters with false values.
        """
        if self._pass_user_info:
            request_params["user"] = user if user else self._user
            request_params["user_group"] = user_group if user_group else self._user_group

    def _add_request_param(self, request_params, name, value):
        """
        Add parameter to dictionary ``request_params`` if value is not ``None``.
        """
        if value is not None:
            request_params[name] = value

    def _add_lock_key(self, request_params, lock_key):
        """
        Add lock key to ``request_params`` if lock key is not ``None``.
        If ``lock_key`` is None, then try to use the 'current' lock key
        (only if access to locked API is enabled).
        If the passed and 'current' lock key is None, then do not add the key.

        Raises ``ValueError`` if ``lock_key`` is not a non-empty string or ``None``.
        """
        self._validate_lock_key(lock_key)
        if not lock_key and self._enable_locked_api:
            lock_key = self._lock_key
        if lock_key:
            request_params["lock_key"] = lock_key

    @staticmethod
    def _item_to_dict(item):
        """
        Validate a single queue item and return it as a new dictionary.

        ``BItem`` objects are converted using ``to_dict()``, mappings are shallow-copied
        into a plain ``dict``. Raises ``TypeError`` for any other type.
        """
        if isinstance(item, BItem):
            return item.to_dict()
        if isinstance(item, Mapping):
            return dict(item)
        raise TypeError(f"Incorrect item type {type(item)!r}. Expected type: 'BItem' or 'dict'")

    def _prepare_item_add(self, *, item, pos, before_uid, after_uid, user, user_group, lock_key):
        """
        Prepare parameters for ``item_add`` operation.
        """
        request_params = {"item": self._item_to_dict(item)}
        self._add_request_param(request_params, "pos", pos)
        self._add_request_param(request_params, "before_uid", before_uid)
        self._add_request_param(request_params, "after_uid", after_uid)
        self._request_params_add_user_info(request_params, user=user, user_group=user_group)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_add_batch(self, *, items, pos, before_uid, after_uid, user, user_group, lock_key):
        """
        Prepare parameters for ``item_add_batch`` operation.

        ``items`` may be any iterable (including one-shot iterators) of ``BItem`` objects
        or mappings. Raises ``TypeError`` if ``items`` is not iterable or contains
        an element of unsupported type.
        """
        if not isinstance(items, Iterable):
            raise TypeError(f"Parameter ``items`` must be iterable: type(items)={type(items)!r}")

        # Materialize the iterable first: the items are traversed twice (validation and
        # conversion) and a generator would be exhausted after the first pass.
        items = list(items)

        for n, item in enumerate(items):
            if not isinstance(item, (BItem, Mapping)):
                raise TypeError(
                    f"Incorrect type {type(item)!r} if item #{n} ({item!r}). Expected type: 'BItem' or 'dict'"
                )

        request_params = {"items": [self._item_to_dict(_) for _ in items]}
        self._add_request_param(request_params, "pos", pos)
        self._add_request_param(request_params, "before_uid", before_uid)
        self._add_request_param(request_params, "after_uid", after_uid)
        self._request_params_add_user_info(request_params, user=user, user_group=user_group)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_update(self, *, item, replace, user, user_group, lock_key):
        """
        Prepare parameters for ``item_update`` operation.
        """
        request_params = {"item": self._item_to_dict(item)}
        self._add_request_param(request_params, "replace", replace)
        self._request_params_add_user_info(request_params, user=user, user_group=user_group)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_move(self, *, pos, uid, pos_dest, before_uid, after_uid, lock_key):
        """
        Prepare parameters for ``item_move`` operation.
        """
        request_params = {}
        self._add_request_param(request_params, "pos", pos)
        self._add_request_param(request_params, "uid", uid)
        self._add_request_param(request_params, "pos_dest", pos_dest)
        self._add_request_param(request_params, "before_uid", before_uid)
        self._add_request_param(request_params, "after_uid", after_uid)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_move_batch(self, *, uids, pos_dest, before_uid, after_uid, reorder, lock_key):
        """
        Prepare parameters for ``item_move_batch`` operation.
        """
        request_params = {}
        self._add_request_param(request_params, "uids", uids)
        self._add_request_param(request_params, "pos_dest", pos_dest)
        self._add_request_param(request_params, "before_uid", before_uid)
        self._add_request_param(request_params, "after_uid", after_uid)
        self._add_request_param(request_params, "reorder", reorder)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_get(self, *, pos, uid):
        """
        Prepare parameters for ``item_get`` operation (no lock key is sent).
        """
        request_params = {}
        self._add_request_param(request_params, "pos", pos)
        self._add_request_param(request_params, "uid", uid)
        return request_params

    def _prepare_item_remove(self, *, pos, uid, lock_key):
        """
        Prepare parameters for ``item_remove`` operation.
        """
        request_params = {}
        self._add_request_param(request_params, "pos", pos)
        self._add_request_param(request_params, "uid", uid)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_remove_batch(self, *, uids, ignore_missing, lock_key):
        """
        Prepare parameters for ``item_remove_batch`` operation
        """
        request_params = {"uids": uids}
        self._add_request_param(request_params, "ignore_missing", ignore_missing)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_item_execute(self, *, item, user, user_group, lock_key):
        """
        Prepare parameters for ``item_execute`` operation.
        """
        request_params = {"item": self._item_to_dict(item)}
        self._request_params_add_user_info(request_params, user=user, user_group=user_group)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_history_clear(self, *, lock_key):
        """
        Prepare parameters for ``history_clear``
        """
        request_params = {}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_queue_clear(self, *, lock_key):
        """
        Prepare parameters for ``queue_clear``
        """
        request_params = {}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_queue_autostart(self, *, enable, lock_key):
        """
        Prepare parameters for ``queue_autostart``. The value of ``enable`` is converted to ``bool``.
        """
        request_params = {"enable": bool(enable)}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_queue_mode_set(self, **kwargs):
        """
        Prepare parameters for ``queue_mode_set`` operation.

        The optional ``lock_key`` is extracted from ``kwargs``. If ``mode`` is present
        in ``kwargs``, its value is sent as the mode (the remaining kwargs are not sent),
        otherwise all the remaining kwargs are sent as the mode dictionary.
        """
        lock_key = kwargs.pop("lock_key", None)
        if "mode" in kwargs:
            request_params = {"mode": kwargs["mode"]}
        else:
            request_params = {"mode": kwargs}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _process_response_queue_get(self, response):
        """
        ``queue_get``: process response (cache the data if the request was successful)
        """
        if response["success"] is True:
            self._current_plan_queue = copy.deepcopy(response["items"])
            self._current_running_item = copy.deepcopy(response["running_item"])
            self._current_plan_queue_uid = copy.deepcopy(response["plan_queue_uid"])

    def _generate_response_queue_get(self):
        """
        ``queue_get``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "plan_queue_uid": self._current_plan_queue_uid,
            "running_item": copy.deepcopy(self._current_running_item),
            "items": copy.deepcopy(self._current_plan_queue),
        }
        return response

    def _process_response_history_get(self, response):
        """
        ``history_get``: process response (cache the data if the request was successful)
        """
        if response["success"] is True:
            self._current_plan_history = copy.deepcopy(response["items"])
            self._current_plan_history_uid = copy.deepcopy(response["plan_history_uid"])

    def _generate_response_history_get(self):
        """
        ``history_get``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "plan_history_uid": self._current_plan_history_uid,
            "items": copy.deepcopy(self._current_plan_history),
        }
        return response

    def _prepare_plans_devices_allowed(self, *, user_group):
        """
        Prepare parameters for ``plans_allowed`` and ``devices_allowed`` operation.
        """
        request_params = {}
        self._request_params_add_user_info(request_params, user=None, user_group=user_group)
        # User name should not be included in the request
        if "user" in request_params:
            del request_params["user"]
        return request_params

    def _invalidate_plans_allowed_cache(self):
        """
        Remove cached lists of allowed plans for all user groups.
        """
        self._current_plans_allowed.clear()

    def _process_response_plans_allowed(self, response, *, user_group):
        """
        ``plans_allowed``: process response. The cached lists for all user groups are
        discarded if the UID in the response differs from the cached UID.
        """
        if response["success"] is True:
            if response["plans_allowed_uid"] != self._current_plans_allowed_uid:
                self._invalidate_plans_allowed_cache()
            self._current_plans_allowed_uid = response["plans_allowed_uid"]
            self._current_plans_allowed[user_group] = copy.deepcopy(response["plans_allowed"])

    def _generate_response_plans_allowed(self, *, user_group):
        """
        ``plans_allowed``: generate response based on cached data. Raises ``KeyError``
        if there is no cached data for ``user_group``.
        """
        response = {
            "success": True,
            "msg": "",
            "plans_allowed_uid": self._current_plans_allowed_uid,
            "plans_allowed": copy.deepcopy(self._current_plans_allowed[user_group]),
        }
        return response

    def _invalidate_devices_allowed_cache(self):
        """
        Remove cached lists of allowed devices for all user groups.
        """
        self._current_devices_allowed.clear()

    def _process_response_devices_allowed(self, response, *, user_group):
        """
        ``devices_allowed``: process response. The cached lists for all user groups are
        discarded if the UID in the response differs from the cached UID.
        """
        if response["success"] is True:
            if response["devices_allowed_uid"] != self._current_devices_allowed_uid:
                self._invalidate_devices_allowed_cache()
            self._current_devices_allowed_uid = response["devices_allowed_uid"]
            self._current_devices_allowed[user_group] = copy.deepcopy(response["devices_allowed"])

    def _generate_response_devices_allowed(self, *, user_group):
        """
        ``devices_allowed``: generate response based on cached data. Raises ``KeyError``
        if there is no cached data for ``user_group``.
        """
        response = {
            "success": True,
            "msg": "",
            "devices_allowed_uid": self._current_devices_allowed_uid,
            "devices_allowed": copy.deepcopy(self._current_devices_allowed[user_group]),
        }
        return response

    def _process_response_plans_existing(self, response):
        """
        ``plans_existing``: process response (cache the data if the request was successful)
        """
        if response["success"] is True:
            self._current_plans_existing = copy.deepcopy(response["plans_existing"])
            self._current_plans_existing_uid = response["plans_existing_uid"]

    def _generate_response_plans_existing(self):
        """
        ``plans_existing``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "plans_existing_uid": self._current_plans_existing_uid,
            "plans_existing": copy.deepcopy(self._current_plans_existing),
        }
        return response

    def _process_response_devices_existing(self, response):
        """
        ``devices_existing``: process response (cache the data if the request was successful)
        """
        if response["success"] is True:
            self._current_devices_existing = copy.deepcopy(response["devices_existing"])
            self._current_devices_existing_uid = response["devices_existing_uid"]

    def _generate_response_devices_existing(self):
        """
        ``devices_existing``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "devices_existing_uid": self._current_devices_existing_uid,
            "devices_existing": copy.deepcopy(self._current_devices_existing),
        }
        return response

    def _prepare_permissions_reload(self, *, restore_plans_devices, restore_permissions, lock_key):
        """
        Prepare parameters for ``permissions_reload``
        """
        request_params = {}
        self._add_request_param(request_params, "restore_plans_devices", restore_plans_devices)
        self._add_request_param(request_params, "restore_permissions", restore_permissions)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_permissions_set(self, *, user_group_permissions, lock_key):
        """
        Prepare parameters for ``permissions_set``
        """
        request_params = {"user_group_permissions": user_group_permissions}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_environment_control(self, *, lock_key):
        """
        Prepare parameters for generic API for environment control which accepts only ``lock_key``
        """
        request_params = {}
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_environment_update(self, *, run_in_background, lock_key):
        """
        Prepare parameters for ``environment_update``
        """
        request_params = {}
        self._add_request_param(request_params, "run_in_background", run_in_background)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_script_upload(self, *, script, update_lists, update_re, run_in_background, lock_key):
        """
        Prepare parameters for ``script_upload``
        """
        request_params = {"script": script}
        self._add_request_param(request_params, "update_lists", update_lists)
        self._add_request_param(request_params, "update_re", update_re)
        self._add_request_param(request_params, "run_in_background", run_in_background)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_function_execute(self, *, item, run_in_background, user, user_group, lock_key):
        """
        Prepare parameters for ``function_execute``
        """
        request_params = {"item": self._item_to_dict(item)}
        self._add_request_param(request_params, "run_in_background", run_in_background)
        self._request_params_add_user_info(request_params, user=user, user_group=user_group)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_task_status(self, *, task_uid):
        """
        Prepare parameters for ``task_status``. ``task_uid`` may be a string (single task)
        or an iterable of task UIDs, which is converted to a list. Raises
        ``RequestParameterError`` for any other type.
        """
        if isinstance(task_uid, str):
            # A string should remain a string.
            task_uid_prepared = task_uid
        elif isinstance(task_uid, Iterable):
            # Status of multiple tasks can be fetched from the manager per a single request.
            # Iterable may be a tuple, a set etc, but it is best to convert it to a list.
            task_uid_prepared = list(task_uid)
        else:
            raise RequestParameterError(
                f"Invalid type of parameter 'task_uid' ({type(task_uid)}). String or iterable (list) is expected."
            )
        request_params = {"task_uid": task_uid_prepared}
        return request_params

    def _prepare_task_result(self, *, task_uid):
        """
        Prepare parameters for ``task_result``. Raises ``RequestParameterError`` if
        ``task_uid`` is not a string.
        """
        if not isinstance(task_uid, str):
            # Only a result of a single task can be fetched per request.
            raise RequestParameterError(
                f"Invalid type of parameter 'task_uid' ({type(task_uid)}). String is expected."
            )
        request_params = {"task_uid": task_uid}
        return request_params

    def _verify_options_re_runs(self, option):
        """
        Options for ``re_runs`` API is processed locally, so verify that the option
        value is supported. Raises ``IndexError`` if the option is not supported.
        """
        allowed_options = (None, "active", "open", "closed")
        if option not in allowed_options:
            raise IndexError(f"Unsupported option {option!r}. Supported options: {allowed_options}")

    def _process_response_re_runs(self, response, *, option):
        """
        ``re_runs``: process response. If the request was successful, the full run list
        is cached and ``response["run_list"]`` is replaced (in place) with the runs
        selected based on ``option``. Returns the response.
        """
        if response["success"] is True:
            self._current_run_list = copy.deepcopy(response["run_list"])
            self._current_run_list_uid = response["run_list_uid"]
            response["run_list"] = self._select_re_runs_items(option=option)
        return response

    def _select_re_runs_items(self, *, option):
        """
        ``re_runs``: select runs from the full list based on the option.

        ``"open"`` and ``"closed"`` select the runs based on the value of ``is_open``;
        the full list is returned for any other option. The returned list is always
        a deep copy, so modifying it does not change the cached data.
        """
        if option == "open":
            run_list = [_ for _ in self._current_run_list if _["is_open"]]
        elif option == "closed":
            run_list = [_ for _ in self._current_run_list if not _["is_open"]]
        else:
            run_list = self._current_run_list
        return copy.deepcopy(run_list)

    def _generate_response_re_runs(self, *, option):
        """
        ``re_runs``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "run_list_uid": self._current_run_list_uid,
            "run_list": self._select_re_runs_items(option=option),
        }
        return response

    def _prepare_re_pause(self, *, option, lock_key):
        """
        Prepare parameters for ``re_pause`` operation
        """
        request_params = {}
        self._add_request_param(request_params, "option", option)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _prepare_wait_for_completed_task(self, *, task_uid):
        """
        Preprocessing parameters for ``wait_for_completed_task``. Returns ``task_uid``
        as a non-empty string or a non-empty list. Raises ``RequestParameterError``
        if ``task_uid`` has unsupported type or is empty.
        """
        params = self._prepare_task_status(task_uid=task_uid)
        task_uid = params["task_uid"]
        if not task_uid:
            # At this point, 'task_uid' is a string or a list.
            msg_type = "string" if isinstance(task_uid, str) else "list"
            raise RequestParameterError(
                f"Invalid value of parameter 'task_uid': task UID must be a non-empty {msg_type}"
            )
        return task_uid

    def _pick_completed_tasks(self, task_status_reply, *, treat_not_found_as_completed):
        """
        Returns a dictionary of completed tasks based on reply returned by ``task_status`` API.
        The dictionary maps task UID to the status. Status may be 'completed' or 'not_found'
        (if ``treat_not_found_as_completed`` is ``True``).

        Parameters
        ----------
        task_status_reply: dict
            Dictionary returned by ``REManagerAPI.task_status`` API. It is assumed, that
            the API call was successful.
        treat_not_found_as_completed: boolean
            The tasks with status 'not_found' are treated as 'completed' if ``True``.
            Typically 'not_found' means that the task was completed and then expired
            and deleted from the list of active tasks, so this assumption is valid in
            most cases.

        Returns
        -------
        dict(str: str)
            Dictionary that maps task UIDs to its status (``'completed'`` or ``'not_found'``).
            The dictionary may be empty if there are no completed tasks.
        """
        completed_status_vals = ["completed"]
        if treat_not_found_as_completed:
            completed_status_vals.append("not_found")

        task_uids = task_status_reply["task_uid"]
        task_status = task_status_reply["status"]

        if (task_uids is None) or (task_status is None):
            return {}
        elif isinstance(task_uids, str):
            # Single task: 'status' is the status of the task. Always return a dictionary.
            return {task_uids: task_status} if task_status in completed_status_vals else {}
        elif isinstance(task_uids, list):
            # Multiple tasks: 'status' is indexed by task UID.
            return {_: task_status[_] for _ in task_uids if task_status[_] in completed_status_vals}
        else:
            return {}

    def _prepare_kernel_interrupt(self, *, interrupt_task, interrupt_plan, lock_key):
        """
        Prepare parameters for ``kernel_interrupt``
        """
        request_params = {}
        self._add_request_param(request_params, "interrupt_task", interrupt_task)
        self._add_request_param(request_params, "interrupt_plan", interrupt_plan)
        self._add_lock_key(request_params, lock_key)
        return request_params

    def _validate_lock_key(self, lock_key):
        """
        Verify that ``lock_key`` is a non-empty string or ``None``. Raises ``ValueError`` otherwise.
        """
        if lock_key is not None:
            if not isinstance(lock_key, str) or not lock_key:
                raise ValueError(f"Parameter 'lock_key' must be non-empty string or None: lock_key={lock_key!r}")

    def _prepare_lock(self, *, environment, queue, lock_key, note, user):
        """
        Prepare parameters for the 'lock' request.

        Raises ``ValueError`` if ``lock_key`` or ``note`` are invalid and ``RuntimeError``
        if neither ``lock_key`` nor the current lock key is set.
        """
        # Lock key may be None. Use self.lock_key in this case.
        self._validate_lock_key(lock_key)
        if not lock_key:
            lock_key = self.lock_key
        if not lock_key:
            raise RuntimeError("Failed to format the 'lock' request: Lock key is not set")

        if not isinstance(note, (str, type(None))):
            raise ValueError(f"Parameter 'note' must be a string or None: note={note!r}")

        environment, queue = bool(environment), bool(queue)

        # 'environment' and 'queue' are included in the request only if they are True.
        request_params = {}
        if environment:
            request_params["environment"] = environment
        if queue:
            request_params["queue"] = queue
        request_params["lock_key"] = lock_key

        # Only user name is sent with the request, user group is removed.
        self._request_params_add_user_info(request_params, user=user, user_group=None)
        if "user_group" in request_params:
            del request_params["user_group"]

        if note:
            request_params["note"] = note
        return request_params

    def _prepare_unlock(self, *, lock_key):
        """
        Prepare parameters for the 'unlock' request. Raises ``RuntimeError`` if neither
        ``lock_key`` nor the current lock key is set.
        """
        # Lock key may be None. Use self.lock_key in this case.
        self._validate_lock_key(lock_key)
        if not lock_key:
            lock_key = self.lock_key
        if not lock_key:
            raise RuntimeError("Failed to format the 'unlock' request: Lock key is not set")
        return {"lock_key": lock_key}

    def _prepare_lock_info(self, *, lock_key):
        """
        Prepare parameters for ``lock_info``. The lock key is always included in
        the request, even if it is ``None``.
        """
        # Lock key may be None.
        self._validate_lock_key(lock_key)
        return {"lock_key": lock_key}

    def _process_response_lock_info(self, response):
        """
        ``lock_info``: process response (cache the data if the request was successful)
        """
        if response["success"] is True:
            self._current_lock_info = copy.deepcopy(response["lock_info"])
            self._current_lock_info_uid = copy.deepcopy(response["lock_info_uid"])

    def _generate_response_lock_info(self):
        """
        ``lock_info``: generate response based on cached data
        """
        response = {
            "success": True,
            "msg": "",
            "lock_info_uid": self._current_lock_info_uid,
            "lock_info": copy.deepcopy(self._current_lock_info),
        }
        return response

    @property
    def default_lock_key_path(self):
        """
        Get/set path of the file with the default lock key. The default path is
        ``<user-home-dir>/.config/qserver/default_lock_key.txt``. In some workflows
        it may be useful to set a different path.
        """
        return self._default_lock_key_path

    @default_lock_key_path.setter
    def default_lock_key_path(self, lock_key_path):
        self._default_lock_key_path = lock_key_path

    def get_default_lock_key(self, new_key=False):
        """
        Returns the default lock key. The key is stored in a file (by default
        ``.config/qserver/default_lock_key.txt`` in the user home directory, see
        ``default_lock_key_path``). The key is loaded from disk each time the method is called.
        If the key does not exist (the file is missing or its first line is empty) or
        the method parameter ``new_key`` is ``True``, then the new key is generated and saved
        to file. A specific default key may be manually set and saved to file using
        ``set_default_lock_key()`` API.

        The default lock key is used for storage of the key so that it could be reused between
        sessions. The default key is not used directly by other API. Typically the initialization
        script for the session should initialize the current lock key with the default lock key::

            RM.lock_key = RM.get_default_lock_key()

        Parameters
        ----------
        new_key: boolean
            Set this parameter ``True`` to generate a new default lock key.

        Returns
        -------
        str
            The default lock key.

        Raises
        ------
        IOError
            Error while saving the newly generated lock key to disk.
        """
        lock_key = None
        if not new_key and os.path.exists(self._default_lock_key_path):
            with open(self._default_lock_key_path, "rt") as f:
                # The key is the first line of the file. 'readline' returns an empty
                # string for an empty file instead of failing.
                lock_key = f.readline().strip()

        if not lock_key:
            lock_key = secrets.token_urlsafe(16)
            self.set_default_lock_key(lock_key)

        return lock_key

    def set_default_lock_key(self, lock_key):
        """
        Set the default lock key. The lock key must be a non-empty string. The key is saved
        to a file on disk and loaded each time ``get_default_lock_key()`` method is called.
        See the description for ``get_default_lock_key()`` for more information.

        Parameters
        ----------
        lock_key: str
            The new lock key.

        Returns
        -------
        None

        Raises
        ------
        IOError
            Error while saving the lock key to disk or the lock key is invalid.
        """
        try:
            if not lock_key or not isinstance(lock_key, str):
                raise ValueError(f"'lock_key' must be a non-empty string: lock_key={lock_key!r}")

            lock_key_path = os.path.dirname(self._default_lock_key_path)
            # The directory part is empty if the path is a bare file name: the file is
            # created in the current directory and there is nothing to check or create.
            if lock_key_path:
                if os.path.exists(lock_key_path):
                    if not os.path.isdir(lock_key_path):
                        raise IOError(f"Path {lock_key_path!r} exists, but it is not a directory")
                else:
                    os.makedirs(lock_key_path, exist_ok=True)

            with open(self._default_lock_key_path, "wt") as f:
                f.write(lock_key)
        except Exception as ex:
            # All failures (including invalid key) are reported as IOError.
            raise IOError(f"Failed to save the default lock key: {ex}") from ex

    @property
    def lock_key(self):
        """
        Get/set current lock key. The current lock key is used for locking/unlocking RE Manager
        unless the key is explicitly passed as ``lock_key`` parameter. The current key is also
        passed with other API if locked API are enabled (``enable_locked_api`` is ``True``).
        Setting the current lock key to ``None`` clears the lock key and disables access to
        locked API. A valid current lock key must be set before access to locked API is enabled.

        Raises
        ------
        ValueError
            Invalid lock key. The key must be a non-empty string or ``None``.
        """
        return self._lock_key

    @lock_key.setter
    def lock_key(self, lock_key):
        self._validate_lock_key(lock_key)
        if not lock_key:
            # Clearing the key also disables automatic access to locked API.
            self.enable_locked_api = False
        self._lock_key = lock_key

    @property
    def enable_locked_api(self):
        """
        Enable/disable access to locked API. When access to locked API is enabled, the current
        lock key (``RM.lock_key``) is automatically sent with API requests. The locked API
        may be called without enabling automatic access by explicitly sending the lock key
        with each request. A valid current lock key must be set before access could be enabled.
        The access is disabled when the lock key is cleared.

        Raises
        ------
        TypeError
            Attempt to set to a value of non-boolean type.
        RuntimeError
            Attempt to enable access when the current lock key is not set.
        """
        return self._enable_locked_api

    @enable_locked_api.setter
    def enable_locked_api(self, enable_locked_api):
        if not isinstance(enable_locked_api, bool):
            raise TypeError("The property may be set only to boolean values")
        # The lock key is required only for enabling access. Disabling must always
        # succeed, otherwise the lock key could not be cleared when it is not set.
        if enable_locked_api and not self.lock_key:
            raise RuntimeError("Failed to enable locked API: current lock key is not set")
        self._enable_locked_api = enable_locked_api