"""
This module exposes :class:`Cacher` and :class:`CacheStamp` classes, which
provide a simple API for on-disk caching.

The :class:`Cacher` class is the simplest and most direct method of caching. In
fact, it only requires four lines of boilerplate, which is the smallest general
and robust pattern that I (Jon Crall) have achieved, and I don't think it's
possible to do better. These four lines implement the following necessary and
sufficient steps for general, robust on-disk caching.

    1. Defining the cache dependencies
    2. Checking if the cache missed
    3. Loading the cache on a hit
    4. Executing the process and saving the result on a miss.

The following example illustrates these four points.

Example:
    >>> import ubelt as ub
    >>> # Define a cache name and dependencies (which is fed to `ub.hash_data`)
    >>> cacher = ub.Cacher('name', depends='set-of-deps')  # boilerplate:1
    >>> # Calling tryload will return your data on a hit and None on a miss
    >>> data = cacher.tryload(on_error='clear')            # boilerplate:2
    >>> # Check if you need to recompute your data
    >>> if data is None:                                   # boilerplate:3
    >>>     # Your code to recompute data goes here (this is not boilerplate).
    >>>     data = 'mydata'
    >>>     # Cache the computation result (via pickle)
    >>>     cacher.save(data)                              # boilerplate:4

Surprisingly, this uses just as many boilerplate lines as a decorator-style
cacher, but it is much more extensible. It is possible to use :class:`Cacher`
in more sophisticated ways (e.g. with metadata), but the simple in-line use is
often easier and cleaner. The following example illustrates this:

Example:
    >>> import ubelt as ub

    >>> @ub.Cacher('name', depends={'dep1': 1, 'dep2': 2})  # boilerplate:1
    >>> def func():                                         # boilerplate:2
    >>>     data = 'mydata'
    >>>     return data                                     # boilerplate:3
    >>> data = func()                                       # boilerplate:4

    >>> cacher = ub.Cacher('name', depends=['dependencies'])  # boilerplate:1
    >>> data = cacher.tryload(on_error='clear')               # boilerplate:2
    >>> if data is None:                                      # boilerplate:3
    >>>     data = 'mydata'
    >>>     cacher.save(data)                                 # boilerplate:4

While the above two are equivalent, the second version provides a simpler
traceback, explicit procedures, and makes it easier to use breakpoint debugging
(because there is no closure scope).
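
For instance, the following is a minimal sketch of the metadata use case (the
name ``demo_with_meta``, the ``params`` dictionary, and the computation itself
are hypothetical). The ``meta`` argument is not hashed into the dependencies,
but it is written to the adjacent ``.meta`` sidecar file by :meth:`Cacher.save`,
which can help explain how a cache entry was produced.

Example:
    >>> import ubelt as ub
    >>> params = {'alpha': 1.5, 'niters': 100}
    >>> # Record human-readable notes alongside the cached pickle
    >>> cacher = ub.Cacher('demo_with_meta', depends=params,
    >>>                    meta={'params': params, 'notes': 'toy example'})
    >>> data = cacher.tryload(on_error='clear')
    >>> if data is None:
    >>>     data = sum(range(params['niters'])) * params['alpha']
    >>>     cacher.save(data)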

While :class:`Cacher` is used to store the direct results of in-line code in a
pickle format, the :class:`CacheStamp` object is used to cache processes that
produce on-disk side effects other than the main return value. For instance,
consider the following example:

Example:
    >>> import ubelt as ub
    >>> def compute_many_files(dpath):
    ...     for i in range(10):
    ...         fpath = '{}/file{}.txt'.format(dpath, i)
    ...         with open(fpath, 'w') as file:
    ...             file.write('foo' + str(i))
    >>> dpath = ub.Path.appdir('ubelt/demo/cache').delete().ensuredir()
    >>> # You must specify a directory, unlike in Cacher where it is optional
    >>> self = ub.CacheStamp('name', dpath=dpath, depends={'a': 1, 'b': 2})
    >>> if self.expired():
    >>>     compute_many_files(dpath)
    >>>     # Instead of caching the whole processes, we just write a file
    >>>     # that signals the process has been done.
    >>>     self.renew()
    >>> assert not self.expired()

The CacheStamp is lightweight in that it simply marks that a process has been
completed, but the job of saving and loading the actual data is left to the
developer. The ``expired`` method checks if the stamp exists and is still
valid, and ``renew`` writes the stamp to disk.
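
In other words, a typical pattern (sketched below with hypothetical file and
stamp names) pairs the stamp with developer-managed IO: write the results
yourself on a miss, renew the stamp, and read the results yourself whenever the
stamp is valid.

Example:
    >>> import ubelt as ub
    >>> dpath = ub.Path.appdir('ubelt/demo/stamp-pattern').ensuredir()
    >>> fpath = dpath / 'result.txt'
    >>> stamp = ub.CacheStamp('result-stamp', dpath=dpath, depends={'param': 1})
    >>> if stamp.expired():
    >>>     fpath.write_text('expensive result')  # developer-managed save
    >>>     stamp.renew()
    >>> text = fpath.read_text()                  # developer-managed load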

In ubelt version 1.1.0, several additional features were added to CacheStamp.
In addition to specifying parameters via ``depends``, it is also possible for
CacheStamp to determine if an associated file has been modified. To do this,
the paths of the files must be known a priori and passed to CacheStamp via the
``product`` argument. This allows the CacheStamp to detect if the files have
been modified since the ``renew`` method was called. It does this by
remembering the size, modified time, and checksum of each file. If the expected
hash of each product is known in advance, it is also possible to specify a
``hash_prefix`` for each product. In this case, ``renew`` will raise an
Exception if the specified hash prefix does not match the files on disk.
Lastly, it is possible to specify an expiration time via ``expires``, after
which the CacheStamp will always be marked as invalid. This is now the
mechanism by which the cache in :func:`ubelt.util_download.grabdata` works.

Example:
    >>> import ubelt as ub
    >>> dpath = ub.Path.appdir('ubelt/demo/cache').delete().ensuredir()
    >>> params = {'a': 1, 'b': 2}
    >>> expected_fpaths = [dpath / 'file{}.txt'.format(i) for i in range(2)]
    >>> hash_prefix = ['a7a8a91659601590e17191301dc1',
    ...                '55ae75d991c770d8f3ef07cbfde1']
    >>> self = ub.CacheStamp('name', dpath=dpath, depends=params,
    >>>                      hash_prefix=hash_prefix, hasher='sha256',
    >>>                      product=expected_fpaths, expires='2101-01-01T000000Z')
    >>> if self.expired():
    >>>     for fpath in expected_fpaths:
    ...         fpath.write_text(fpath.name)
    >>>     self.renew()
    >>> # modifying or removing the file will cause the stamp to expire
    >>> expected_fpaths[0].write_text('corrupted')
    >>> assert self.expired()
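
The ``expires`` argument also accepts relative offsets: an int is interpreted
as a number of seconds and a :class:`datetime.timedelta` is used directly.
Relative offsets are resolved to an absolute timestamp when ``renew`` is
called. The following sketch (with a hypothetical name and directory) creates a
stamp that remains valid for one day after each renewal:

Example:
    >>> import ubelt as ub
    >>> dpath = ub.Path.appdir('ubelt/demo/cache-ttl').ensuredir()
    >>> stamp = ub.CacheStamp('daily-work', dpath=dpath, depends={'a': 1},
    >>>                       expires=24 * 60 * 60)  # one day, in seconds
    >>> if stamp.expired():
    >>>     # recompute and write any results here, then renew the stamp
    >>>     stamp.renew()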


RelatedWork:
    https://github.com/shaypal5/cachier
"""
import os
from os.path import join, normpath, basename, exists


[docs] class Cacher: """ Saves data to disk and reloads it based on specified dependencies. Cacher uses pickle to save/load data to/from disk. Dependencies of the cached process can be specified, which ensures the cached data is recomputed if the dependencies change. If the location of the cache is not specified, it will default to the system user's cache directory. Related: ..[JobLibMemory] https://joblib.readthedocs.io/en/stable/memory.html Example: >>> import ubelt as ub >>> depends = 'repr-of-params-that-uniquely-determine-the-process' >>> # Create a cacher and try loading the data >>> cacher = ub.Cacher('demo_process', depends, verbose=4) >>> cacher.clear() >>> print(f'cacher.fpath={cacher.fpath}') >>> data = cacher.tryload() >>> if data is None: >>> # Put expensive functions in if block when cacher misses >>> myvar1 = 'result of expensive process' >>> myvar2 = 'another result' >>> # Tell the cacher to write at the end of the if block >>> # It is idomatic to put results in an object named data >>> data = myvar1, myvar2 >>> cacher.save(data) >>> # Last part of the Cacher pattern is to unpack the data object >>> myvar1, myvar2 = data >>> # >>> # If we know the data exists, we can also simply call load >>> data = cacher.tryload() Example: >>> # The previous example can be shorted if only a single value >>> from ubelt.util_cache import Cacher >>> depends = 'repr-of-params-that-uniquely-determine-the-process' >>> # Create a cacher and try loading the data >>> cacher = Cacher('demo_process', depends) >>> myvar = cacher.tryload() >>> if myvar is None: >>> myvar = ('result of expensive process', 'another result') >>> cacher.save(myvar) >>> assert cacher.exists(), 'should now exist' """ VERBOSE = 1 # default verbosity FORCE_DISABLE = False # global scope override def __init__(self, fname, depends=None, dpath=None, appname='ubelt', ext='.pkl', meta=None, verbose=None, enabled=True, log=None, hasher='sha1', protocol=-1, cfgstr=None, backend='auto'): """ Args: fname (str): A file name. This is the prefix that will be used by the cache. It will always be used as-is. depends (str | List[str] | None): Indicate dependencies of this cache. If the dependencies change, then the cache is recomputed. New in version 0.8.9, replaces ``cfgstr``. dpath (str | PathLike | None): Specifies where to save the cache. If unspecified, Cacher defaults to an application cache dir as given by appname. See :func:`ub.get_app_cache_dir` for more details. appname (str): Application name Specifies a folder in the application cache directory where to cache the data if ``dpath`` is not specified. Defaults to 'ubelt'. ext (str): File extension for the cache format. Can be ``'.pkl'`` or ``'.json'``. Defaults to ``'.pkl'``. meta (object | None): Metadata that is also saved with the ``cfgstr``. This can be useful to indicate how the ``cfgstr`` was constructed. Note: this is a candidate for deprecation. verbose (int): Level of verbosity. Can be 1, 2 or 3. Defaults to 1. enabled (bool): If set to False, then the load and save methods will do nothing. Defaults to True. log (Callable[[str], Any]): Overloads the print function. Useful for sending output to loggers (e.g. logging.info, tqdm.tqdm.write, ...) hasher (str): Type of hashing algorithm to use if ``cfgstr`` needs to be condensed to less than 49 characters. Defaults to sha1. protocol (int): Protocol version used by pickle. Defaults to the -1 which is the latest protocol. backend (str): Set to either ``'pickle'`` or ``'json'`` to force backend. 
                Defaults to auto which chooses one based on the extension.

            cfgstr (str | None): Deprecated in favor of ``depends``.
        """
        if depends is None:
            depends = cfgstr
        if cfgstr is not None:  # nocover
            from ubelt import schedule_deprecation
            schedule_deprecation(
                modname='ubelt', migration='Use depends instead',
                name='cfgstr', type='Cacher class arg',
                deprecate='1.1.0', error='1.3.0', remove='1.4.0',
            )
            depends = cfgstr
        if verbose is None:
            verbose = self.VERBOSE
        if dpath is None:  # pragma: no branch
            from ubelt.util_platform import platform_cache_dir
            import pathlib
            cache_dpath = pathlib.Path(platform_cache_dir())
            dpath = cache_dpath / (appname or 'ubelt')
            dpath.mkdir(parents=True, exist_ok=True)
            # from ubelt.util_path import Path
            # dpath = os.fspath(Path.appdir(appname, type='cache'))
        if backend == 'auto':
            if ext == '.pkl':
                backend = 'pickle'
            elif ext == '.json':
                backend = 'json'
            else:
                backend = 'pickle'
        else:
            if backend not in {'json', 'pickle'}:
                raise ValueError(backend)

        self.dpath = dpath
        self.fname = fname
        self.depends = depends
        self.cfgstr = cfgstr
        self.verbose = verbose
        self.ext = ext
        self.meta = meta
        self.enabled = enabled and not self.FORCE_DISABLE
        self.protocol = protocol
        self.hasher = hasher
        self.log = print if log is None else log
        self.backend = backend

        if len(self.ext) > 0 and self.ext[0] != '.':
            raise ValueError('Please be explicit and use a dot in ext')
[docs] def _rectify_cfgstr(self, cfgstr=None): if cfgstr is not None: # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration=( 'In general, you should not need to specify a custom ' 'cfgstr after the Cacher has been created. ' 'If you must, then you can modify the ``depends`` class ' 'attribute instead, but in general it is recommend to ' 'avoid this.' ), name='cfgstr', type='Cacher method arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0', ) cfgstr = self.cfgstr if cfgstr is None else cfgstr if cfgstr is None and self.depends is not None: # lazy hashing of depends data into cfgstr if isinstance(self.depends, str): self.cfgstr = self.depends else: from ubelt.util_hash import hash_data self.cfgstr = hash_data(self.depends) cfgstr = self.cfgstr if cfgstr is None and self.enabled: cfgstr = '' if self.fname is None: raise AssertionError('no fname specified in Cacher') if self.dpath is None: raise AssertionError('no dpath specified in Cacher') return cfgstr
    def _condense_cfgstr(self, cfgstr=None):
        cfgstr = self._rectify_cfgstr(cfgstr)
        # The 49 char maxlen is just long enough for an 8 char name, a 1 char
        # underscore, and a 40 char sha1 hash.
        max_len = 49
        if len(cfgstr) > max_len:
            from ubelt.util_hash import hash_data
            condensed = hash_data(cfgstr, hasher=self.hasher, base='hex')
            condensed = condensed[0:max_len]
        else:
            condensed = cfgstr
        return condensed
    @property
    def fpath(self) -> os.PathLike:
        from ubelt.util_path import Path
        return Path(self.get_fpath())
[docs] def get_fpath(self, cfgstr=None): """ Reports the filepath that the cacher will use. It will attempt to use '{fname}_{cfgstr}{ext}' unless that is too long. Then cfgstr will be hashed. Args: cfgstr (str | None): overrides the instance-level cfgstr Returns: str | PathLike Example: >>> # xdoctest: +REQUIRES(module:pytest) >>> from ubelt.util_cache import Cacher >>> import pytest >>> #with pytest.warns(UserWarning): >>> if 1: # we no longer warn here >>> cacher = Cacher('test_cacher1') >>> cacher.get_fpath() >>> self = Cacher('test_cacher2', depends='cfg1') >>> self.get_fpath() >>> self = Cacher('test_cacher3', depends='cfg1' * 32) >>> self.get_fpath() """ condensed = self._condense_cfgstr(cfgstr) fname_cfgstr = '{}_{}{}'.format(self.fname, condensed, self.ext) fpath = join(self.dpath, fname_cfgstr) fpath = normpath(fpath) return fpath
    def exists(self, cfgstr=None):
        """
        Check to see if the cache exists

        Args:
            cfgstr (str | None): overrides the instance-level cfgstr

        Returns:
            bool
        """
        return exists(self.get_fpath(cfgstr=cfgstr))
[docs] def existing_versions(self): """ Returns data with different cfgstr values that were previously computed with this cacher. Yields: str: paths to cached files corresponding to this cacher Example: >>> # Ensure that some data exists >>> import ubelt as ub >>> dpath = ub.Path.appdir( >>> 'ubelt/tests/util_cache', >>> 'test-existing-versions').delete().ensuredir() >>> cacher = ub.Cacher('versioned_data_v2', depends='1', dpath=dpath) >>> cacher.ensure(lambda: 'data1') >>> known_fpaths = set() >>> known_fpaths.add(cacher.get_fpath()) >>> cacher = ub.Cacher('versioned_data_v2', depends='2', dpath=dpath) >>> cacher.ensure(lambda: 'data2') >>> known_fpaths.add(cacher.get_fpath()) >>> # List previously computed configs for this type >>> from os.path import basename >>> cacher = ub.Cacher('versioned_data_v2', depends='2', dpath=dpath) >>> exist_fpaths = set(cacher.existing_versions()) >>> exist_fnames = list(map(basename, exist_fpaths)) >>> print('exist_fnames = {!r}'.format(exist_fnames)) >>> print('exist_fpaths = {!r}'.format(exist_fpaths)) >>> print('known_fpaths={!r}'.format(known_fpaths)) >>> assert exist_fpaths.issubset(known_fpaths) """ import glob pattern = join(self.dpath, self.fname + '_*' + self.ext) for fname in glob.iglob(pattern): data_fpath = join(self.dpath, fname) yield data_fpath
[docs] def clear(self, cfgstr=None): """ Removes the saved cache and metadata from disk Args: cfgstr (str | None): overrides the instance-level cfgstr """ data_fpath = self.get_fpath(cfgstr) if self.verbose > 0: self.log('[cacher] clear cache') if exists(data_fpath): if self.verbose > 0: self.log('[cacher] removing {}'.format(data_fpath)) os.remove(data_fpath) # Remove the metadata if it exists meta_fpath = data_fpath + '.meta' if exists(meta_fpath): os.remove(meta_fpath) else: if self.verbose > 0: self.log('[cacher] ... nothing to clear')
    def tryload(self, cfgstr=None, on_error='raise'):
        """
        Like load, but returns None if the load fails due to a cache miss.

        Args:
            cfgstr (str | None): overrides the instance-level cfgstr

            on_error (str): How to handle non-IO errors. Either 'raise', which
                re-raises the exception, or 'clear', which deletes the cache
                and returns None. Defaults to 'raise'.

        Returns:
            None | object: the cached data if it exists, otherwise returns None
        """
        if self.enabled:
            try:
                if self.verbose > 1:
                    self.log('[cacher] tryload fname={}'.format(self.fname))
                return self.load(cfgstr)
            except IOError:
                if self.verbose > 0:
                    self.log('[cacher] ... {} cache miss'.format(self.fname))
            except Exception:
                if self.verbose > 0:
                    self.log('[cacher] ... failed to load')
                if on_error == 'raise':
                    raise
                elif on_error == 'clear':
                    self.clear(cfgstr)
                    return None
                else:
                    raise KeyError('Unknown method on_error={}'.format(
                        on_error))
        else:
            if self.verbose > 1:
                self.log('[cacher] ... cache disabled: fname={}'.format(
                    self.fname))
        return None
[docs] def load(self, cfgstr=None): """ Load the data cached and raise an error if something goes wrong. Args: cfgstr (str | None): overrides the instance-level cfgstr Returns: object: the cached data Raises: IOError - if the data is unable to be loaded. This could be due to a cache miss or because the cache is disabled. Example: >>> from ubelt.util_cache import * # NOQA >>> # Setting the cacher as enabled=False turns it off >>> cacher = Cacher('test_disabled_load', '', enabled=True, >>> appname='ubelt/tests/util_cache') >>> cacher.save('data') >>> assert cacher.load() == 'data' >>> cacher.enabled = False >>> assert cacher.tryload() is None """ cfgstr_ = self._rectify_cfgstr(cfgstr) dpath = self.dpath fname = self.fname verbose = self.verbose if not self.enabled: if verbose > 1: self.log('[cacher] ... cache disabled: fname={}'.format( self.fname)) raise IOError(3, 'Cache Loading Is Disabled') data_fpath = self.get_fpath(cfgstr=cfgstr) if not exists(data_fpath): if verbose > 2: self.log('[cacher] ... cache does not exist: ' 'dpath={} fname={} cfgstr={}'.format( basename(dpath), fname, cfgstr_)) raise IOError(2, 'No such file or directory: {!r}'.format(data_fpath)) else: if verbose > 3: sizestr = _byte_str(os.stat(data_fpath).st_size) self.log('[cacher] ... cache exists: ' 'dpath={} fname={} cfgstr={}, size={}'.format( basename(dpath), fname, cfgstr_, sizestr)) try: data = self._backend_load(data_fpath) except Exception as ex: if verbose > 0: self.log('CORRUPTED? fpath = {!r}'.format(data_fpath)) if verbose > 1: self.log('[cacher] ... CORRUPTED? dpath={} cfgstr={}'.format( basename(dpath), cfgstr_)) if isinstance(ex, (EOFError, IOError, ImportError)): raise IOError(str(ex)) else: if verbose > 1: self.log('[cacher] ... unknown reason for exception') raise else: if self.verbose > 2: self.log('[cacher] ... {} cache hit'.format(self.fname)) elif verbose > 1: self.log('[cacher] ... cache hit') return data
[docs] def save(self, data, cfgstr=None): """ Writes data to path specified by ``self.fpath``. Metadata containing information about the cache will also be appended to an adjacent file with the `.meta` suffix. Args: data (object): arbitrary pickleable object to be cached cfgstr (str | None): overrides the instance-level cfgstr Example: >>> from ubelt.util_cache import * # NOQA >>> # Normal functioning >>> depends = 'long-cfg' * 32 >>> cacher = Cacher('test_enabled_save', depends=depends, >>> appname='ubelt/tests/util_cache') >>> cacher.save('data') >>> assert exists(cacher.get_fpath()), 'should be enabled' >>> assert exists(cacher.get_fpath() + '.meta'), 'missing metadata' >>> # Setting the cacher as enabled=False turns it off >>> cacher2 = Cacher('test_disabled_save', 'params', enabled=False, >>> appname='ubelt/tests/util_cache') >>> cacher2.save('data') >>> assert not exists(cacher2.get_fpath()), 'should be disabled' """ from ubelt.util_path import ensuredir from ubelt.util_time import timestamp if not self.enabled: return if self.verbose > 0: self.log('[cacher] ... {} cache save'.format(self.fname)) cfgstr_ = self._rectify_cfgstr(cfgstr) condensed = self._condense_cfgstr(cfgstr) # Make sure the cache directory exists ensuredir(self.dpath) data_fpath = self.get_fpath(cfgstr=cfgstr) meta_fpath = data_fpath + '.meta' # Also save metadata file to reconstruct hashing # This may be deprecated in the future. with open(meta_fpath, 'a') as file_: # TODO: maybe append this in json or YML format? file_.write('\n\nsaving {}\n'.format(timestamp())) file_.write(self.fname + '\n') file_.write(condensed + '\n') file_.write(cfgstr_ + '\n') file_.write(str(self.meta) + '\n') self._backend_dump(data_fpath, data) if self.verbose > 3: sizestr = _byte_str(os.stat(data_fpath).st_size) self.log('[cacher] ... finish save, size={}'.format(sizestr))
[docs] def _backend_load(self, data_fpath): """ Example: >>> import ubelt as ub >>> cacher = ub.Cacher('test_other_backend', depends=['a'], ext='.json') >>> cacher.save(['data']) >>> cacher.tryload() >>> import ubelt as ub >>> cacher = ub.Cacher('test_other_backend2', depends=['a'], ext='.yaml', backend='json') >>> cacher.save({'data': [1, 2, 3]}) >>> cacher.tryload() >>> import pytest >>> with pytest.raises(ValueError): >>> ub.Cacher('test_other_backend2', depends=['a'], ext='.yaml', backend='does-not-exist') >>> cacher = ub.Cacher('test_other_backend2', depends=['a'], ext='.really-a-pickle', backend='auto') >>> assert cacher.backend == 'pickle', 'should be default' """ if self.backend == 'pickle': import pickle with open(data_fpath, 'rb') as file_: data = pickle.load(file_) elif self.backend == 'json': import json with open(data_fpath, 'r') as file_: data = json.load(file_) else: raise NotImplementedError('self.backend = {}'.format(self.backend)) return data
    def _backend_dump(self, data_fpath, data):
        if self.backend == 'pickle':
            import pickle
            with open(data_fpath, 'wb') as file_:
                pickle.dump(data, file_, protocol=self.protocol)
        elif self.backend == 'json':
            import json
            with open(data_fpath, 'w') as file_:
                json.dump(data, file_)
        else:
            raise NotImplementedError('self.backend = {}'.format(self.backend))
        return data
[docs] def ensure(self, func, *args, **kwargs): """ Wraps around a function. A cfgstr must be stored in the base cacher. Args: func (Callable): function that will compute data on cache miss *args: passed to func **kwargs: passed to func Example: >>> from ubelt.util_cache import * # NOQA >>> def func(): >>> return 'expensive result' >>> fname = 'test_cacher_ensure' >>> depends = 'func params' >>> cacher = Cacher(fname, depends=depends) >>> cacher.clear() >>> data1 = cacher.ensure(func) >>> data2 = cacher.ensure(func) >>> assert data1 == 'expensive result' >>> assert data1 == data2 >>> cacher.clear() """ data = self.tryload() if data is None: data = func(*args, **kwargs) self.save(data) return data
def __call__(self, func): """ Allows Cacher to be used as a decorator for functions with no arguments. This mode of usage has much less control than others, so it is only recommended for the simplest of cases. Args: func (Callable): function to decorate. Must have no arguments. Example: >>> from ubelt.util_cache import * # NOQA >>> @Cacher('demo_cacher_call', depends='foobar') >>> def func(): >>> return 'expensive result' >>> func.cacher.clear() >>> assert not func.cacher.exists() >>> data = func() >>> assert func.cacher.exists() >>> func.cacher.clear() """ # Can't return arguments because cfgstr won't take them into account def _wrapper(): data = self.ensure(func) return data _wrapper.cacher = self return _wrapper
[docs] class CacheStamp(object): """ Quickly determine if a file-producing computation has been done. Check if the computation needs to be redone by calling ``expired``. If the stamp is not expired, the user can expect that the results exist and could be loaded. If the stamp is expired, the computation should be redone. After the result is updated, the calls ``renew``, which writes a "stamp" file to disk that marks that the procedure has been done. There are several ways to control how a stamp expires. At a bare minimum, removing the stamp file will force expiration. However, in this circumstance CacheStamp only knows that something has been done, but it doesn't have any information about what was done, so in general this is not sufficient. To achieve more robust expiration behavior, the user should specify the ``product`` argument, which is a list of file paths that are expected to exist whenever the stamp is renewed. When this is specified the CacheStamp will expire if any of these products are deleted, their size changes, their modified timestamp changes, or their hash (i.e. checksum) changes. Note that by setting ``hasher=None``, running and verifying checksums can be disabled. If the user knows what the hash of the file should be this can be specified to prevent renewal of the stamp unless these match the files on disk. This can be useful for security purposes. The stamp can also be set to expire at a specified time or after a specified duration using the ``expires`` argument. Notes: The size, mtime, and hash mechanism is similar to how Makefile and redo caches work. Attributes: cacher (Cacher): underlying cacher object Example: >>> import ubelt as ub >>> # Stamp the computation of expensive-to-compute.txt >>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp') >>> dpath.delete().ensuredir() >>> product = dpath / 'expensive-to-compute.txt' >>> self = ub.CacheStamp('somedata', depends='someconfig', dpath=dpath, >>> product=product, hasher='sha256') >>> self.clear() >>> print(f'self.fpath={self.fpath}') >>> if self.expired(): >>> product.write_text('very expensive') >>> self.renew() >>> assert not self.expired() >>> # corrupting the output will cause the stamp to expire >>> product.write_text('very corrupted') >>> assert self.expired() """ def __init__(self, fname, dpath, cfgstr=None, product=None, hasher='sha1', verbose=None, enabled=True, depends=None, meta=None, hash_prefix=None, expires=None, ext='.pkl'): """ Args: fname (str): Name of the stamp file dpath (str | PathLike | None): Where to store the cached stamp file product (str | PathLike | Sequence[str | PathLike] | None): Path or paths that we expect the computation to produce. If specified the hash of the paths are stored. hasher (str): The type of hasher used to compute the file hash of product. If None, then we assume the file has not been corrupted or changed if the mtime and size are the same. Defaults to sha1. verbose (bool | None): Passed to internal :class:`ubelt.Cacher` object. Defaults to None. enabled (bool): if False, expired always returns True. Defaults to True. depends (str | List[str] | None): Indicate dependencies of this cache. If the dependencies change, then the cache is recomputed. New to CacheStamp in version 0.9.2. meta (object | None): Metadata that is also saved as a sidecar file. New to CacheStamp in version 0.9.2. Note: this is a candidate for deprecation. expires (str | int | datetime.datetime | datetime.timedelta | None): If specified, sets an expiration date for the certificate. 
This can be an absolute datetime or a timedelta offset. If specified as an int, this is interpreted as a time delta in seconds. If specified as a str, this is interpreted as an absolute timestamp. Time delta offsets are coerced to absolute times at "renew" time. hash_prefix (None | str | List[str]): If specified, we verify that these match the hash(s) of the product(s) in the stamp certificate. ext (str): File extension for the cache format. Can be ``'.pkl'`` or ``'.json'``. Defaults to ``'.pkl'``. cfgstr (str | None): DEPRECATED. """ self.cacher = Cacher(fname, cfgstr=cfgstr, dpath=dpath, verbose=verbose, enabled=enabled, depends=depends, meta=meta, ext=ext) self.product = product self.hasher = hasher self.expires = expires self.hash_prefix = hash_prefix # The user can modify these if they want to disable size or mtime # checks for expiration. Not sure if I want to expose it at the # top level API yet or not. self._expire_checks = { 'size': True, 'mtime': True, 'hash': True, } @property def fpath(self): return self.cacher.fpath
    def clear(self):
        """
        Delete the stamp (the products are untouched)
        """
        return self.cacher.clear()
    def _get_certificate(self, cfgstr=None):
        """
        Returns the stamp certificate if it exists
        """
        certificate = self.cacher.tryload(cfgstr=cfgstr, on_error='clear')
        return certificate
    def _rectify_products(self, product=None):
        """
        puts products in a normalized format

        Returns:
            List[Path]
        """
        from ubelt.util_path import Path
        products = self.product if product is None else product
        if products is None:
            return None
        if not isinstance(products, (list, tuple)):
            products = [products]
        products = list(map(Path, products))
        return products
    def _rectify_hash_prefixes(self):
        """
        puts hash prefixes in a normalized format
        """
        hash_prefixes = self.hash_prefix
        if hash_prefixes is None:
            return None
        if not isinstance(hash_prefixes, (list, tuple)):
            hash_prefixes = [hash_prefixes]
        return hash_prefixes
[docs] def _product_info(self, product=None): """ Compute summary info about each product on disk. """ products = self._rectify_products(product) product_info = {} product_info.update(self._product_file_stats()) if self.hasher is None: hasher_name = None else: if not isinstance(self.hasher, str): # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration='Pass hasher as a string', name='hasher', type='CacheStamp arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0') hasher_name = self.hasher.name else: hasher_name = self.hasher product_info['hasher'] = hasher_name product_info['hash'] = self._product_file_hash(products) return product_info
    def _product_file_stats(self, product=None):
        products = self._rectify_products(product)
        product_stats = [p.stat() for p in products]
        product_file_stats = {
            'mtime': [stat.st_mtime for stat in product_stats],
            'size': [stat.st_size for stat in product_stats]
        }
        return product_file_stats
    def _product_file_hash(self, product=None):
        if self.hasher is None:
            product_file_hash = None
        else:
            from ubelt.util_hash import hash_file
            products = self._rectify_products(product)
            product_file_hash = [
                hash_file(p, hasher=self.hasher, base='hex')
                for p in products
            ]
        return product_file_hash
[docs] def expired(self, cfgstr=None, product=None): """ Check to see if a previously existing stamp is still valid, if the expected result of that computation still exists, and if all other expiration criteria are met. Args: cfgstr (Any): DEPRECATED product (Any): DEPRECATED Returns: bool | str: True(-thy) if the stamp is invalid, expired, or does not exist. When the stamp is expired, the reason for expiration is returned as a string. If the stamp is still valid, False is returned. Example: >>> import ubelt as ub >>> import time >>> import os >>> # Stamp the computation of expensive-to-compute.txt >>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-expired') >>> dpath.delete().ensuredir() >>> products = [ >>> dpath / 'product1.txt', >>> dpath / 'product2.txt', >>> ] >>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath, >>> product=products, hasher='sha256', >>> expires=0) >>> if self.expired(): >>> for fpath in products: >>> fpath.write_text(fpath.name) >>> self.renew() >>> fpath = products[0] >>> # Because we set the expiration delta to 0, we should already be expired >>> assert self.expired() == 'expired_cert' >>> # Disable the expiration date, renew and we should be ok >>> self.expires = None >>> self.renew() >>> assert not self.expired() >>> # Modify the mtime to cause expiration >>> orig_atime = fpath.stat().st_atime >>> orig_mtime = fpath.stat().st_mtime >>> os.utime(fpath, (orig_atime, orig_mtime + 200)) >>> assert self.expired() == 'mtime_diff' >>> self.renew() >>> assert not self.expired() >>> # rewriting the file will cause the size constraint to fail >>> # even if we hack the mtime to be the same >>> orig_atime = fpath.stat().st_atime >>> orig_mtime = fpath.stat().st_mtime >>> fpath.write_text('corrupted') >>> os.utime(fpath, (orig_atime, orig_mtime)) >>> assert self.expired() == 'size_diff' >>> self.renew() >>> assert not self.expired() >>> # Force a situation where the hash is the only thing >>> # that saves us, write a different file with the same >>> # size and mtime. >>> orig_atime = fpath.stat().st_atime >>> orig_mtime = fpath.stat().st_mtime >>> fpath.write_text('corrApted') >>> os.utime(fpath, (orig_atime, orig_mtime)) >>> assert self.expired() == 'hash_diff' >>> # Test what a wrong hash prefix causes expiration >>> certificate = self.renew() >>> self.hash_prefix = certificate['hash'] >>> self.expired() >>> self.hash_prefix = ['bad', 'hashes'] >>> self.expired() >>> # A bad hash will not allow us to renew >>> import pytest >>> with pytest.raises(RuntimeError): ... self.renew() """ if cfgstr is not None: # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration='Do not pass cfgstr to expired. Use the class depends arg', name='cfgstr', type='CacheStamp.expires arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0', ) if product is not None: # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration='Do not pass product to expired. 
Use the class product arg', name='product', type='CacheStamp.expires arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0', ) if not self.cacher.enabled: return 'disabled' certificate = self._get_certificate(cfgstr=cfgstr) if certificate is None: # We don't have a certificate, so we are expired err = 'no_cert' if self.cacher.verbose > 0: # pragma: nobranch print('[cacher] stamp expired {}'.format(err)) return err expires = certificate.get('expires', None) if expires is not None: from ubelt.util_time import timeparse # Need to add in the local timezone to compare against the cert. now = _localnow() expires_abs = timeparse(expires) if now >= expires_abs: # We are expired err = 'expired_cert' if self.cacher.verbose > 0: # pragma: nobranch print('[cacher] stamp expired {}'.format(err)) return err products = self._rectify_products(product) if products is None: # We don't have a product to check, so assume not expired return False elif not all(map(exists, products)): # We are expired if the expected product does not exist err = 'missing_products' if self.cacher.verbose > 0: # pragma: nobranch print('[cacher] stamp expired {}'.format(err)) return err else: # First test to see if the size or mtime of the files has changed # as a potentially quicker check. If sizes or mtimes do not exist # in the certificate (old ubelt version), then ignore them. product_file_stats = self._product_file_stats() sizes = certificate.get('size', None) if sizes is not None and self._expire_checks['size']: if sizes != product_file_stats['size']: # The sizes are differnt, we are expired err = 'size_diff' if self.cacher.verbose > 0: # pragma: nobranch print('[cacher] stamp expired {}'.format(err)) return err mtimes = certificate.get('mtime', None) if mtimes is not None and self._expire_checks['mtime']: if mtimes != product_file_stats['mtime']: # The sizes are differnt, we are expired err = 'mtime_diff' if self.cacher.verbose > 0: # pragma: nobranch print('[cacher] stamp expired {}'.format(err)) return err err = self._check_certificate_hashes(certificate) if err: return err # We are expired if the hash of the existing product data # does not match the expected hash in the certificate if self._expire_checks['hash']: certificate_hash = certificate.get('hash', None) product_file_hash = self._product_file_hash(products) if product_file_hash != certificate_hash: if self.cacher.verbose > 0: print('invalid hash value (expected "{}", got "{}")'.format( product_file_hash, certificate_hash)) # The hash is different, we are expired err = 'hash_diff' if self.cacher.verbose > 0: print('[cacher] stamp expired {}'.format(err)) return err # All tests passed, we are not expired return False
    def _check_certificate_hashes(self, certificate):
        certificate_hash = certificate.get('hash', None)
        hash_prefixes = self._rectify_hash_prefixes()
        if hash_prefixes is not None:
            for pref_hash, cert_hash in zip(hash_prefixes, certificate_hash):
                if not cert_hash.startswith(pref_hash):
                    if self.cacher.verbose > 0:
                        print('invalid hash prefix value (expected "{}", got "{}")'.format(
                            pref_hash, cert_hash))
                    err = 'hash_prefix_mismatch'
                    return err
[docs] def _expires(self, now=None): """ Returns: datetime.datetime: the absolute local time when the stamp expires Example: >>> import ubelt as ub >>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-expires') >>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath) >>> # Test str input >>> self.expires = '2020-01-01T000000Z' >>> assert self._expires().replace(tzinfo=None).isoformat() == '2020-01-01T00:00:00' >>> # Test datetime input >>> dt = ub.timeparse(ub.timestamp()) >>> self.expires = dt >>> assert self._expires() == dt >>> # Test None input >>> self.expires = None >>> assert self._expires() is None >>> # Test int input >>> self.expires = 0 >>> assert self._expires(dt) == dt >>> self.expires = 10 >>> assert self._expires(dt) > dt >>> self.expires = -10 >>> assert self._expires(dt) < dt >>> # Test timedelta input >>> import datetime as datetime_mod >>> self.expires = datetime_mod.timedelta(seconds=-10) >>> assert self._expires(dt) == dt + self.expires """ # Rectify into a datetime from ubelt.util_time import timeparse import datetime as datetime_mod import numbers if now is None: now = datetime_mod.datetime.now() expires = self.expires if expires is None: expires_abs = None elif isinstance(expires, numbers.Number): expires_abs = now + datetime_mod.timedelta(seconds=expires) elif isinstance(expires, datetime_mod.timedelta): expires_abs = now + expires elif isinstance(expires, str): expires_abs = timeparse(expires) elif isinstance(expires, datetime_mod.datetime): expires_abs = expires else: raise TypeError( 'expires must be a coercable to datetime or timedelta') return expires_abs
[docs] def _new_certificate(self, cfgstr=None, product=None): """ Returns: dict: certificate information Example: >>> import ubelt as ub >>> # Stamp the computation of expensive-to-compute.txt >>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-cert').ensuredir() >>> product = dpath / 'product1.txt' >>> product.write_text('hi') >>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath, >>> product=product) >>> cert = self._new_certificate() >>> assert cert['expires'] is None >>> self.expires = '2020-01-01T000000' >>> self.renew() >>> cert = self._new_certificate() >>> assert cert['expires'] is not None """ from ubelt.util_time import timestamp products = self._rectify_products(product) now = _localnow() expires = self._expires(now) certificate = { 'timestamp': timestamp(now, precision=4), 'expires': None if expires is None else timestamp(expires, precision=4), 'product': None if products is None else [os.fspath(p) for p in products], } if products is not None: if not all(map(exists, products)): raise IOError( 'The stamped product must exist: {}'.format(products)) product_info = self._product_info(products) certificate.update(product_info) return certificate
[docs] def renew(self, cfgstr=None, product=None): """ Recertify that the product has been recomputed by writing a new certificate to disk. Args: cfgstr (None | str): deprecated, do not use. product (None | str | List): deprecated, do not use. Returns: None | dict: certificate information if enabled otherwise None. Example: >>> # Test that renew does nothing when the cacher is disabled >>> import ubelt as ub >>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-renew').ensuredir() >>> self = ub.CacheStamp('foo', dpath=dpath, enabled=False) >>> assert self.renew() is None """ if not self.cacher.enabled: return None if cfgstr is not None: # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration='Do not pass cfgstr to renew. Use the class depends arg', name='cfgstr', type='CacheStamp.renew arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0', ) if product is not None: # nocover from ubelt import schedule_deprecation schedule_deprecation( modname='ubelt', migration='Do not pass product to renew. Use the class product arg', name='product', type='CacheStamp.renew arg', deprecate='1.1.0', error='1.3.0', remove='1.4.0', ) certificate = self._new_certificate(cfgstr, product) err = self._check_certificate_hashes(certificate) if err: raise RuntimeError(err) self.cacher.save(certificate, cfgstr=cfgstr) return certificate
def _localnow():
    # Might be nice to have a util_time function add in tzinfo
    import datetime as datetime_mod
    import time
    local_tzinfo = datetime_mod.timezone(datetime_mod.timedelta(seconds=-time.timezone))
    now = datetime_mod.datetime.now().replace(tzinfo=local_tzinfo)
    return now
[docs] def _byte_str(num, unit='auto', precision=2): """ Automatically chooses relevant unit (KB, MB, or GB) for displaying some number of bytes. Args: num (int): number of bytes unit (str): which unit to use, can be auto, B, KB, MB, GB, or TB References: .. [WikiOrdersOfMag] https://en.wikipedia.org/wiki/Orders_of_magnitude_(data) Returns: str: string representing the number of bytes with appropriate units Example: >>> from ubelt.util_cache import _byte_str >>> import ubelt as ub >>> num_list = [1, 100, 1024, 1048576, 1073741824, 1099511627776] >>> result = ub.urepr(list(map(_byte_str, num_list)), nl=0) >>> print(result) ['0.00KB', '0.10KB', '1.00KB', '1.00MB', '1.00GB', '1.00TB'] >>> _byte_str(10, unit='B') 10.00B """ abs_num = abs(num) if unit == 'auto': if abs_num < 2.0 ** 10: unit = 'KB' elif abs_num < 2.0 ** 20: unit = 'KB' elif abs_num < 2.0 ** 30: unit = 'MB' elif abs_num < 2.0 ** 40: unit = 'GB' else: unit = 'TB' if unit.lower().startswith('b'): num_unit = num elif unit.lower().startswith('k'): num_unit = num / (2.0 ** 10) elif unit.lower().startswith('m'): num_unit = num / (2.0 ** 20) elif unit.lower().startswith('g'): num_unit = num / (2.0 ** 30) elif unit.lower().startswith('t'): num_unit = num / (2.0 ** 40) else: raise ValueError('unknown num={!r} unit={!r}'.format(num, unit)) fmtstr = ('{:.' + str(precision) + 'f}{}') res = fmtstr.format(num_unit, unit) return res