Source code for ubelt.util_cache

# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function, unicode_literals
import os
import hashlib
from os.path import join, normpath, basename, exists
from six.moves import cPickle as pickle
import warnings


[docs]class Cacher(object): """ Cacher designed to be quickly integrated into existing scripts. Args: fname (str): A file name. This is the prefix that will be used by the cache. It will alwasys be used as-is. cfgstr (str): indicates the state. Either this string or a hash of this string will be used to identify the cache. A cfgstr should always be reasonably readable, thus it is good practice to hash extremely detailed cfgstrs to a reasonable readable level. Use meta to store make original details persist. dpath (str): Specifies where to save the cache. If unspecified, Cacher defaults to an application resource dir as given by appname. appname (str): application name (default = 'ubelt') Specifies a folder in the application resource directory where to cache the data if dpath is not specified. ext (str): extension (default = '.pkl') meta (object): cfgstr metadata that is also saved with the cfgstr. This data is not used in the hash, but if useful to send in if the cfgstr itself contains hashes. verbose (int): level of verbosity. Can be 1, 2 or 3. (default=1) enabled (bool): if set to False, then the load and save methods will do nothing. (default = True) log (func): overloads the print function. Useful for sending output to loggers (e.g. logging.info, tqdm.tqdm.write, ...) protocol (int): protocol version used by pickle. If python 2 compatibility is not required, then it is better to use protocol 4. (default=2) CommandLine: python -m ubelt.util_cache Cacher Example: >>> import ubelt as ub >>> cfgstr = 'repr-of-params-that-uniquely-determine-the-process' >>> # Create a cacher and try loading the data >>> cacher = ub.Cacher('test_process', cfgstr) >>> cacher.clear() >>> data = cacher.tryload() >>> if data is None: >>> # Put expensive functions in if block when cacher misses >>> myvar1 = 'result of expensive process' >>> myvar2 = 'another result' >>> # Tell the cacher to write at the end of the if block >>> # It is idomatic to put results in a tuple named data >>> data = myvar1, myvar2 >>> cacher.save(data) >>> # Last part of the Cacher pattern is to unpack the data tuple >>> myvar1, myvar2 = data Example: >>> # The previous example can be shorted if only a single value >>> from ubelt.util_cache import Cacher >>> cfgstr = 'repr-of-params-that-uniquely-determine-the-process' >>> # Create a cacher and try loading the data >>> cacher = Cacher('test_process', cfgstr) >>> myvar = cacher.tryload() >>> if myvar is None: >>> myvar = ('result of expensive process', 'another result') >>> cacher.save(myvar) >>> assert cacher.exists(), 'should now exist' """ VERBOSE = 1 # default verbosity def __init__(self, fname, cfgstr=None, dpath=None, appname='ubelt', ext='.pkl', meta=None, verbose=None, enabled=True, log=None, protocol=2): import ubelt as ub if verbose is None: verbose = self.VERBOSE if dpath is None: # pragma: no branch dpath = ub.ensure_app_cache_dir(appname) ub.ensuredir(dpath) self.dpath = dpath self.fname = fname self.cfgstr = cfgstr self.verbose = verbose self.ext = ext self.meta = meta self.enabled = enabled self.protocol = protocol self.log = print if log is None else log if len(self.ext) > 0 and self.ext[0] != '.': raise ValueError('Please be explicit and use a dot in ext') def _rectify_cfgstr(self, cfgstr=None): cfgstr = self.cfgstr if cfgstr is None else cfgstr if cfgstr is None: warnings.warn('No cfgstr given in Cacher constructor or call', UserWarning) cfgstr = '' assert self.fname is not None, 'no fname specified in Cacher' assert self.dpath is not None, 'no dpath specified in Cacher' return cfgstr def _condense_cfgstr(self, cfgstr=None): cfgstr = self._rectify_cfgstr(cfgstr) max_len = 32 hashlen = 32 if len(cfgstr) > max_len: hasher = hashlib.sha256() hasher.update(cfgstr.encode('utf8')) hashed_cfgstr = hasher.hexdigest()[:hashlen] condensed = hashed_cfgstr else: condensed = cfgstr return condensed
[docs] def get_fpath(self, cfgstr=None): """ Reports the filepath that the cacher will use. It will attempt to use '{fname}_{cfgstr}{ext}' unless that is too long. Then cfgstr will be hashed. Example: >>> from ubelt.util_cache import Cacher >>> import pytest >>> with pytest.warns(UserWarning): >>> cacher = Cacher('test_cacher1') >>> cacher.get_fpath() >>> self = Cacher('test_cacher2', cfgstr='cfg1') >>> self.get_fpath() >>> self = Cacher('test_cacher3', cfgstr='cfg1' * 32) >>> self.get_fpath() """ condensed = self._condense_cfgstr(cfgstr) fname_cfgstr = '{}_{}{}'.format(self.fname, condensed, self.ext) fpath = join(self.dpath, fname_cfgstr) fpath = normpath(fpath) return fpath
[docs] def exists(self, cfgstr=None): """ Check to see if the cache exists """ return exists(self.get_fpath())
[docs] def existing_versions(self): """ Returns data with different cfgstr values that were previously computed with this cacher. Example: >>> from ubelt.util_cache import Cacher >>> # Ensure that some data exists >>> known_fnames = set() >>> cacher = Cacher('versioned_data', cfgstr='1') >>> cacher.ensure(lambda: 'data1') >>> known_fnames.add(cacher.get_fpath()) >>> cacher = Cacher('versioned_data', cfgstr='2') >>> cacher.ensure(lambda: 'data2') >>> known_fnames.add(cacher.get_fpath()) >>> # List previously computed configs for this type >>> from os.path import basename >>> cacher = Cacher('versioned_data', cfgstr='2') >>> exist_fpaths = set(cacher.existing_versions()) >>> exist_fnames = list(map(basename, exist_fpaths)) >>> print(exist_fnames) >>> assert exist_fpaths == known_fnames ['versioned_data_1.pkl', 'versioned_data_2.pkl'] """ import glob pattern = join(self.dpath, self.fname + '_*' + self.ext) for fname in glob.iglob(pattern): data_fpath = join(self.dpath, fname) yield data_fpath
[docs] def clear(self, cfgstr=None): """ Removes the saved cache and metadata from disk """ data_fpath = self.get_fpath(cfgstr) if self.verbose > 0: self.log('[cacher] clear cache') if exists(data_fpath): if self.verbose > 0: self.log('[cacher] removing {}'.format(data_fpath)) os.remove(data_fpath) # Remove the metadata if it exists meta_fpath = data_fpath + '.meta' if exists(meta_fpath): os.remove(meta_fpath) else: if self.verbose > 0: self.log('[cacher] ... nothing to clear')
[docs] def tryload(self, cfgstr=None, on_error='raise'): """ Like load, but returns None if the load fails due to a cache miss. Args: on_error (str): how to handle non-io errors errors. Either raise, which re-raises the exception, or clear which clears the cache and returns None. """ cfgstr = self._rectify_cfgstr(cfgstr) if self.enabled: try: if self.verbose > 1: self.log('[cacher] tryload fname={}'.format(self.fname)) return self.load(cfgstr) except IOError: if self.verbose > 0: self.log('[cacher] ... {} cache miss'.format(self.fname)) except Exception: if self.verbose > 0: self.log('[cacher] ... failed to load') if on_error == 'raise': raise elif on_error == 'clear': self.clear(cfgstr) return None else: raise KeyError('Unknown method on_error={}'.format(on_error)) else: if self.verbose > 1: self.log('[cacher] ... cache disabled: fname={}'.format(self.fname)) return None
[docs] def load(self, cfgstr=None): """ Example: >>> from ubelt.util_cache import * # NOQA >>> # Setting the cacher as enabled=False turns it off >>> cacher = Cacher('test_disabled_load', '', enabled=True) >>> cacher.save('data') >>> assert cacher.load() == 'data' >>> cacher.enabled = False >>> assert cacher.tryload() is None """ cfgstr = self._rectify_cfgstr(cfgstr) dpath = self.dpath fname = self.fname verbose = self.verbose if not self.enabled: if verbose > 1: self.log('[cacher] ... cache disabled: fname={}'.format(self.fname)) raise IOError(3, 'Cache Loading Is Disabled') fpath = self.get_fpath(cfgstr=cfgstr) if not exists(fpath): if verbose > 2: self.log('[cacher] ... cache does not exist: ' 'dpath={} fname={} cfgstr={}'.format( basename(dpath), fname, cfgstr)) raise IOError(2, 'No such file or directory: %r' % (fpath,)) else: if verbose > 3: self.log('[cacher] ... cache exists: ' 'dpath={} fname={} cfgstr={}'.format( basename(dpath), fname, cfgstr)) try: with open(fpath, 'rb') as file_: data = pickle.load(file_) except Exception as ex: if verbose > 0: self.log('CORRUPTED? fpath = %s' % (fpath,)) if verbose > 1: self.log('[cacher] ... CORRUPTED? dpath={} cfgstr={}'.format( basename(dpath), cfgstr)) if isinstance(ex, (EOFError, IOError, ImportError)): raise IOError(str(ex)) else: if verbose > 1: self.log('[cacher] ... unknown reason for exception') raise else: if self.verbose > 2: self.log('[cacher] ... {} cache hit'.format(self.fname)) elif verbose > 1: self.log('[cacher] ... cache hit') return data
[docs] def save(self, data, cfgstr=None): """ Writes data to path specified by `self.fpath(cfgstr)`. Metadata containing information about the cache will also be appended to an adjacent file with the `.meta` suffix. Example: >>> from ubelt.util_cache import * # NOQA >>> # Normal functioning >>> cfgstr = 'long-cfg' * 32 >>> cacher = Cacher('test_enabled_save', cfgstr) >>> cacher.save('data') >>> assert exists(cacher.get_fpath()), 'should be enabeled' >>> assert exists(cacher.get_fpath() + '.meta'), 'missing metadata' >>> # Setting the cacher as enabled=False turns it off >>> cacher2 = Cacher('test_disabled_save', 'params', enabled=False) >>> cacher2.save('data') >>> assert not exists(cacher2.get_fpath()), 'should be disabled' """ import ubelt as ub if not self.enabled: return if self.verbose > 0: self.log('[cacher] ... {} cache save'.format(self.fname)) cfgstr = self._rectify_cfgstr(cfgstr) condensed = self._condense_cfgstr(cfgstr) # Make sure the cache directory exists ub.ensuredir(self.dpath) data_fpath = self.get_fpath(cfgstr=cfgstr) meta_fpath = data_fpath + '.meta' # Also save metadata file to reconstruct hashing with open(meta_fpath, 'a') as file_: # TODO: maybe append this in json format? file_.write('\n\nsaving {}\n'.format(ub.timestamp())) file_.write(self.fname + '\n') file_.write(condensed + '\n') file_.write(cfgstr + '\n') file_.write(str(self.meta) + '\n') with open(data_fpath, 'wb') as file_: # Use protocol 2 to support python2 and 3 pickle.dump(data, file_, protocol=self.protocol)
[docs] def ensure(self, func, *args, **kwargs): r""" Wraps around a function. A cfgstr must be stored in the base cacher. Args: func (callable): function that will compute data on cache miss *args: passed to func **kwargs: passed to func Example: >>> from ubelt.util_cache import * # NOQA >>> def func(): >>> return 'expensive result' >>> fname = 'test_cacher_ensure' >>> cfgstr = 'func params' >>> cacher = Cacher(fname, cfgstr) >>> cacher.clear() >>> data1 = cacher.ensure(func) >>> data2 = cacher.ensure(func) >>> assert data1 == 'expensive result' >>> assert data1 == data2 >>> cacher.clear() Example: >>> from ubelt.util_cache import * # NOQA >>> @Cacher(fname, cfgstr).ensure >>> def func(): >>> return 'expensive result' """ data = self.tryload() if data is None: data = func(*args, **kwargs) self.save(data) return data
def __call__(self, func): """ Allows Cacher to be used as a decorator for functions with no arguments. This mode of usage has much less control than others, so it is only recommended for the simplest of cases. Args: func (Function): function to decorate. Must have no arguments. Example: >>> from ubelt.util_cache import * # NOQA >>> @Cacher('demo_cacher_call', cfgstr='foobar') >>> def func(): >>> return 'expensive result' >>> func.cacher.clear() >>> assert not func.cacher.exists() >>> data = func() >>> assert func.cacher.exists() >>> func.cacher.clear() """ # Cant return arguments because cfgstr wont take them into account # def _wrapper(*args, **kwargs): # data = self.ensure(func, *args, **kwargs) # return data def _wrapper(): data = self.ensure(func) return data _wrapper.cacher = self return _wrapper
if __name__ == '__main__': r""" CommandLine: python -m ubelt.util_cache python -m ubelt.util_cache all """ import xdoctest as xdoc xdoc.doctest_module()