import collections
import io
import os
from pathlib import Path
# Resolve the package version once when this module is imported, then delete
# the helper name so it is not left behind in the module namespace.
from ._version import get_versions
__version__ = get_versions()['version']
del get_versions
class SuitcaseUtilsError(Exception):
    """Base class for every exception defined in this module."""


class SuitcaseUtilsValueError(SuitcaseUtilsError):
    """
    Raised for an unacceptable value.

    In this module it is raised when a postfix is an absolute path or has
    already been used by the same manager.
    """


class SuitcaseUtilsTypeError(SuitcaseUtilsError):
    """
    Raised when an operation is not supported by the object it is called on.

    In this module it is raised by ``MemoryBuffersManager.reserve_name``.
    """


class ModeError(SuitcaseUtilsError):
    """Raised when ``open`` is called with a mode the manager does not allow."""


class UnknownEventType(SuitcaseUtilsError):
    """
    Error for an unrecognized event type.

    NOTE(review): not raised anywhere in the visible part of this module;
    confirm the intended use against callers.
    """
class Artifact:
    """
    A class that tracks information about a managed resource.

    Parameters
    ----------
    label : string
        A label for the sort of content being stored, such as
        'stream_data' or 'metadata'.
    postfix : string
        Postfix for the file name. Must be unique per Manager.
    name : string
        Name of the file. The full file path or similar unique identifier.
    handle : handle
        A handle to a file, memory stream or similar.
    """

    def __init__(self, label, postfix, name=None, handle=None):
        self.label = label
        self.postfix = postfix
        self.name = name
        self.initial_size = None
        self._final_size = None
        self._handle = None
        if handle is not None:
            # Go through the property setter so that close() gets hooked and
            # initial_size is recorded.
            self.handle = handle

    def to_dict(self):
        """
        Returns current values of properties as a dictionary.

        Only ``handle`` is a mutable reference. Other values are snapshots
        of current values at the time of calling this method.
        """
        return {
            'label': self.label,
            'postfix': self.postfix,
            'name': self.name,
            'current_size': self.current_size,
            'initial_size': self.initial_size,
            'handle': self.handle,
        }

    @property
    def handle(self):
        """
        Access the handle to the resource.

        A handle is expected to have ``close``, ``seek`` and ``tell`` methods.
        When a handle is assigned to this property, its ``close`` method is
        replaced by a wrapper that records the final size of the resource
        (using ``seek`` and ``tell``) before really closing it.
        """
        return self._handle

    @handle.setter
    def handle(self, val):
        orig_close = val.close

        def wrapped_close():
            # Record the final size, then delegate to the real close().
            # Seeking a handle that is already closed raises, so a repeated
            # close() must skip the measurement; the size recorded by the
            # first close() is kept.
            try:
                if not getattr(val, 'closed', False):
                    val.seek(0, os.SEEK_END)
                    self._final_size = val.tell()
            finally:
                # Always release the resource, even if measuring failed.
                orig_close()

        val.close = wrapped_close
        self._handle = val
        # A newly attached handle invalidates any previously recorded size.
        self._final_size = None
        self.initial_size = self.current_size

    @property
    def current_size(self):
        """
        Returns the size of the attached handle, or None if unavailable.

        Once the handle has been closed through its (hooked) ``close``
        method, the size recorded at that moment is returned. While the
        handle is open, the size is measured by seeking to the end; the
        handle's position is restored afterwards.
        """
        handle = self.handle
        if handle is None:
            return None
        if self._final_size is not None:
            return self._final_size
        if getattr(handle, 'closed', False):
            # Closed without a recorded size: nothing can be measured.
            return None
        orig_pos = handle.tell()
        handle.seek(0, os.SEEK_END)
        size = handle.tell()
        handle.seek(orig_pos)
        return size
class MultiFileManager:
    """
    A class that manages multiple files.

    Parameters
    ----------
    directory : str or Path
        The directory (as a string or as a Path) to create the files inside.
    allowed_modes : Iterable
        Modes accepted by ``MultiFileManager.open``. By default this is
        restricted to "exclusive creation" modes ('x', 'xt', 'xb') which raise
        an error if the file already exists. This choice of defaults is meant
        to protect the user from unintentionally overwriting old files. In
        situations where overwrite ('w', 'wb') or append ('a', 'r+b') are
        needed, they can be added here.

    This design is inspired by Python's zipfile and tarfile libraries.
    """

    def __init__(self, directory, allowed_modes=('x', 'xt', 'xb')):
        self._directory = Path(directory)
        self._allowed_modes = set(allowed_modes)
        self._artifacts = []

    @property
    def artifacts(self):
        """
        Provides dictionary mapping artifact labels to (file)names.
        """
        artifacts = collections.defaultdict(list)
        for a in self._artifacts:
            artifacts[a.label].append(a.name)
        return dict(artifacts)

    def get_artifacts(self, label=None):
        """
        Returns list of dicts, each populated with artifact properties.

        Parameters
        ----------
        label : string
            Optional. Filter returned list to include only artifacts that
            match the given label value.
        """
        return [a.to_dict() for a in self._artifacts
                if label is None or a.label == label]

    def _get_artifact(self, postfix):
        """
        Returns artifact for a given postfix, or None if there is none.
        """
        for a in self._artifacts:
            if a.postfix == postfix:
                return a
        return None

    @property
    def estimated_sizes(self):
        """
        Provides dictionary mapping artifact postfix to current size.
        """
        return {a.postfix: a.current_size for a in self._artifacts}

    def reserve_name(self, label, postfix):
        """
        Ask the wrapper for a filepath.

        An external library that needs a filepath (not a handle)
        may use this instead of the ``open`` method.

        Parameters
        ----------
        label : string
            A label for the sort of content being stored, such as
            'stream_data' or 'metadata'.
        postfix : string
            Postfix for the file name. Must be unique for this Manager.

        Returns
        -------
        name : Path

        Raises
        ------
        SuitcaseUtilsValueError
            If ``postfix`` is an absolute path, or if it resolves to a name
            that has already been reserved on this manager.
        """
        if Path(postfix).is_absolute():
            raise SuitcaseUtilsValueError(
                f"The postfix {postfix!r} must be structured like a relative "
                f"file path.")
        # Checks for name instead of postfix to remove ambiguity via Path
        name = (self._directory / Path(postfix)).expanduser().resolve()
        if any(a.name == name for a in self._artifacts):
            raise SuitcaseUtilsValueError(
                f"The postfix {postfix!r} has already been used.")
        self._artifacts.append(Artifact(label, postfix, name))
        return name

    def open(self, label, postfix, mode, encoding=None, errors=None):
        """
        Request a file handle.

        Like the built-in open function, this may be used as a context manager.

        Parameters
        ----------
        label : string
            A label for the sort of content being stored, such as
            'stream_data' or 'metadata'.
        postfix : string
            Postfix for the file name. Must be unique for this Manager.
        mode : string
            One of the ``allowed_modes`` set in ``__init__``. Default set of
            options is ``{'x', 'xt', 'xb'}`` --- 'x' or 'xt' for text, 'xb'
            for binary.
        encoding : string or None
            Passed through to open. See Python open documentation for allowed
            values. Only applicable to text mode.
        errors : string or None
            Passed through to open. See Python open documentation for allowed
            values.

        Returns
        -------
        file : handle

        Raises
        ------
        ModeError
            If ``mode`` is not one of the allowed modes.
        SuitcaseUtilsValueError
            If ``postfix`` is absolute or has already been used.
        """
        if mode not in self._allowed_modes:
            raise ModeError(
                f'The mode passed to MultiFileManager.open is {mode} but '
                f'needs to be one of {self._allowed_modes}')
        filepath = self.reserve_name(label, postfix)
        artifact = self._get_artifact(postfix)
        try:
            os.makedirs(os.path.dirname(filepath), exist_ok=True)
            f = open(filepath, mode=mode, encoding=encoding, errors=errors)
        except Exception:
            # The file was never opened: drop the reservation so the manager
            # does not advertise it and the postfix can be retried.
            self._artifacts.remove(artifact)
            raise
        artifact.handle = f
        return f

    def close(self):
        """
        Close all files opened by the manager.

        Handles that are already closed (for example by a ``with`` block)
        are skipped.
        """
        for a in self._artifacts:
            handle = a.handle
            if handle is None or getattr(handle, 'closed', False):
                continue
            handle.close()
class PersistentStringIO(io.StringIO):
    """
    A StringIO that does not clear the buffer when closed.

    .. note::

        This StringIO subclass behaves like StringIO except that its
        close() method, which would normally clear the buffer, has no
        effect. As a consequence the object never reports itself as closed.
        The contents can still be discarded explicitly with ``truncate(0)``
        followed by ``seek(0)``.
    """

    def close(self):
        # Avoid clearing the buffer before caller of ``export`` can access it.
        pass
class PersistentBytesIO(io.BytesIO):
    """
    A BytesIO that does not clear the buffer when closed.

    .. note::

        This BytesIO subclass behaves like BytesIO except that its
        close() method, which would normally clear the buffer, has no
        effect. As a consequence the object never reports itself as closed.
        The contents can still be discarded explicitly with ``truncate(0)``
        followed by ``seek(0)``.
    """

    def close(self):
        # Avoid clearing the buffer before caller of ``export`` can access it.
        pass
class MemoryBuffersManager:
    """
    A class that manages multiple StringIO and/or BytesIO instances.

    This design is inspired by Python's zipfile and tarfile libraries.

    The buffers created by this manager can be retrieved through the
    ``artifacts`` property or the ``get_artifacts`` method.
    """

    def __init__(self):
        self._artifacts = []

    @property
    def artifacts(self):
        """
        Provides dictionary mapping artifact labels to lists of buffers.
        """
        artifacts = collections.defaultdict(list)
        for a in self._artifacts:
            artifacts[a.label].append(a.handle)
        return dict(artifacts)

    def get_artifacts(self, label=None):
        """
        Returns list of dicts, each populated with artifact properties.

        Parameters
        ----------
        label : string
            Optional. Filter returned list to include only artifacts that
            match the given label value.
        """
        return [a.to_dict() for a in self._artifacts
                if label is None or a.label == label]

    def _get_artifact(self, postfix):
        """
        Returns artifact for a given postfix, or None if there is none.
        """
        for a in self._artifacts:
            if a.postfix == postfix:
                return a
        return None

    @property
    def estimated_sizes(self):
        """
        Provides dictionary mapping artifact postfix to current size.
        """
        return {a.postfix: a.current_size for a in self._artifacts}

    def reserve_name(self, label, postfix):
        """
        This action is not valid on this manager. It will always raise.

        Parameters
        ----------
        label : string
            A label for the sort of content being stored, such as
            'stream_data' or 'metadata'.
        postfix : string
            Relative file path. Must be unique for this Manager.

        Raises
        ------
        SuitcaseUtilsTypeError
        """
        raise SuitcaseUtilsTypeError(
            "MemoryBuffersManager is incompatible with exporters that require "
            "explicit filenames.")

    def open(self, label, postfix, mode, encoding=None, errors=None):
        """
        Request a file handle.

        Like the built-in open function, this may be used as a context manager.

        Parameters
        ----------
        label : string
            A label for the sort of content being stored, such as
            'stream_data' or 'metadata'.
        postfix : string
            Relative file path (simply used as an identifier in this case, as
            there is no actual file). Must be unique for this Manager.
        mode : {'x', 'xt', 'xb'}
            'x' or 'xt' for text, 'xb' for binary
        encoding : string or None
            Not used. Accepted for compatibility with built-in open().
        errors : string or None
            Not used. Accepted for compatibility with built-in open().

        Returns
        -------
        file : handle

        Raises
        ------
        SuitcaseUtilsValueError
            If ``postfix`` is absolute or has already been used.
        ModeError
            If ``mode`` is not one of 'x', 'xt' or 'xb'.
        """
        # Of course, in-memory buffers have no filepath, but we still expect
        # postfix to be a thing that looks like a relative filepath, and we use
        # it as a unique identifier for a given buffer.
        if Path(postfix).is_absolute():
            raise SuitcaseUtilsValueError(
                f"The postfix {postfix} must be structured like a relative "
                f"file path.")
        name = Path(postfix).expanduser().resolve()
        if any(a.name == name for a in self._artifacts):
            raise SuitcaseUtilsValueError(
                f"The postfix {postfix!r} has already been used.")
        if mode in ('x', 'xt'):
            buffer = PersistentStringIO()
        elif mode == 'xb':
            buffer = PersistentBytesIO()
        else:
            raise ModeError(
                f"The mode passed to MemoryBuffersManager.open is {mode} but "
                f"needs to be one of 'x', 'xt' or 'xb'.")
        self._artifacts.append(Artifact(label, postfix, name, buffer))
        return buffer

    def close(self):
        """
        Close all buffers opened by the manager.
        """
        for a in self._artifacts:
            if a.handle is not None:
                a.handle.close()