ubelt package

Submodules

Module contents

UBelt is a “utility belt” of commonly needed utility and helper functions. It is a curated collection of top-level utilities with functionality that falls into a mixture of categories.

The source code is available at https://github.com/Erotemic/ubelt. We also have Jupyter notebook demos.

The ubelt API is organized by submodules containing related functionality. Each submodule contains top level overview documentation, and each function contains a docstring with at least one example.

NOTE: The README on github contains information and examples complementary to these docs.

class ubelt.AutoDict[source]

Bases: UDict

An infinitely nested default dict of dicts.

Implementation of Perl’s autovivification feature that follows [SO_651794].

References

[SO_651794] https://stackoverflow.com/questions/651794

Example

>>> import ubelt as ub
>>> auto = ub.AutoDict()
>>> auto[0][10][100] = None
>>> assert str(auto) == '{0: {10: {100: None}}}'
_base

alias of UDict

to_dict()[source]

Recursively casts an AutoDict into a regular dictionary. All directly nested AutoDict values are also converted.

This effectively de-defaults the structure.

Returns:

a copy of this dict without autovivification

Return type:

dict

Example

>>> import ubelt as ub
>>> auto = ub.AutoDict()
>>> auto[1] = 1
>>> auto['n1'] = ub.AutoDict()
>>> static = auto.to_dict()
>>> assert not isinstance(static, ub.AutoDict)
>>> assert not isinstance(static['n1'], ub.AutoDict)

Example

>>> import ubelt as ub
>>> auto = ub.AutoOrderedDict()
>>> auto[0][3] = 3
>>> auto[0][2] = 2
>>> auto[0][1] = 1
>>> assert list(auto[0].values()) == [3, 2, 1]
ubelt.AutoOrderedDict

alias of AutoDict

class ubelt.CacheStamp(fname, dpath, cfgstr=None, product=None, hasher='sha1', verbose=None, enabled=True, depends=None, meta=None, hash_prefix=None, expires=None, ext='.pkl')[source]

Bases: object

Quickly determine if a file-producing computation has been done.

Check if the computation needs to be redone by calling expired. If the stamp is not expired, the user can expect that the results exist and could be loaded. If the stamp is expired, the computation should be redone. After the result is updated, the user calls renew, which writes a “stamp” file to disk marking that the procedure has been done.

There are several ways to control how a stamp expires. At a bare minimum, removing the stamp file will force expiration. However, in this circumstance CacheStamp only knows that something has been done, but it doesn’t have any information about what was done, so in general this is not sufficient.

To achieve more robust expiration behavior, the user should specify the product argument, which is a list of file paths that are expected to exist whenever the stamp is renewed. When this is specified the CacheStamp will expire if any of these products are deleted, their size changes, their modified timestamp changes, or their hash (i.e. checksum) changes. Note that by setting hasher=None, running and verifying checksums can be disabled.

If the user knows what the hash of the file should be this can be specified to prevent renewal of the stamp unless these match the files on disk. This can be useful for security purposes.

The stamp can also be set to expire at a specified time or after a specified duration using the expires argument.

Notes

The size, mtime, and hash mechanism is similar to how Makefile and redo caches work.

Variables:

cacher (Cacher) – underlying cacher object

Example

>>> import ubelt as ub
>>> # Stamp the computation of expensive-to-compute.txt
>>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp')
>>> dpath.delete().ensuredir()
>>> product = dpath / 'expensive-to-compute.txt'
>>> self = ub.CacheStamp('somedata', depends='someconfig', dpath=dpath,
>>>                      product=product, hasher='sha256')
>>> self.clear()
>>> print(f'self.fpath={self.fpath}')
>>> if self.expired():
>>>     product.write_text('very expensive')
>>>     self.renew()
>>> assert not self.expired()
>>> # corrupting the output will cause the stamp to expire
>>> product.write_text('very corrupted')
>>> assert self.expired()
Parameters:
  • fname (str) – Name of the stamp file

  • dpath (str | PathLike | None) – Where to store the cached stamp file

  • product (str | PathLike | Sequence[str | PathLike] | None) – Path or paths that we expect the computation to produce. If specified the hash of the paths are stored.

  • hasher (str) – The type of hasher used to compute the file hash of product. If None, then we assume the file has not been corrupted or changed if the mtime and size are the same. Defaults to sha1.

  • verbose (bool | None) – Passed to internal ubelt.Cacher object. Defaults to None.

  • enabled (bool) – if False, expired always returns True. Defaults to True.

  • depends (str | List[str] | None) – Indicate dependencies of this cache. If the dependencies change, then the cache is recomputed. New to CacheStamp in version 0.9.2.

  • meta (object | None) – Metadata that is also saved as a sidecar file. New to CacheStamp in version 0.9.2. Note: this is a candidate for deprecation.

  • expires (str | int | datetime.datetime | datetime.timedelta | None) – If specified, sets an expiration date for the certificate. This can be an absolute datetime or a timedelta offset. If specified as an int, this is interpreted as a time delta in seconds. If specified as a str, this is interpreted as an absolute timestamp. Time delta offsets are coerced to absolute times at “renew” time.

  • hash_prefix (None | str | List[str]) – If specified, we verify that these match the hash(s) of the product(s) in the stamp certificate.

  • ext (str) – File extension for the cache format. Can be '.pkl' or '.json'. Defaults to '.pkl'.

  • cfgstr (str | None) – DEPRECATED.

property fpath
clear()[source]

Delete the stamp (the products are untouched)

_get_certificate(cfgstr=None)[source]

Returns the stamp certificate if it exists

_rectify_products(product=None)[source]

puts products in a normalized format

Returns:

List[Path]

_rectify_hash_prefixes()[source]

puts hash prefixes in a normalized format

_product_info(product=None)[source]

Compute summary info about each product on disk.

_product_file_stats(product=None)[source]
_product_file_hash(product=None)[source]
expired(cfgstr=None, product=None)[source]

Check to see if a previously existing stamp is still valid, if the expected result of that computation still exists, and if all other expiration criteria are met.

Parameters:
  • cfgstr (Any) – DEPRECATED

  • product (Any) – DEPRECATED

Returns:

True(-thy) if the stamp is invalid, expired, or does not exist. When the stamp is expired, the reason for expiration is returned as a string. If the stamp is still valid, False is returned.

Return type:

bool | str

Example

>>> import ubelt as ub
>>> import time
>>> import os
>>> # Stamp the computation of expensive-to-compute.txt
>>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-expired')
>>> dpath.delete().ensuredir()
>>> products = [
>>>     dpath / 'product1.txt',
>>>     dpath / 'product2.txt',
>>> ]
>>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath,
>>>                      product=products, hasher='sha256',
>>>                      expires=0)
>>> if self.expired():
>>>     for fpath in products:
>>>         fpath.write_text(fpath.name)
>>>     self.renew()
>>> fpath = products[0]
>>> # Because we set the expiration delta to 0, we should already be expired
>>> assert self.expired() == 'expired_cert'
>>> # Disable the expiration date, renew and we should be ok
>>> self.expires = None
>>> self.renew()
>>> assert not self.expired()
>>> # Modify the mtime to cause expiration
>>> orig_atime = fpath.stat().st_atime
>>> orig_mtime = fpath.stat().st_mtime
>>> os.utime(fpath, (orig_atime, orig_mtime + 200))
>>> assert self.expired() == 'mtime_diff'
>>> self.renew()
>>> assert not self.expired()
>>> # rewriting the file will cause the size constraint to fail
>>> # even if we hack the mtime to be the same
>>> orig_atime = fpath.stat().st_atime
>>> orig_mtime = fpath.stat().st_mtime
>>> fpath.write_text('corrupted')
>>> os.utime(fpath, (orig_atime, orig_mtime))
>>> assert self.expired() == 'size_diff'
>>> self.renew()
>>> assert not self.expired()
>>> # Force a situation where the hash is the only thing
>>> # that saves us, write a different file with the same
>>> # size and mtime.
>>> orig_atime = fpath.stat().st_atime
>>> orig_mtime = fpath.stat().st_mtime
>>> fpath.write_text('corrApted')
>>> os.utime(fpath, (orig_atime, orig_mtime))
>>> assert self.expired() == 'hash_diff'
>>> # Test that a wrong hash prefix causes expiration
>>> certificate = self.renew()
>>> self.hash_prefix = certificate['hash']
>>> self.expired()
>>> self.hash_prefix = ['bad', 'hashes']
>>> self.expired()
>>> # A bad hash will not allow us to renew
>>> import pytest
>>> with pytest.raises(RuntimeError):
...     self.renew()
_check_certificate_hashes(certificate)[source]
_expires(now=None)[source]
Returns:

the absolute local time when the stamp expires

Return type:

datetime.datetime

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-expires')
>>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath)
>>> # Test str input
>>> self.expires = '2020-01-01T000000Z'
>>> assert self._expires().replace(tzinfo=None).isoformat() == '2020-01-01T00:00:00'
>>> # Test datetime input
>>> dt = ub.timeparse(ub.timestamp())
>>> self.expires = dt
>>> assert self._expires() == dt
>>> # Test None input
>>> self.expires = None
>>> assert self._expires() is None
>>> # Test int input
>>> self.expires = 0
>>> assert self._expires(dt) == dt
>>> self.expires = 10
>>> assert self._expires(dt) > dt
>>> self.expires = -10
>>> assert self._expires(dt) < dt
>>> # Test timedelta input
>>> import datetime as datetime_mod
>>> self.expires = datetime_mod.timedelta(seconds=-10)
>>> assert self._expires(dt) == dt + self.expires
_new_certificate(cfgstr=None, product=None)[source]
Returns:

certificate information

Return type:

dict

Example

>>> import ubelt as ub
>>> # Stamp the computation of expensive-to-compute.txt
>>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-cert').ensuredir()
>>> product = dpath / 'product1.txt'
>>> product.write_text('hi')
>>> self = ub.CacheStamp('myname', depends='myconfig', dpath=dpath,
>>>                      product=product)
>>> cert = self._new_certificate()
>>> assert cert['expires'] is None
>>> self.expires = '2020-01-01T000000'
>>> self.renew()
>>> cert = self._new_certificate()
>>> assert cert['expires'] is not None
renew(cfgstr=None, product=None)[source]

Recertify that the product has been recomputed by writing a new certificate to disk.

Parameters:
  • cfgstr (None | str) – deprecated, do not use.

  • product (None | str | List) – deprecated, do not use.

Returns:

certificate information if enabled otherwise None.

Return type:

None | dict

Example

>>> # Test that renew does nothing when the cacher is disabled
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt/tests/cache-stamp-renew').ensuredir()
>>> self = ub.CacheStamp('foo', dpath=dpath, enabled=False)
>>> assert self.renew() is None
class ubelt.Cacher(fname, depends=None, dpath=None, appname='ubelt', ext='.pkl', meta=None, verbose=None, enabled=True, log=None, hasher='sha1', protocol=-1, cfgstr=None, backend='auto')[source]

Bases: object

Saves data to disk and reloads it based on specified dependencies.

Cacher uses pickle to save/load data to/from disk. Dependencies of the cached process can be specified, which ensures the cached data is recomputed if the dependencies change. If the location of the cache is not specified, it will default to the system user’s cache directory.

Related:

[JobLibMemory]: https://joblib.readthedocs.io/en/stable/memory.html

Example

>>> import ubelt as ub
>>> depends = 'repr-of-params-that-uniquely-determine-the-process'
>>> # Create a cacher and try loading the data
>>> cacher = ub.Cacher('demo_process', depends, verbose=4)
>>> cacher.clear()
>>> print(f'cacher.fpath={cacher.fpath}')
>>> data = cacher.tryload()
>>> if data is None:
>>>     # Put expensive functions in if block when cacher misses
>>>     myvar1 = 'result of expensive process'
>>>     myvar2 = 'another result'
>>>     # Tell the cacher to write at the end of the if block
>>>     # It is idiomatic to put results in an object named data
>>>     data = myvar1, myvar2
>>>     cacher.save(data)
>>> # Last part of the Cacher pattern is to unpack the data object
>>> myvar1, myvar2 = data
>>> #
>>> # If we know the data exists, we can also simply call load
>>> data = cacher.tryload()

Example

>>> # The previous example can be shortened if only a single value is cached
>>> from ubelt.util_cache import Cacher
>>> depends = 'repr-of-params-that-uniquely-determine-the-process'
>>> # Create a cacher and try loading the data
>>> cacher = Cacher('demo_process', depends)
>>> myvar = cacher.tryload()
>>> if myvar is None:
>>>     myvar = ('result of expensive process', 'another result')
>>>     cacher.save(myvar)
>>> assert cacher.exists(), 'should now exist'
Parameters:
  • fname (str) – A file name. This is the prefix that will be used by the cache. It will always be used as-is.

  • depends (str | List[str] | None) – Indicate dependencies of this cache. If the dependencies change, then the cache is recomputed. New in version 0.8.9, replaces cfgstr.

  • dpath (str | PathLike | None) – Specifies where to save the cache. If unspecified, Cacher defaults to an application cache dir as given by appname. See ub.get_app_cache_dir() for more details.

  • appname (str) – Application name. Specifies a folder in the application cache directory in which to cache the data if dpath is not specified. Defaults to ‘ubelt’.

  • ext (str) – File extension for the cache format. Can be '.pkl' or '.json'. Defaults to '.pkl'.

  • meta (object | None) – Metadata that is also saved with the cfgstr. This can be useful to indicate how the cfgstr was constructed. Note: this is a candidate for deprecation.

  • verbose (int) – Level of verbosity. Can be 1, 2 or 3. Defaults to 1.

  • enabled (bool) – If set to False, then the load and save methods will do nothing. Defaults to True.

  • log (Callable[[str], Any]) – Overloads the print function. Useful for sending output to loggers (e.g. logging.info, tqdm.tqdm.write, …)

  • hasher (str) – Type of hashing algorithm to use if cfgstr needs to be condensed to less than 49 characters. Defaults to sha1.

  • protocol (int) – Protocol version used by pickle. Defaults to -1, which is the latest protocol.

  • backend (str) – Set to either 'pickle' or 'json' to force backend. Defaults to auto which chooses one based on the extension.

  • cfgstr (str | None) – Deprecated in favor of depends.

VERBOSE = 1
FORCE_DISABLE = False
_rectify_cfgstr(cfgstr=None)[source]
_condense_cfgstr(cfgstr=None)[source]
property fpath: PathLike
get_fpath(cfgstr=None)[source]

Reports the filepath that the cacher will use.

It will attempt to use ‘{fname}_{cfgstr}{ext}’ unless that is too long. Then cfgstr will be hashed.

Parameters:

cfgstr (str | None) – overrides the instance-level cfgstr

Returns:

str | PathLike

Example

>>> # xdoctest: +REQUIRES(module:pytest)
>>> from ubelt.util_cache import Cacher
>>> import pytest
>>> #with pytest.warns(UserWarning):
>>> if 1:  # we no longer warn here
>>>     cacher = Cacher('test_cacher1')
>>>     cacher.get_fpath()
>>> self = Cacher('test_cacher2', depends='cfg1')
>>> self.get_fpath()
>>> self = Cacher('test_cacher3', depends='cfg1' * 32)
>>> self.get_fpath()
exists(cfgstr=None)[source]

Check to see if the cache exists

Parameters:

cfgstr (str | None) – overrides the instance-level cfgstr

Returns:

bool

existing_versions()[source]

Yields paths to cached files that were previously computed with this cacher under different cfgstr values.

Yields:

str – paths to cached files corresponding to this cacher

Example

>>> # Ensure that some data exists
>>> import ubelt as ub
>>> dpath = ub.Path.appdir(
>>>     'ubelt/tests/util_cache',
>>>     'test-existing-versions').delete().ensuredir()
>>> cacher = ub.Cacher('versioned_data_v2', depends='1', dpath=dpath)
>>> cacher.ensure(lambda: 'data1')
>>> known_fpaths = set()
>>> known_fpaths.add(cacher.get_fpath())
>>> cacher = ub.Cacher('versioned_data_v2', depends='2', dpath=dpath)
>>> cacher.ensure(lambda: 'data2')
>>> known_fpaths.add(cacher.get_fpath())
>>> # List previously computed configs for this type
>>> from os.path import basename
>>> cacher = ub.Cacher('versioned_data_v2', depends='2', dpath=dpath)
>>> exist_fpaths = set(cacher.existing_versions())
>>> exist_fnames = list(map(basename, exist_fpaths))
>>> print('exist_fnames = {!r}'.format(exist_fnames))
>>> print('exist_fpaths = {!r}'.format(exist_fpaths))
>>> print('known_fpaths={!r}'.format(known_fpaths))
>>> assert exist_fpaths.issubset(known_fpaths)
clear(cfgstr=None)[source]

Removes the saved cache and metadata from disk

Parameters:

cfgstr (str | None) – overrides the instance-level cfgstr

tryload(cfgstr=None, on_error='raise')[source]

Like load, but returns None if the load fails due to a cache miss.

Parameters:
  • cfgstr (str | None) – overrides the instance-level cfgstr

  • on_error (str) – How to handle non-IO errors. Either ‘raise’, which re-raises the exception, or ‘clear’, which deletes the cache and returns None. Defaults to ‘raise’.

Returns:

the cached data if it exists, otherwise returns None

Return type:

None | object

load(cfgstr=None)[source]

Load the cached data and raise an error if something goes wrong.

Parameters:

cfgstr (str | None) – overrides the instance-level cfgstr

Returns:

the cached data

Return type:

object

Raises:

IOError – if the data cannot be loaded. This could be due to a cache miss or because the cache is disabled.

Example

>>> from ubelt.util_cache import *  # NOQA
>>> # Setting the cacher as enabled=False turns it off
>>> cacher = Cacher('test_disabled_load', '', enabled=True,
>>>                 appname='ubelt/tests/util_cache')
>>> cacher.save('data')
>>> assert cacher.load() == 'data'
>>> cacher.enabled = False
>>> assert cacher.tryload() is None
save(data, cfgstr=None)[source]

Writes data to path specified by self.fpath.

Metadata containing information about the cache will also be appended to an adjacent file with the .meta suffix.

Parameters:
  • data (object) – arbitrary pickleable object to be cached

  • cfgstr (str | None) – overrides the instance-level cfgstr

Example

>>> from ubelt.util_cache import *  # NOQA
>>> # Normal functioning
>>> depends = 'long-cfg' * 32
>>> cacher = Cacher('test_enabled_save', depends=depends,
>>>                 appname='ubelt/tests/util_cache')
>>> cacher.save('data')
>>> assert exists(cacher.get_fpath()), 'should be enabled'
>>> assert exists(cacher.get_fpath() + '.meta'), 'missing metadata'
>>> # Setting the cacher as enabled=False turns it off
>>> cacher2 = Cacher('test_disabled_save', 'params', enabled=False,
>>>                  appname='ubelt/tests/util_cache')
>>> cacher2.save('data')
>>> assert not exists(cacher2.get_fpath()), 'should be disabled'
_backend_load(data_fpath)[source]

Example

>>> import ubelt as ub
>>> cacher = ub.Cacher('test_other_backend', depends=['a'], ext='.json')
>>> cacher.save(['data'])
>>> cacher.tryload()
>>> import ubelt as ub
>>> cacher = ub.Cacher('test_other_backend2', depends=['a'], ext='.yaml', backend='json')
>>> cacher.save({'data': [1, 2, 3]})
>>> cacher.tryload()
>>> import pytest
>>> with pytest.raises(ValueError):
>>>     ub.Cacher('test_other_backend2', depends=['a'], ext='.yaml', backend='does-not-exist')
>>> cacher = ub.Cacher('test_other_backend2', depends=['a'], ext='.really-a-pickle', backend='auto')
>>> assert cacher.backend == 'pickle', 'should be default'
_backend_dump(data_fpath, data)[source]
ensure(func, *args, **kwargs)[source]

Wraps around a function. A cfgstr must be stored in the base cacher.

Parameters:
  • func (Callable) – function that will compute data on cache miss

  • *args – passed to func

  • **kwargs – passed to func

Example

>>> from ubelt.util_cache import *  # NOQA
>>> def func():
>>>     return 'expensive result'
>>> fname = 'test_cacher_ensure'
>>> depends = 'func params'
>>> cacher = Cacher(fname, depends=depends)
>>> cacher.clear()
>>> data1 = cacher.ensure(func)
>>> data2 = cacher.ensure(func)
>>> assert data1 == 'expensive result'
>>> assert data1 == data2
>>> cacher.clear()
class ubelt.CaptureStdout(suppress=True, enabled=True)[source]

Bases: CaptureStream

Context manager that captures stdout and stores it in an internal stream.

Depending on the value of suppress, the user can control whether stdout is printed (i.e. whether stdout is tee-ed or suppressed) while it is being captured.

SeeAlso:

contextlib.redirect_stdout() – similar, but does not have the ability to print stdout while it is being captured.

Variables:
  • text (str | None) – internal storage for the most recent part

  • parts (List[str]) – internal storage for all parts

  • cap_stdout (None | TeeStringIO) – internal stream proxy

  • orig_stdout (io.TextIOBase) – internal pointer to the original stdout stream

Example

>>> import ubelt as ub
>>> self = ub.CaptureStdout(suppress=True)
>>> print('dont capture the table flip (╯°□°)╯︵ ┻━┻')
>>> with self:
...     text = 'capture the heart ♥'
...     print(text)
>>> print('dont capture look of disapproval ಠ_ಠ')
>>> assert isinstance(self.text, str)
>>> assert self.text == text + '\n', 'failed capture text'

Example

>>> import ubelt as ub
>>> self = ub.CaptureStdout(suppress=False)
>>> with self:
...     print('I am captured and printed in stdout')
>>> assert self.text.strip() == 'I am captured and printed in stdout'

Example

>>> import ubelt as ub
>>> self = ub.CaptureStdout(suppress=True, enabled=False)
>>> with self:
...     print('dont capture')
>>> assert self.text is None
Parameters:
  • suppress (bool) – if True, stdout is not printed while captured. Defaults to True.

  • enabled (bool) – does nothing if this is False. Defaults to True.

log_part()[source]

Log what has been captured so far

start()[source]
stop()[source]

Example

>>> import ubelt as ub
>>> ub.CaptureStdout(enabled=False).stop()
>>> ub.CaptureStdout(enabled=True).stop()
close()[source]
class ubelt.CaptureStream[source]

Bases: object

Generic class for capturing streaming output from stdout or stderr

class ubelt.ChDir(dpath)[source]

Bases: object

Context manager that changes the current working directory and then returns you to where you were.

This is nearly the same as the stdlib contextlib.chdir(), with the exception that it will do nothing if the input path is None (i.e. the user did not want to change directories).

SeeAlso:

contextlib.chdir()

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt/tests/chdir').ensuredir()
>>> dir1 = (dpath / 'dir1').ensuredir()
>>> dir2 = (dpath / 'dir2').ensuredir()
>>> with ChDir(dpath):
>>>     assert ub.Path.cwd() == dpath
>>>     # change to the given directory, and then returns back
>>>     with ChDir(dir1):
>>>         assert ub.Path.cwd() == dir1
>>>         with ChDir(dir2):
>>>             assert ub.Path.cwd() == dir2
>>>             # changes inside the context manager will be reset
>>>             os.chdir(dpath)
>>>         assert ub.Path.cwd() == dir1
>>>     assert ub.Path.cwd() == dpath
>>>     with ChDir(dir1):
>>>         assert ub.Path.cwd() == dir1
>>>         with ChDir(None):
>>>             assert ub.Path.cwd() == dir1
>>>             # When disabled, the cwd does *not* reset at context exit
>>>             os.chdir(dir2)
>>>         assert ub.Path.cwd() == dir2
>>>         os.chdir(dir1)
>>>         # Dont change dirs, but reset to your cwd at context end
>>>         with ChDir('.'):
>>>             os.chdir(dir2)
>>>         assert ub.Path.cwd() == dir1
>>>     assert ub.Path.cwd() == dpath
Parameters:

dpath (str | PathLike | None) – The new directory to work in. If None, then the context manager is disabled.

class ubelt.DownloadManager(download_root=None, mode='thread', max_workers=None, cache=True)[source]

Bases: object

Simple implementation of a download manager.

Example

>>> # xdoctest: +REQUIRES(--network)
>>> import ubelt as ub
>>> # Download a file with a known hash
>>> manager = ub.DownloadManager()
>>> job = manager.submit(
>>>     'http://i.imgur.com/rqwaDag.png',
>>>     hash_prefix='31a129618c87dd667103e7154182e3c39a605eefe90f84f2283f3c87efee8e40'
>>> )
>>> fpath = job.result()
>>> print('fpath = {!r}'.format(fpath))

Example

>>> # Does not require network
>>> import ubelt as ub
>>> manager = ub.DownloadManager()
>>> for i in range(100):
...     job = manager.submit('localhost/might-not-exist-i-{}'.format(i))
>>> file_paths = []
>>> for job in manager.as_completed(prog=True):
...     try:
...         fpath = job.result()
...         file_paths += [fpath]
...     except Exception:
...         pass
>>> print('file_paths = {!r}'.format(file_paths))

Example

>>> # xdoctest: +REQUIRES(--network)
>>> import pytest
>>> import ubelt as ub
>>> manager = ub.DownloadManager()
>>> item1 = {
>>>     'url': 'https://data.kitware.com/api/v1/item/5b4039308d777f2e6225994c/download',
>>>     'dst': 'forgot_what_the_name_really_is',
>>>     'hash_prefix': 'c98a46cb31205cf',
>>>     'hasher': 'sha512',
>>> }
>>> item2 = {
>>>     'url': 'http://i.imgur.com/rqwaDag.png',
>>>     'hash_prefix': 'f79ea24571da6ddd2ba12e3d57b515249ecb8a35',
>>>     'hasher': 'sha1',
>>> }
>>> item1 = item2  # hack around SSL error
>>> manager.submit(**item1)
>>> manager.submit(**item2)
>>> for job in manager.as_completed(prog=True, verbose=3):
>>>     fpath = job.result()
>>>     print('fpath = {!r}'.format(fpath))
Parameters:
  • download_root (str | PathLike) – default download location

  • mode (str) – either thread, process, or serial

  • cache (bool) – defaults to True

  • max_workers (int | None) – maximum concurrent tasks

Todo

  • [ ] Will likely have to initialize and store some sort of “connection state” objects.

submit(url, dst=None, hash_prefix=None, hasher='sha256')[source]

Add a job to the download Queue

Parameters:
  • url (str | PathLike) – pointer to the data to download

  • dst (str | None) – The relative or absolute path to download to. If unspecified, the destination name is derived from the url.

  • hash_prefix (str | None) – If specified, verifies that the hash of the downloaded file starts with this.

  • hasher (str, default=’sha256’) – hashing algorithm to use if hash_prefix is specified.

Returns:

a Future object that will point to the downloaded location.

Return type:

concurrent.futures.Future

as_completed(prog=None, desc=None, verbose=1)[source]

Generate completed jobs as they become available

Parameters:
  • prog (None | bool | type) – if True, uses a ub.ProgIter progress bar. Can also be a class with a compatible progiter API.

  • desc (str | None) – if specified, reports progress with a ubelt.progiter.ProgIter object.

  • verbose (int) – verbosity

Example

>>> import pytest
>>> import ubelt as ub
>>> download_root = ub.ensure_app_config_dir('ubelt', 'dlman')
>>> manager = ub.DownloadManager(download_root=download_root,
>>>                              cache=False)
>>> for i in range(3):
>>>     manager.submit('localhost')
>>> results = list(manager)
>>> print('results = {!r}'.format(results))
>>> manager.shutdown()
shutdown()[source]

Cancel all jobs and close all connections.

class ubelt.Executor(mode='thread', max_workers=0)[source]

Bases: object

A concrete asynchronous executor with a configurable backend.

The type of parallelism (or lack thereof) is configured via the mode parameter, which can be: “process”, “thread”, or “serial”. This allows the user to easily enable / disable parallelism or switch between processes and threads without modifying the surrounding logic.

SeeAlso:

In the case where you can’t or don’t want to use ubelt.Executor you can get similar behavior with the following pure-python snippet:

def Executor(max_workers):
    # Stdlib-only "ubelt.Executor"-like behavior
    if max_workers == 1:
        import contextlib
        def submit_partial(func, *args, **kwargs):
            def wrapper():
                return func(*args, **kwargs)
            wrapper.result = wrapper
            return wrapper
        executor = contextlib.nullcontext()
        executor.submit = submit_partial
    else:
        from concurrent.futures import ThreadPoolExecutor
        executor = ThreadPoolExecutor(max_workers=max_workers)
    return executor

executor = Executor(0)
with executor:
    jobs = []

    for arg in range(1000):
        job = executor.submit(chr, arg)
        jobs.append(job)

    results = []
    for job in jobs:
        result = job.result()
        results.append(result)

print('results = {}'.format(results))
Variables:

backend (SerialExecutor | ThreadPoolExecutor | ProcessPoolExecutor)

Example

>>> import ubelt as ub
>>> # Prototype code using simple serial processing
>>> executor = ub.Executor(mode='serial', max_workers=0)
>>> jobs = [executor.submit(sum, [i + 1, i]) for i in range(10)]
>>> print([job.result() for job in jobs])
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
>>> # Enable parallelism by only changing one parameter
>>> executor = ub.Executor(mode='process', max_workers=0)
>>> jobs = [executor.submit(sum, [i + 1, i]) for i in range(10)]
>>> print([job.result() for job in jobs])
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
Parameters:
  • mode (str) – The backend parallelism mechanism. Can be either thread, serial, or process. Defaults to ‘thread’.

  • max_workers (int) – number of workers. If 0, serial is forced. Defaults to 0.

submit(func, *args, **kw)[source]

Calls the submit function of the underlying backend.

Returns:

a future representing the job

Return type:

concurrent.futures.Future

shutdown()[source]

Calls the shutdown function of the underlying backend.

map(fn, *iterables, **kwargs)[source]

Calls the map function of the underlying backend.

CommandLine

xdoctest -m ubelt.util_futures Executor.map

Example

>>> import ubelt as ub
>>> import concurrent.futures
>>> import string
>>> with ub.Executor(mode='serial') as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> with ub.Executor(mode='thread', max_workers=2) as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> # xdoctest: +IGNORE_WANT
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ubelt.FormatterExtensions

alias of ReprExtensions

class ubelt.IndexableWalker(data, dict_cls=(<class 'dict'>, ), list_cls=(<class 'list'>, <class 'tuple'>))[source]

Bases: Generator

Traverses through a nested tree-like indexable structure.

Generates a path and value to each node in the structure. The path is a list of indexes which if applied in order will reach the value.

The __setitem__ method can be used to modify a nested value based on the path returned by the generator.

When generating values, you can use “send” to prevent traversal of a particular branch.

Variables:
  • data (dict | list | tuple) – the wrapped indexable data

  • dict_cls (Tuple[type]) – the types that should be considered dictionary mappings for the purpose of nested iteration. Defaults to dict.

  • list_cls (Tuple[type]) – the types that should be considered list-like for the purposes of nested iteration. Defaults to (list, tuple).

  • indexable_cls (Tuple[type]) – combined dict_cls and list_cls

Example

>>> import ubelt as ub
>>> # Given Nested Data
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, {'buz': [4, 5, 6]}],
>>> }
>>> # Create an IndexableWalker
>>> walker = ub.IndexableWalker(data)
>>> # We iterate over the data as if it was flat
>>> # ignore the <want> string due to order issues on older Pythons
>>> # xdoctest: +IGNORE_WANT
>>> for path, val in walker:
>>>     print(path)
['foo']
['baz']
['baz', 0]
['baz', 1]
['baz', 1, 'buz']
['baz', 1, 'buz', 0]
['baz', 1, 'buz', 1]
['baz', 1, 'buz', 2]
['baz', 0, 'biz']
['foo', 'bar']
>>> # We can use "paths" as keys to getitem into the walker
>>> path = ['baz', 1, 'buz', 2]
>>> val = walker[path]
>>> assert val == 6
>>> # We can use "paths" as keys to setitem into the walker
>>> assert data['baz'][1]['buz'][2] == 6
>>> walker[path] = 7
>>> assert data['baz'][1]['buz'][2] == 7
>>> # We can use "paths" as keys to delitem into the walker
>>> assert data['baz'][1]['buz'][1] == 5
>>> del walker[['baz', 1, 'buz', 1]]
>>> assert data['baz'][1]['buz'][1] == 7

Example

>>> # Create nested data
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> import ubelt as ub
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = np.array([1, 2, 3])
>>> data['foo']['c'] = [1, 2, 3]
>>> data['baz'] = 3
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> # We can walk through every node in the nested tree
>>> walker = ub.IndexableWalker(data)
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     if path[-1] == 'c':
>>>         # Use send to prevent traversing this branch
>>>         got = walker.send(False)
>>>         # We can modify the value based on the returned path
>>>         walker[path] = 'changed the value of c'
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> assert data['foo']['c'] == 'changed the value of c'

Example

>>> # Test sending false for every data item
>>> import ubelt as ub
>>> data = {1: [1, 2, 3], 2: [1, 2, 3]}
>>> walker = ub.IndexableWalker(data)
>>> # Sending false means you won't traverse any further on that path
>>> num_iters_v1 = 0
>>> for path, value in walker:
>>>     print('[v1] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     walker.send(False)
>>>     num_iters_v1 += 1
>>> num_iters_v2 = 0
>>> for path, value in walker:
>>>     # When we don't send false we walk all the way down
>>>     print('[v2] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters_v2 += 1
>>> assert num_iters_v1 == 2
>>> assert num_iters_v2 == 8

Example

>>> # Test numpy
>>> # xdoctest: +REQUIRES(CPython)
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> # By default we don't recurse into ndarrays because they
>>> # are not registered as an indexable class
>>> data = {2: np.array([1, 2, 3])}
>>> walker = ub.IndexableWalker(data)
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 1
>>> # Currently to use top-level ndarrays, you need to extend what the
>>> # list class is. This API may change in the future to be easier
>>> # to work with.
>>> data = np.random.rand(3, 5)
>>> walker = ub.IndexableWalker(data, list_cls=(list, tuple, np.ndarray))
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 3 + 3 * 5
send(arg)[source]

Send 'arg' into the generator; return the next yielded value or raise StopIteration.

throw(typ[, val[, tb]])[source]

Raise an exception in the generator; return the next yielded value or raise StopIteration.

Parameters:
  • typ (Any) – Type of the exception. Should be a type[BaseException]; static type checking does not model this correctly here.

  • val (Optional[object])

  • tb (Optional[TracebackType])

Returns:

Any

Raises:

StopIteration

References

_walk(data=None, prefix=[])[source]

Defines the underlying generator used by IndexableWalker

Yields:
Tuple[List, Any] | None – a (path, value) pair, where path (List) is a “path” through the nested data structure and value (Any) is the value indexed by that path. Can also yield None in the case that send is called on the generator.
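The recursive traversal can be sketched in plain Python. This is a simplified stand-in for illustration only; it omits the send-based pruning and the configurable class registration of the real implementation:

```python
def walk(data, prefix=[], dict_cls=(dict,), list_cls=(list, tuple)):
    # Enumerate children as (key, value) pairs based on the container type
    if isinstance(data, dict_cls):
        items = data.items()
    elif isinstance(data, list_cls):
        items = enumerate(data)
    else:
        return
    for key, value in items:
        path = prefix + [key]
        yield path, value
        # Recurse into nested containers
        if isinstance(value, dict_cls + list_cls):
            yield from walk(value, path, dict_cls, list_cls)

data = {'foo': {'bar': 1}, 'baz': [4, 5]}
for path, value in walk(data):
    print(path)
```

Note that container nodes (e.g. ['foo']) are yielded before their children, matching the order shown in the class examples.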

allclose(other, rel_tol=1e-09, abs_tol=0.0, equal_nan=False, return_info=False)[source]

Walks through this and another nested data structures and checks if everything is roughly the same.

Parameters:
  • other (IndexableWalker | List | Dict) – a nested indexable item to compare against.

  • rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values

  • abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values

  • equal_nan (bool) – if True, consider NaNs as equal. Requires numpy.

  • return_info (bool, default=False) – if True, return an extra info dict

Returns:

A boolean result if return_info is false, otherwise a tuple of the boolean result and an “info” dict containing detailed results indicating what matched and what did not.

Return type:

bool | Tuple[bool, Dict]
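The rel_tol / abs_tol semantics follow the same convention as the stdlib math.isclose; a quick stdlib sketch of how the two tolerances behave at a single leaf value (not the actual implementation):

```python
import math

# rel_tol scales with the magnitude of the inputs
assert math.isclose(1.0, 1.0 + 1e-10, rel_tol=1e-09)
assert not math.isclose(1.222222, 1.22222, rel_tol=1e-09)

# abs_tol is a fixed floor, which matters near zero where
# a relative tolerance vanishes
assert not math.isclose(0.0, 1e-12, rel_tol=1e-09)
assert math.isclose(0.0, 1e-12, abs_tol=1e-09)
```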

Example

>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info =  items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> for p1, v1, v2  in return_info['faillist']:
>>>     v1_ = items1[p1]
>>>     print('*fail p1, v1, v2 = {}, {}, {}'.format(p1, v1, v2))
>>> for p1 in return_info['passlist']:
>>>     v1_ = items1[p1]
>>>     print('*pass p1, v1_ = {}, {}'.format(p1, v1_))
>>> assert not flag
>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.0000000000000000000000001, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [0.9999999999999999, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info =  items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag, return_info =  ub.IndexableWalker([]).allclose(ub.IndexableWalker([]), return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag =  ub.IndexableWalker([]).allclose([], return_info=False)
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag, return_info =  ub.IndexableWalker([]).allclose([1], return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag

Example

>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> flag, return_info =  wa.allclose(wb, return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> flag, return_info =  wa.allclose(wb, return_info=True)
>>> assert flag
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
diff(other, rel_tol=1e-09, abs_tol=0.0, equal_nan=False)[source]

Walks through two nested data structures finds differences in the structures.

Parameters:
  • other (IndexableWalker | List | Dict) – a nested indexable item to compare against.

  • rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values

  • abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values

  • equal_nan (bool) – if True, numpy must be available, and consider nans as equal.

Returns:

information about the diff, containing:

“similarity”: a score between 0 and 1.

“num_differences”: the number of paths not in common, plus the number of common paths with differing values.

“unique1”: the paths that were unique to self.

“unique2”: the paths that were unique to other.

“faillist”: a list of 3-tuples of common paths and their differing values.

“num_approximations”: the number of approximately equal items (i.e. floats).

Return type:

dict

Example

>>> import ubelt as ub
>>> dct1 = {
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>>     'top': [1, 2, 3],
>>>     'L0': {'L1': {'L2': {'K1': 'V1', 'K2': 'V2', 'D1': 1, 'D2': 2}}},
>>> }
>>> dct2 = {
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>>     'buz': {1: 2},
>>>     'top': [1, 1, 2],
>>>     'L0': {'L1': {'L2': {'K1': 'V1', 'K2': 'V2', 'D1': 10, 'D2': 20}}},
>>> }
>>> info = ub.IndexableWalker(dct1).diff(dct2)
>>> print(f'info = {ub.urepr(info, nl=2)}')

Example

>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> info =  wa.diff(wb)
>>> print(f'info = {ub.urepr(info, nl=2)}')
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> info =  wa.diff(wb)
>>> print(f'info = {ub.urepr(info, nl=2)}')
class ubelt.JobPool(mode='thread', max_workers=0, transient=False)[source]

Bases: object

Abstracts away boilerplate of submitting and collecting jobs

This is a basic wrapper around ubelt.util_futures.Executor that simplifies the most basic case by 1. keeping track of references to submitted futures for you and 2. providing an as_completed method to consume those futures as they are ready.

Variables:
  • executor (Executor) – internal executor object

  • jobs (List[Future]) – internal job list. Note: do not rely on this attribute, it may change in the future.

Example

>>> import ubelt as ub
>>> def worker(data):
>>>     return data + 1
>>> pool = ub.JobPool('thread', max_workers=16)
>>> for data in ub.ProgIter(range(10), desc='submit jobs'):
>>>     pool.submit(worker, data)
>>> final = []
>>> for job in pool.as_completed(desc='collect jobs'):
>>>     info = job.result()
>>>     final.append(info)
>>> print('final = {!r}'.format(final))
Parameters:
  • mode (str) – The backend parallelism mechanism. Can be either thread, serial, or process. Defaults to ‘thread’.

  • max_workers (int) – number of workers. If 0, serial is forced. Defaults to 0.

  • transient (bool) – if True, references to jobs will be discarded as they are returned by as_completed(). Otherwise the jobs attribute holds a reference to all jobs ever submitted. Defaults to False.

submit(func, *args, **kwargs)[source]

Submit a job managed by the pool

Parameters:
  • func (Callable[…, Any]) – a callable to be executed with the given positional and keyword arguments.

  • *args – positional arguments to pass to the function

  • **kwargs – keyword arguments to pass to the function

Returns:

a future representing the job

Return type:

concurrent.futures.Future
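The boilerplate that JobPool.submit removes looks roughly like the following plain concurrent.futures pattern, where futures must be tracked by hand. This is a hypothetical stdlib comparison, not ubelt's internals:

```python
import concurrent.futures

def worker(data):
    return data + 1

# Manually keep references to submitted futures,
# which JobPool.submit otherwise does for you
jobs = []
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for data in range(10):
        jobs.append(executor.submit(worker, data))
    results = sorted(f.result() for f in concurrent.futures.as_completed(jobs))
print(results)
```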

shutdown()[source]
_clear_completed()[source]
as_completed(timeout=None, desc=None, progkw=None)[source]

Generates completed jobs in an arbitrary order

Parameters:
  • timeout (float | None) – Specify the maximum number of seconds to wait for a job. Note: this is ignored in serial mode.

  • desc (str | None) – if specified, reports progress with a ubelt.progiter.ProgIter object.

  • progkw (dict | None) – extra keyword arguments to ubelt.progiter.ProgIter.

Yields:

concurrent.futures.Future – The completed future object containing the results of a job.

CommandLine

xdoctest -m ubelt.util_futures JobPool.as_completed

Example

>>> import ubelt as ub
>>> pool = ub.JobPool('thread', max_workers=8)
>>> text = ub.paragraph(
...     '''
...     UDP is a cool protocol, check out the wiki:
...
...     UDP-based Data Transfer Protocol (UDT), is a high-performance
...     data transfer protocol designed for transferring large
...     volumetric datasets over high-speed wide area networks. Such
...     settings are typically disadvantageous for the more common TCP
...     protocol.
...     ''')
>>> for word in text.split(' '):
...     pool.submit(print, word)
>>> for _ in pool.as_completed():
...     pass
>>> pool.shutdown()
join(**kwargs)[source]

Like JobPool.as_completed(), but executes the result method of each future and returns only after all processes are complete. This allows for lower-boilerplate prototyping.

Parameters:

**kwargs – passed to JobPool.as_completed()

Returns:

list of results

Return type:

List[Any]

Example

>>> import ubelt as ub
>>> # We just want to try replacing our simple iterative algorithm
>>> # with the embarrassingly parallel version
>>> arglist = list(zip(range(1000), range(1000)))
>>> func = ub.identity
>>> #
>>> # Original version
>>> for args in arglist:
>>>     func(*args)
>>> #
>>> # Potentially parallel version
>>> jobs = ub.JobPool(max_workers=0)
>>> for args in arglist:
>>>     jobs.submit(func, *args)
>>> _ = jobs.join(desc='running')
class ubelt.NiceRepr[source]

Bases: object

Inherit from this class and define __nice__ to “nicely” print your objects.

Defines __str__ and __repr__ in terms of the __nice__ method. Classes that inherit from NiceRepr should redefine __nice__. If the inheriting class has a __len__ method, then the default __nice__ method will return its length.

Example

>>> import ubelt as ub
>>> class Foo(ub.NiceRepr):
...    def __nice__(self):
...        return 'info'
>>> foo = Foo()
>>> assert str(foo) == '<Foo(info)>'
>>> assert repr(foo).startswith('<Foo(info) at ')

Example

>>> import ubelt as ub
>>> class Bar(ub.NiceRepr):
...    pass
>>> bar = Bar()
>>> import pytest
>>> with pytest.warns(RuntimeWarning) as record:
>>>     assert 'object at' in str(bar)
>>>     assert 'object at' in repr(bar)

Example

>>> import ubelt as ub
>>> class Baz(ub.NiceRepr):
...    def __len__(self):
...        return 5
>>> baz = Baz()
>>> assert str(baz) == '<Baz(5)>'

Example

>>> import ubelt as ub
>>> # If your nice message has a bug, it shouldn't bring down the house
>>> class Foo(ub.NiceRepr):
...    def __nice__(self):
...        assert False
>>> foo = Foo()
>>> import pytest
>>> with pytest.warns(RuntimeWarning) as record:
>>>     print('foo = {!r}'.format(foo))
foo = <...Foo ...>

Example

>>> import ubelt as ub
>>> class Animal(ub.NiceRepr):
...    def __init__(self):
...        ...
...    def __nice__(self):
...        return ''
>>> class Cat(Animal):
>>>     ...
>>> class Dog(Animal):
>>>     ...
>>> class Beagle(Dog):
>>>     ...
>>> class Ragdoll(Cat):
>>>     ...
>>> instances = [Animal(), Cat(), Dog(), Beagle(), Ragdoll()]
>>> for inst in instances:
>>>     print(str(inst))
<Animal()>
<Cat()>
<Dog()>
<Beagle()>
<Ragdoll()>
class ubelt.OrderedSet(iterable=None)[source]

Bases: MutableSet, Sequence

An OrderedSet is a custom MutableSet that remembers its order, so that every entry has an index that can be looked up.

Variables:
  • items (List[Any]) – internal ordered representation.

  • map (Dict[Any, int]) – internal mapping from items to indices.

Example

>>> OrderedSet([1, 1, 2, 3, 2])
OrderedSet([1, 2, 3])
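The order-preserving dedup shown above can be approximated in plain Python with dict.fromkeys, which also keeps first-appearance order, although it provides none of OrderedSet's set operations or index lookups:

```python
items = [1, 1, 2, 3, 2]
# dict keys are insertion-ordered and unique, so this dedups
# while preserving first-appearance order
deduped = list(dict.fromkeys(items))
print(deduped)  # [1, 2, 3]
```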
Parameters:

iterable (None | Iterable) – input data

copy()[source]

Return a shallow copy of this object.

Returns:

OrderedSet

Example

>>> this = OrderedSet([1, 2, 3])
>>> other = this.copy()
>>> this == other
True
>>> this is other
False
add(key)[source]

Add key as an item to this OrderedSet, then return its index.

If key is already in the OrderedSet, return the index it already had.

Parameters:

key (Any) – the item to add

Returns:

the index of the item. Note: this return value violates the Liskov Substitution Principle and might be changed.

Return type:

int

Example

>>> oset = OrderedSet()
>>> oset.append(3)
0
>>> print(oset)
OrderedSet([3])
append(key)

Add key as an item to this OrderedSet, then return its index.

If key is already in the OrderedSet, return the index it already had.

Parameters:

key (Any) – the item to add

Returns:

the index of the item. Note: this return value violates the Liskov Substitution Principle and might be changed.

Return type:

int

Example

>>> oset = OrderedSet()
>>> oset.append(3)
0
>>> print(oset)
OrderedSet([3])
update(sequence)[source]

Update the set with the given iterable sequence, then return the index of the last element inserted.

Parameters:

sequence (Iterable) – items to add to this set

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.update([3, 1, 5, 1, 4])
4
>>> print(oset)
OrderedSet([1, 2, 3, 5, 4])
index(key, start=0, stop=None)[source]

Get the index of a given entry, raising an IndexError if it’s not present.

key can be a non-string iterable of entries, in which case this returns a list of indices.

Parameters:
  • key (Any) – item to find the position of

  • start (int) – not supported yet

  • stop (int | None) – not supported yet

Returns:

int

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.index(2)
1
get_loc(key, start=0, stop=None)

Get the index of a given entry, raising an IndexError if it’s not present.

key can be a non-string iterable of entries, in which case this returns a list of indices.

Parameters:
  • key (Any) – item to find the position of

  • start (int) – not supported yet

  • stop (int | None) – not supported yet

Returns:

int

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.index(2)
1
get_indexer(key, start=0, stop=None)

Get the index of a given entry, raising an IndexError if it’s not present.

key can be a non-string iterable of entries, in which case this returns a list of indices.

Parameters:
  • key (Any) – item to find the position of

  • start (int) – not supported yet

  • stop (int | None) – not supported yet

Returns:

int

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.index(2)
1
pop()[source]

Remove and return the last element from the set.

Raises KeyError if the set is empty.

Returns:

Any

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.pop()
3
discard(key)[source]

Remove an element. Do not raise an exception if absent.

The MutableSet mixin uses this to implement the .remove() method, which does raise an error when asked to remove a non-existent item.

Parameters:

key (Any) – item to remove.

Example

>>> oset = OrderedSet([1, 2, 3])
>>> oset.discard(2)
>>> print(oset)
OrderedSet([1, 3])
>>> oset.discard(2)
>>> print(oset)
OrderedSet([1, 3])
clear()[source]

Remove all items from this OrderedSet.

union(*sets)[source]

Combines all unique items. Each item's order is defined by its first appearance.

Parameters:

*sets – zero or more other iterables to operate on

Returns:

OrderedSet

Example

>>> oset = OrderedSet.union(OrderedSet([3, 1, 4, 1, 5]), [1, 3], [2, 0])
>>> print(oset)
OrderedSet([3, 1, 4, 5, 2, 0])
>>> oset.union([8, 9])
OrderedSet([3, 1, 4, 5, 2, 0, 8, 9])
>>> oset | {10}
OrderedSet([3, 1, 4, 5, 2, 0, 10])
intersection(*sets)[source]

Returns elements in common between all sets. Order is defined only by the first set.

Parameters:

*sets – zero or more other iterables to operate on

Returns:

OrderedSet

Example

>>> from ubelt.orderedset import *  # NOQA
>>> oset = OrderedSet.intersection(OrderedSet([0, 1, 2, 3]), [1, 2, 3])
>>> print(oset)
OrderedSet([1, 2, 3])
>>> oset.intersection([2, 4, 5], [1, 2, 3, 4])
OrderedSet([2])
>>> oset.intersection()
OrderedSet([1, 2, 3])
difference(*sets)[source]

Returns all elements that are in this set but not the others.

Parameters:

*sets – zero or more other iterables to operate on

Returns:

OrderedSet

Example

>>> OrderedSet([1, 2, 3]).difference(OrderedSet([2]))
OrderedSet([1, 3])
>>> OrderedSet([1, 2, 3]).difference(OrderedSet([2]), OrderedSet([3]))
OrderedSet([1])
>>> OrderedSet([1, 2, 3]) - OrderedSet([2])
OrderedSet([1, 3])
>>> OrderedSet([1, 2, 3]).difference()
OrderedSet([1, 2, 3])
issubset(other)[source]

Report whether another set contains this set.

Parameters:

other (Iterable) – check if items in other are all contained in self.

Returns:

bool

Example

>>> OrderedSet([1, 2, 3]).issubset({1, 2})
False
>>> OrderedSet([1, 2, 3]).issubset({1, 2, 3, 4})
True
>>> OrderedSet([1, 2, 3]).issubset({1, 4, 3, 5})
False
issuperset(other)[source]

Report whether this set contains another set.

Parameters:

other (Iterable) – check all items in self are contained in other.

Returns:

bool

Example

>>> OrderedSet([1, 2]).issuperset([1, 2, 3])
False
>>> OrderedSet([1, 2, 3, 4]).issuperset({1, 2, 3})
True
>>> OrderedSet([1, 4, 3, 5]).issuperset({1, 2, 3})
False
symmetric_difference(other)[source]

Return the symmetric difference of two OrderedSets as a new set. That is, the new set will contain all elements that are in exactly one of the sets.

Their order will be preserved, with elements from self preceding elements from other.

Parameters:

other (Iterable) – items to operate on

Returns:

OrderedSet

Example

>>> this = OrderedSet([1, 4, 3, 5, 7])
>>> other = OrderedSet([9, 7, 1, 3, 2])
>>> this.symmetric_difference(other)
OrderedSet([4, 5, 9, 2])
_update_items(items)[source]

Replace the ‘items’ list of this OrderedSet with a new one, updating self.map accordingly.

difference_update(*sets)[source]

Update this OrderedSet to remove items from one or more other sets.

Example

>>> this = OrderedSet([1, 2, 3])
>>> this.difference_update(OrderedSet([2, 4]))
>>> print(this)
OrderedSet([1, 3])
>>> this = OrderedSet([1, 2, 3, 4, 5])
>>> this.difference_update(OrderedSet([2, 4]), OrderedSet([1, 4, 6]))
>>> print(this)
OrderedSet([3, 5])
intersection_update(other)[source]

Update this OrderedSet to keep only items in another set, preserving their order in this set.

Parameters:

other (Iterable) – items to operate on

Example

>>> this = OrderedSet([1, 4, 3, 5, 7])
>>> other = OrderedSet([9, 7, 1, 3, 2])
>>> this.intersection_update(other)
>>> print(this)
OrderedSet([1, 3, 7])
symmetric_difference_update(other)[source]

Update this OrderedSet to remove items from another set, then add items from the other set that were not present in this set.

Parameters:

other (Iterable) – items to operate on

Example

>>> this = OrderedSet([1, 4, 3, 5, 7])
>>> other = OrderedSet([9, 7, 1, 3, 2])
>>> this.symmetric_difference_update(other)
>>> print(this)
OrderedSet([4, 5, 9, 2])
class ubelt.Path(*args, **kwargs)[source]

Bases: PosixPath

This class extends pathlib.Path with extra functionality and convenience methods.

New methods are designed to support chaining.

In addition to new methods, this class supports the addition (+) operator, which allows for better drop-in compatibility with code using existing string-based paths.
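For comparison, plain pathlib paths do not support the + operator at all, which is the gap this extension fills. A quick check of the stdlib behavior:

```python
import pathlib

# Plain pathlib rejects string concatenation with +
try:
    pathlib.Path('data') + '.txt'
    supported = True
except TypeError:
    supported = False
assert not supported

# The stdlib alternative requires with_suffix or a string round-trip
assert pathlib.Path('data').with_suffix('.txt') == pathlib.Path('data.txt')
```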

Note

On windows this inherits from pathlib.WindowsPath.

New methods are

New classmethods are

Modified methods are

Example

>>> # Ubelt extends pathlib functionality
>>> import ubelt as ub
>>> # Chain expansion and mkdir with cumbersome args.
>>> dpath = ub.Path('~/.cache/ubelt/demo_path').expand().ensuredir()
>>> fpath = dpath / 'text_file.txt'
>>> # Augment is concise and chainable
>>> aug_fpath = fpath.augment(stemsuffix='.aux', ext='.jpg').touch()
>>> aug_dpath = dpath.augment(stemsuffix='demo_path2')
>>> assert aug_fpath.read_text() == ''
>>> fpath.write_text('text data')
>>> assert aug_fpath.exists()
>>> # Delete is akin to "rm -rf" and is also chainable.
>>> assert not aug_fpath.delete().exists()
>>> assert dpath.exists()
>>> assert not dpath.delete().exists()
>>> print(f'{str(fpath.shrinkuser()).replace(os.path.sep, "/")}')
>>> print(f'{str(dpath.shrinkuser()).replace(os.path.sep, "/")}')
>>> print(f'{str(aug_fpath.shrinkuser()).replace(os.path.sep, "/")}')
>>> print(f'{str(aug_dpath.shrinkuser()).replace(os.path.sep, "/")}')
~/.cache/ubelt/demo_path/text_file.txt
~/.cache/ubelt/demo_path
~/.cache/ubelt/demo_path/text_file.aux.jpg
~/.cache/ubelt/demo_pathdemo_path2

Inherited unmodified properties from pathlib.Path are:

  • pathlib.PurePath.anchor

  • pathlib.PurePath.name

  • pathlib.PurePath.parts

  • pathlib.PurePath.parent

  • pathlib.PurePath.parents

  • pathlib.PurePath.suffix

  • pathlib.PurePath.suffixes

  • pathlib.PurePath.stem

  • pathlib.PurePath.drive

  • pathlib.PurePath.root

Inherited unmodified classmethods from pathlib.Path are:

Inherited unmodified methods from pathlib.Path are:

classmethod appdir(appname=None, *args, type='cache')[source]

Returns a standard platform specific directory for an application to use as cache, config, or data.

The default root location depends on the platform and is specified in the following table:

TextArt

       | POSIX            | Windows        | MacOSX
data   | $XDG_DATA_HOME   | %APPDATA%      | ~/Library/Application Support
config | $XDG_CONFIG_HOME | %APPDATA%      | ~/Library/Application Support
cache  | $XDG_CACHE_HOME  | %LOCALAPPDATA% | ~/Library/Caches


If an environment variable is not specified, the defaults are:
    APPDATA      = ~/AppData/Roaming
    LOCALAPPDATA = ~/AppData/Local

    XDG_DATA_HOME   = ~/.local/share
    XDG_CACHE_HOME  = ~/.cache
    XDG_CONFIG_HOME = ~/.config
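The POSIX cache lookup in the table reduces to an environment-variable check with a home-directory fallback. A simplified stdlib illustration of that rule (not ubelt's implementation):

```python
import os
import pathlib

def posix_cache_root():
    # Use XDG_CACHE_HOME when set, otherwise fall back to ~/.cache
    env = os.environ.get('XDG_CACHE_HOME')
    return pathlib.Path(env) if env else pathlib.Path.home() / '.cache'

print(posix_cache_root() / 'ubelt')
```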
Parameters:
  • appname (str | None) – The name of the application.

  • *args – optional subdirs

  • type (str) – the type of data expected to be stored in this application directory. Valid options are ‘cache’, ‘config’, or ‘data’.

Returns:

a new path object for the specified application directory.

Return type:

Path

SeeAlso:

This provides functionality similar to the appdirs and platformdirs packages.

Example

>>> # xdoctest: +IGNORE_WANT
>>> import ubelt as ub
>>> print(ub.Path.appdir('ubelt', type='cache').shrinkuser())
>>> print(ub.Path.appdir('ubelt', type='config').shrinkuser())
>>> print(ub.Path.appdir('ubelt', type='data').shrinkuser())
~/.cache/ubelt
~/.config/ubelt
~/.local/share/ubelt
>>> import pytest
>>> with pytest.raises(KeyError):
>>>     ub.Path.appdir('ubelt', type='other')

Example

>>> # xdoctest: +IGNORE_WANT
>>> import ubelt as ub
>>> # Can now call appdir without any arguments
>>> print(ub.Path.appdir().shrinkuser())
~/.cache
augment(prefix='', stemsuffix='', ext=None, stem=None, dpath=None, tail='', relative=None, multidot=False, suffix='')[source]

Create a new path with a different extension, basename, directory, prefix, and/or suffix.

See augpath() for more details.

Parameters:
  • prefix (str) – Text placed in front of the stem. Defaults to ‘’.

  • stemsuffix (str) – Text placed between the stem and extension. Defaults to ‘’.

  • ext (str | None) – If specified, replaces the extension

  • stem (str | None) – If specified, replaces the stem (i.e. basename without extension).

  • dpath (str | PathLike | None) – If specified, replaces the specified “relative” directory, which by default is the parent directory.

  • tail (str | None) – If specified, appends this text to the very end of the path, after the extension.

  • relative (str | PathLike | None) – Replaces relative with dpath in the path. Has no effect if dpath is not specified. Defaults to the dirname of the input path. Experimental; not currently implemented.

  • multidot (bool) – Allows extensions to contain multiple dots. Specifically, if False, everything after the last dot in the basename is the extension. If True, everything after the first dot in the basename is the extension.

SeeAlso:

pathlib.Path.with_stem() pathlib.Path.with_name() pathlib.Path.with_suffix()

Returns:

augmented path

Return type:

Path

Warning

NOTICE OF BACKWARDS INCOMPATIBILITY.

THE INITIAL RELEASE OF Path.augment suffered from an unfortunate variable naming decision that conflicts with pathlib.Path

p = ub.Path('the.entire.fname.or.dname.is.the.name.exe')
print(f'p     ={p}')
print(f'p.name={p.name}')
p = ub.Path('the.stem.ends.here.ext')
print(f'p     ={p}')
print(f'p.stem={p.stem}')
p = ub.Path('only.the.last.dot.is.the.suffix')
print(f'p       ={p}')
print(f'p.suffix={p.suffix}')
p = ub.Path('but.all.suffixes.can.be.recovered')
print(f'p         ={p}')
print(f'p.suffixes={p.suffixes}')

Example

>>> import ubelt as ub
>>> path = ub.Path('foo.bar')
>>> suffix = '_suff'
>>> prefix = 'pref_'
>>> ext = '.baz'
>>> newpath = path.augment(prefix=prefix, stemsuffix=suffix, ext=ext, stem='bar')
>>> print('newpath = {!r}'.format(newpath))
newpath = Path('pref_bar_suff.baz')

Example

>>> import ubelt as ub
>>> path = ub.Path('foo.bar')
>>> stemsuffix = '_suff'
>>> prefix = 'pref_'
>>> ext = '.baz'
>>> newpath = path.augment(prefix=prefix, stemsuffix=stemsuffix, ext=ext, stem='bar')
>>> print('newpath = {!r}'.format(newpath))

Example

>>> # Compare our augpath(ext=...) versus pathlib with_suffix(...)
>>> import ubelt as ub
>>> cases = [
>>>     ub.Path('no_ext'),
>>>     ub.Path('one.ext'),
>>>     ub.Path('double..dot'),
>>>     ub.Path('two.many.cooks'),
>>>     ub.Path('path.with.three.dots'),
>>>     ub.Path('traildot.'),
>>>     ub.Path('doubletraildot..'),
>>>     ub.Path('.prefdot'),
>>>     ub.Path('..doubleprefdot'),
>>> ]
>>> for path in cases:
>>>     print('--')
>>>     print('path = {}'.format(ub.repr2(path, nl=1)))
>>>     ext = '.EXT'
>>>     method_pathlib = path.with_suffix(ext)
>>>     method_augment = path.augment(ext=ext)
>>>     if method_pathlib == method_augment:
>>>         print(ub.color_text('agree', 'green'))
>>>     else:
>>>         print(ub.color_text('disagree', 'red'))
>>>     print('path.with_suffix({}) = {}'.format(ext, ub.repr2(method_pathlib, nl=1)))
>>>     print('path.augment(ext={}) = {}'.format(ext, ub.repr2(method_augment, nl=1)))
>>>     print('--')
delete()[source]

Removes a file or recursively removes a directory. If a path does not exist, then this does nothing.

SeeAlso:

ubelt.delete()

Returns:

reference to self

Return type:

Path

Example

>>> import ubelt as ub
>>> from os.path import join
>>> base = ub.Path.appdir('ubelt', 'delete_test2')
>>> dpath1 = (base / 'dir').ensuredir()
>>> (base / 'dir' / 'subdir').ensuredir()
>>> (base / 'dir' / 'to_remove1.txt').touch()
>>> fpath1 = (base / 'dir' / 'subdir' / 'to_remove3.txt').touch()
>>> fpath2 = (base / 'dir' / 'subdir' / 'to_remove2.txt').touch()
>>> assert all(p.exists() for p in [dpath1, fpath1, fpath2])
>>> fpath1.delete()
>>> assert all(p.exists() for p in [dpath1, fpath2])
>>> assert not fpath1.exists()
>>> dpath1.delete()
>>> assert not any(p.exists() for p in [dpath1, fpath1, fpath2])
ensuredir(mode=511)[source]

Concise alias of self.mkdir(parents=True, exist_ok=True)

Parameters:

mode (int) – octal permissions if a new directory is created. Defaults to 0o777.

Returns:

returns itself

Return type:

Path

Example

>>> import ubelt as ub
>>> cache_dpath = ub.Path.appdir('ubelt').ensuredir()
>>> dpath = ub.Path(cache_dpath, 'newdir')
>>> dpath.delete()
>>> assert not dpath.exists()
>>> dpath.ensuredir()
>>> assert dpath.exists()
>>> dpath.rmdir()
mkdir(mode=511, parents=False, exist_ok=False)[source]

Create a new directory at this given path.

Note

The ubelt extension is the same as the original pathlib method, except this returns the path instead of None.

Parameters:
  • mode (int) – permission bits

  • parents (bool) – create parents

  • exist_ok (bool) – if False, raise an error if the directory already exists

Returns:

returns itself

Return type:

Path
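The motivation for returning the path is that the stdlib version returns None, which blocks method chaining. A quick stdlib check of that limitation:

```python
import pathlib
import tempfile

with tempfile.TemporaryDirectory() as tmp:
    dpath = pathlib.Path(tmp) / 'newdir'
    # pathlib.Path.mkdir returns None, so chains like
    # path.mkdir(...).touch() fail with the stdlib class
    result = dpath.mkdir(parents=True, exist_ok=True)
    assert result is None
    assert dpath.exists()
```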

expand()[source]

Expands user tilde and environment variables.

Concise alias of Path(os.path.expandvars(self.expanduser()))

Returns:

path with expanded environment variables and tildes

Return type:

Path

Example

>>> import ubelt as ub
>>> home_v1 = ub.Path('~/').expand()
>>> home_v2 = ub.Path.home()
>>> print('home_v1 = {!r}'.format(home_v1))
>>> print('home_v2 = {!r}'.format(home_v2))
>>> assert home_v1 == home_v2
expandvars()[source]

As discussed in [CPythonIssue21301], CPython won’t be adding expandvars to pathlib. I think this is a mistake, so I added it in this extension.

Returns:

path with expanded environment variables

Return type:

Path

References

ls(pattern=None)[source]

A convenience function to list all paths in a directory.

This is a wrapper around iterdir that returns the results as a list instead of a generator. This is mainly for faster navigation in IPython. In production code iterdir or glob should be used instead.

Parameters:

pattern (None | str) – if specified, performs a glob instead of an iterdir.

Returns:

an eagerly evaluated list of paths

Return type:

List[‘Path’]

Note

When pattern is specified only paths matching the pattern are returned, not the paths inside matched directories. This is different than bash semantics where the pattern is first expanded and then ls is performed on all matching paths.

Example

>>> import ubelt as ub
>>> self = ub.Path.appdir('ubelt/tests/ls')
>>> (self / 'dir1').ensuredir()
>>> (self / 'dir2').ensuredir()
>>> (self / 'file1').touch()
>>> (self / 'file2').touch()
>>> (self / 'dir1/file3').touch()
>>> (self / 'dir2/file4').touch()
>>> children = self.ls()
>>> assert isinstance(children, list)
>>> print(ub.repr2(sorted([p.relative_to(self) for p in children])))
[
    Path('dir1'),
    Path('dir2'),
    Path('file1'),
    Path('file2'),
]
>>> children = self.ls('dir*/*')
>>> assert isinstance(children, list)
>>> print(ub.repr2(sorted([p.relative_to(self) for p in children])))
[
    Path('dir1/file3'),
    Path('dir2/file4'),
]
shrinkuser(home='~')[source]

Shrinks your home directory by replacing it with a tilde.

This is the inverse of os.path.expanduser().

Parameters:

home (str) – symbol used to replace the home path. Defaults to ‘~’, but you might want to use ‘$HOME’ or ‘%USERPROFILE%’ instead.

Returns:

shortened path replacing the home directory with a symbol

Return type:

Path

Example

>>> import ubelt as ub
>>> path = ub.Path('~').expand()
>>> assert str(path.shrinkuser()) == '~'
>>> assert str(ub.Path((str(path) + '1')).shrinkuser()) == str(path) + '1'
>>> assert str((path / '1').shrinkuser()) == join('~', '1')
>>> assert str((path / '1').shrinkuser('$HOME')) == join('$HOME', '1')
>>> assert str(ub.Path('.').shrinkuser()) == '.'
chmod(mode, follow_symlinks=True)[source]

Change the permissions of the path, like os.chmod().

Parameters:
  • mode (int | str) – either a stat code to pass directly to os.chmod() or a string-based code to construct modified permissions. See note for details on the string-based chmod codes.

  • follow_symlinks (bool) – if True, and this path is a symlink, modify permission of the file it points to, otherwise if False, modify the link permission.

Note

From the chmod man page:

The format of a symbolic mode is [ugoa…][[-+=][perms…]…], where perms is either zero or more letters from the set rwxXst, or a single letter from the set ugo. Multiple symbolic modes can be given, separated by commas.

Note

Like os.chmod(), this may not work on Windows or on certain filesystems.

Returns:

returns self for chaining

Return type:

Path

Example

>>> # xdoctest: +REQUIRES(POSIX)
>>> import ubelt as ub
>>> from ubelt.util_path import _encode_chmod_int
>>> dpath = ub.Path.appdir('ubelt/tests/chmod').ensuredir()
>>> fpath = (dpath / 'file.txt').touch()
>>> fpath.chmod('ugo+rw,ugo-x')
>>> print(_encode_chmod_int(fpath.stat().st_mode))
u=rw,g=rw,o=rw
>>> fpath.chmod('o-rwx')
>>> print(_encode_chmod_int(fpath.stat().st_mode))
u=rw,g=rw
>>> fpath.chmod(0o646)
>>> print(_encode_chmod_int(fpath.stat().st_mode))
u=rw,g=r,o=rw
touch(mode=438, exist_ok=True)[source]

Create this file with the given access mode, if it doesn’t exist.

Returns:

returns itself

Return type:

Path

Note

The ubelt.util_io.touch() function currently has a slightly different implementation. This uses whatever the pathlib version is. This may change in the future.

walk(topdown=True, onerror=None, followlinks=False)[source]

A variant of os.walk() for pathlib

Parameters:
  • topdown (bool) – if True, yields nodes closer to the root first; otherwise yields nodes closer to the leaves first.

  • onerror (Callable[[OSError], None] | None) – A function with one argument of type OSError. If the error is raised the walk is aborted, otherwise it continues.

  • followlinks (bool) – if True recurse into symbolic directory links

Yields:

Tuple[‘Path’, List[str], List[str]] – the root path, directory names, and file names

Example

>>> import ubelt as ub
>>> self = ub.Path.appdir('ubelt/tests/ls')
>>> (self / 'dir1').ensuredir()
>>> (self / 'dir2').ensuredir()
>>> (self / 'file1').touch()
>>> (self / 'file2').touch()
>>> (self / 'dir1/file3').touch()
>>> (self / 'dir2/file4').touch()
>>> subdirs = list(self.walk())
>>> assert len(subdirs) == 3

Example

>>> # Modified from the stdlib
>>> import os
>>> from os.path import join, getsize
>>> import email
>>> import ubelt as ub
>>> base = ub.Path(email.__file__).parent
>>> for root, dirs, files in base.walk():
>>>     print(root, "consumes", end=" ")
>>>     print(sum(getsize(join(root, name)) for name in files), end=" ")
>>>     print("bytes in", len(files), "non-directory files")
>>>     if 'CVS' in dirs:
>>>         dirs.remove('CVS')  # don't visit CVS directories
endswith(suffix, *args)[source]

Test if the fspath representation ends with suffix.

Allows ubelt.Path to be a better drop-in replacement when working with string-based paths.

Parameters:
  • suffix (str | Tuple[str, …]) – One or more suffixes to test for

  • *args – start (int): if specified begin testing at this position. end (int): if specified stop testing at this position.

Returns:

True if any of the suffixes match.

Return type:

bool

Example

>>> import ubelt as ub
>>> base = ub.Path('base')
>>> assert base.endswith('se')
>>> assert not base.endswith('be')
>>> # test start / stop cases
>>> assert ub.Path('aabbccdd').endswith('cdd', 5)
>>> assert not ub.Path('aabbccdd').endswith('cdd', 6)
>>> assert ub.Path('aabbccdd').endswith('cdd', 5, 10)
>>> assert not ub.Path('aabbccdd').endswith('cdd', 5, 7)
>>> # test tuple case
>>> assert ub.Path('aabbccdd').endswith(('foo', 'cdd'))
>>> assert ub.Path('foo').endswith(('foo', 'cdd'))
>>> assert not ub.Path('bar').endswith(('foo', 'cdd'))
startswith(prefix, *args)[source]

Test if the fspath representation starts with prefix.

Allows ubelt.Path to be a better drop-in replacement when working with string-based paths.

Parameters:
  • prefix (str | Tuple[str, …]) – One or more prefixes to test for

  • *args – start (int): if specified begin testing at this position. end (int): if specified stop testing at this position.

Returns:

True if any of the prefixes match.

Return type:

bool

Example

>>> import ubelt as ub
>>> base = ub.Path('base')
>>> assert base.startswith('base')
>>> assert not base.startswith('all your')
>>> # test start / stop cases
>>> assert ub.Path('aabbccdd').startswith('aab', 0)
>>> assert ub.Path('aabbccdd').startswith('aab', 0, 5)
>>> assert not ub.Path('aabbccdd').startswith('aab', 1, 5)
>>> assert not ub.Path('aabbccdd').startswith('aab', 0, 2)
>>> # test tuple case
>>> assert ub.Path('aabbccdd').startswith(('foo', 'aab'))
>>> assert ub.Path('foo').startswith(('foo', 'aab'))
>>> assert not ub.Path('bar').startswith(('foo', 'aab'))
_request_copy_function(follow_file_symlinks=True, follow_dir_symlinks=True, meta='stats')[source]

Get a copy_function based on specified capabilities

copy(dst, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats', overwrite=False)[source]

Copy this file or directory to dst.

By default files are never overwritten and symlinks are copied as-is.

At a basic level (i.e. ignoring symlinks) for each path argument (src and dst) these can either be files, directories, or not exist. Given these three states, the following table summarizes how this function copies this path to its destination.

TextArt

+----------+------------------------+------------------------+----------+
| dst      | dir                    | file                   | no-exist |
+----------+                        |                        |          |
| src      |                        |                        |          |
+==========+========================+========================+==========+
| dir      | error-or-overwrite-dst | error                  | dst      |
+----------+------------------------+------------------------+----------+
| file     | dst / src.name         | error-or-overwrite-dst | dst      |
+----------+------------------------+------------------------+----------+
| no-exist | error                  | error                  | error    |
+----------+------------------------+------------------------+----------+

In general, the contents of src will be the contents of dst, except for the one case where a file is copied into an existing directory. In this case the name is used to construct a fully qualified destination.

Parameters:
  • dst (str | PathLike) – if src is a file and dst does not exist, copies this to dst. If src is a file and dst is a directory, copies this to dst / src.name.

    If src is a directory and dst does not exist, copies this to dst. If src is a directory and dst is a directory, errors unless overwrite is True, in which case this copies to dst and overwrites any conflicting paths.

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2()), ‘mode’ which copies just the permission bits (i.e. like shutil.copy()), or None, which ignores all metadata (i.e. like shutil.copyfile()).

  • overwrite (bool) – if False, and target file exists, this will raise an error, otherwise the file will be overwritten.

Returns:

where the path was copied to

Return type:

Path

Note

This is implemented with a combination of shutil.copy(), shutil.copy2(), and shutil.copytree(), but the defaults and behavior here are different (and ideally safer and more intuitive).

Note

Unlike cp on Linux, copying a src directory into a dst directory will not implicitly add the src directory name to the dst directory. This means we cannot copy directory <parent>/<dname> to <dst> and expect the result to be <dst>/<dname>.

Conceptually you can expect <parent>/<dname>/<contents> to exist in <dst>/<contents>.

Example

>>> import ubelt as ub
>>> root = ub.Path.appdir('ubelt', 'tests', 'path', 'copy').delete().ensuredir()
>>> paths = {}
>>> dpath = (root / 'orig').ensuredir()
>>> clone0 = (root / 'dst_is_explicit').ensuredir()
>>> clone1 = (root / 'dst_is_parent').ensuredir()
>>> paths['fpath'] = (dpath / 'file0.txt').touch()
>>> paths['empty_dpath'] = (dpath / 'empty_dpath').ensuredir()
>>> paths['nested_dpath'] = (dpath / 'nested_dpath').ensuredir()
>>> (dpath / 'nested_dpath/d0').ensuredir()
>>> (dpath / 'nested_dpath/d0/f1.txt').touch()
>>> (dpath / 'nested_dpath/d0/f2.txt').touch()
>>> print('paths = {}'.format(ub.repr2(paths, nl=1)))
>>> assert all(p.exists() for p in paths.values())
>>> paths['fpath'].copy(clone0 / 'file0.txt')
>>> paths['fpath'].copy(clone1)
>>> paths['empty_dpath'].copy(clone0 / 'empty_dpath')
>>> paths['empty_dpath'].copy((clone1 / 'empty_dpath_alt').ensuredir(), overwrite=True)
>>> paths['nested_dpath'].copy(clone0 / 'nested_dpath')
>>> paths['nested_dpath'].copy((clone1 / 'nested_dpath_alt').ensuredir(), overwrite=True)
move(dst, follow_file_symlinks=False, follow_dir_symlinks=False, meta='stats')[source]

Move a file from one location to another, or recursively move a directory from one location to another.

This method will refuse to overwrite anything, and there is currently no overwrite option for technical reasons. This may change in the future.

Parameters:
  • dst (str | PathLike) – A non-existing path where this file will be moved.

  • follow_file_symlinks (bool) – If True and src is a link, the link will be resolved before it is copied (i.e. the data is duplicated), otherwise just the link itself will be copied.

  • follow_dir_symlinks (bool) – if True when src is a directory and contains symlinks to other directories, the contents of the linked data are copied, otherwise when False only the link itself is copied.

  • meta (str | None) – Indicates what metadata bits to copy. This can be ‘stats’ which tries to copy all metadata (i.e. like shutil.copy2), ‘mode’ which copies just the permission bits (i.e. like shutil.copy), or None, which ignores all metadata (i.e. like shutil.copyfile).

Note

This method will refuse to overwrite anything.

This is implemented via shutil.move(), which depends heavily on os.rename() semantics. For this reason, this function will error if it would overwrite any data. If you want an overwriting variant of move we recommend you either copy the data and then delete the original (potentially inefficient), or use shutil.move() directly if you know how os.rename() works on your system.

Returns:

where the path was moved to

Return type:

Path

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt', 'tests', 'path', 'move').delete().ensuredir()
>>> paths = {}
>>> paths['dpath0'] = (dpath / 'dpath0').ensuredir()
>>> paths['dpath00'] = (dpath / 'dpath0' / 'sub0').ensuredir()
>>> paths['fpath000'] = (dpath / 'dpath0' / 'sub0' / 'f0.txt').touch()
>>> paths['fpath001'] = (dpath / 'dpath0' / 'sub0' / 'f1.txt').touch()
>>> paths['dpath01'] = (dpath / 'dpath0' / 'sub1').ensuredir()
>>> print('paths = {}'.format(ub.repr2(paths, nl=1)))
>>> assert all(p.exists() for p in paths.values())
>>> paths['dpath0'].move(dpath / 'dpath1')
class ubelt.ProgIter(iterable=None, desc=None, total=None, freq=1, initial=0, eta_window=64, clearline=True, adjust=True, time_thresh=2.0, show_percent=True, show_times=True, show_rate=True, show_eta=True, show_total=True, show_wall=False, enabled=True, verbose=None, stream=None, chunksize=None, rel_adjust_limit=4.0, homogeneous='auto', timer=None, **kwargs)[source]

Bases: _TQDMCompat, _BackwardsCompat

Prints progress as an iterator progresses

ProgIter is an alternative to tqdm. ProgIter implements much of the tqdm-API. The main difference between ProgIter and tqdm is that ProgIter does not use threading whereas tqdm does.

Attributes:

Note

Either use ProgIter in a with statement or call prog.end() at the end of the computation if there is a possibility that the entire iterable may not be exhausted.

Note

ProgIter is an alternative to tqdm. The main difference between ProgIter and tqdm is that ProgIter does not use threading whereas tqdm does. ProgIter is simpler than tqdm and thus more stable in certain circumstances.

SeeAlso:

tqdm - https://pypi.python.org/pypi/tqdm

References

Example

>>> 
>>> def is_prime(n):
...     return n >= 2 and not any(n % i == 0 for i in range(2, n))
>>> for n in ProgIter(range(100), verbose=1, show_wall=True):
>>>     # do some work
>>>     is_prime(n)
100/100... rate=... Hz, total=..., wall=...

See Attributes for more argument information

Parameters:
  • iterable (List | Iterable) – A list or iterable to loop over

  • desc (str | None) – description label to show with progress

  • total (int | None) – Maximum length of the process. If not specified, we estimate it from the iterable, if possible.

  • freq (int) – How many iterations to wait between messages. Defaults to 1.

  • initial (int) – starting index offset, default=0

  • eta_window (int) – number of previous measurements to use in eta calculation, default=64

  • clearline (bool) – if True, messages are printed on the same line; otherwise each new progress message is printed on a new line. default=True

  • adjust (bool) – if True freq is adjusted based on time_thresh. This may be overwritten depending on the setting of verbose. default=True

  • time_thresh (float) – desired amount of time to wait between messages if adjust is True otherwise does nothing, default=2.0

  • show_percent (bool) – if True show percent progress. Default=True

  • show_times (bool) – if False do not show rate, eta, or wall time. default=True. Deprecated: use show_rate / show_eta / show_wall instead.

  • show_rate (bool) – show / hide rate, default=True

  • show_eta (bool) – show / hide estimated time of arrival (i.e. time to completion), default=True

  • show_wall (bool) – show / hide wall time, default=False

  • stream (typing.IO) – stream where progress information is written to, default=sys.stdout

  • timer (callable) – the timer object to use. Defaults to time.perf_counter().

  • enabled (bool) – if False nothing happens. default=True

  • chunksize (int | None) – indicates that each iteration processes a batch of this size. Iteration rate is displayed in terms of single-items.

  • rel_adjust_limit (float) – Maximum factor update frequency can be adjusted by in a single step. default=4.0

  • verbose (int) – verbosity mode, which controls clearline, adjust, and enabled. The following maps the value of verbose to its effect. 0: enabled=False, 1: enabled=True with clearline=True and adjust=True, 2: enabled=True with clearline=False and adjust=True, 3: enabled=True with clearline=False and adjust=False

  • homogeneous (bool | str) – Indicate if the iterable is likely to take a uniform or homogeneous amount of time per iteration. When True we can enable a speed optimization. When False, the time estimates are more accurate. Defaults to “auto”, which attempts to determine if it is safe to use True. Has no effect if adjust is False.

  • show_total (bool) – if True show total time.

  • **kwargs – accepts most of the tqdm api

set_extra(extra)[source]

specify a custom info appended to the end of the next message

Parameters:

extra (str | Callable) – a constant or dynamically constructed extra message.

Todo

  • [ ] extra is a bad name; come up with something better and rename

Example

>>> prog = ProgIter(range(100, 300, 100), show_times=False, verbose=3)
>>> for n in prog:
>>>     prog.set_extra('processing num {}'.format(n))
 0.00% 0/2...
 50.00% 1/2...processing num 100
 100.00% 2/2...processing num 200
_reset_internals()[source]

Initialize all variables used in the internal state

begin()[source]

Initializes information used to measure progress

This only needs to be used if this ProgIter is not wrapping an iterable. Does nothing if this ProgIter is disabled.

Returns:

a chainable self-reference

Return type:

ProgIter

end()[source]

Signals that iteration has ended and displays the final message.

This only needs to be used if this ProgIter is not wrapping an iterable. Does nothing if this ProgIter object is disabled or has already finished.

_iterate()[source]

iterates with progress

_homogeneous_check(gen)[source]
_slow_path_step_body(force=False)[source]
step(inc=1, force=False)[source]

Manually step progress update, either directly or by an increment.

Parameters:
  • inc (int) – number of steps to increment. Defaults to 1.

  • force (bool) – if True forces progress display. Defaults to False.

Example

>>> n = 3
>>> prog = ProgIter(desc='manual', total=n, verbose=3)
>>> # Need to manually begin and end in this mode
>>> prog.begin()
>>> for _ in range(n):
...     prog.step()
>>> prog.end()

Example

>>> n = 3
>>> # can be used as a context manager in manual mode
>>> with ProgIter(desc='manual', total=n, verbose=3) as prog:
...     for _ in range(n):
...         prog.step()
_adjust_frequency()[source]
_measure_time()[source]

Measures the current time and updates info about how long we’ve been waiting since the last iteration was displayed.

_update_message_template()[source]
_build_message_template()[source]

Defines the template for the progress line

Returns:

Tuple[str, str, str]

Example

>>> self = ProgIter()
>>> print(self._build_message_template()[1].strip())
{desc} {iter_idx:4d}/?...{extra} rate={rate:{rate_format}} Hz, total={total}...
>>> self = ProgIter(show_total=False, show_eta=False, show_rate=False)
>>> print(self._build_message_template()[1].strip())
{desc} {iter_idx:4d}/?...{extra}
>>> self = ProgIter(total=0, show_times=True)
>>> print(self._build_message_template()[1].strip())
{desc} {percent:03.2f}% {iter_idx:1d}/0...{extra} rate={rate:{rate_format}} Hz, total={total}
format_message()[source]

Exists only for backwards compatibility.

See format_message_parts for more recent API.

Returns:

str

format_message_parts()[source]

builds a formatted progress message with the current values. This contains the special characters needed to clear lines.

Returns:

Tuple[str, str, str]

Example

>>> self = ProgIter(clearline=False, show_times=False)
>>> print(repr(self.format_message_parts()[1]))
'    0/?... '
>>> self.begin()
>>> self.step()
>>> print(repr(self.format_message_parts()[1]))
' 1/?... '

Example

>>> self = ProgIter(chunksize=10, total=100, clearline=False,
>>>                 show_times=False, microseconds=True)
>>> # hack, microseconds=True for coverage, needs real test
>>> print(repr(self.format_message_parts()[1]))
' 0.00% of 10x100... '
>>> self.begin()
>>> self.update()  # tqdm alternative to step
>>> print(repr(self.format_message_parts()[1]))
' 1.00% of 10x100... '
ensure_newline()[source]

Use this before any custom printing when using the progress iter to ensure your print statement starts on a new line instead of at the end of a progress line.

Example

>>> # Unsafe version may write your message on the wrong line
>>> prog = ProgIter(range(3), show_times=False, freq=2, adjust=False,
...                 time_thresh=0)
>>> for n in prog:
...     print('unsafe message')
 0.00% 0/3... unsafe message
unsafe message
 66.67% 2/3... unsafe message
 100.00% 3/3...
>>> # apparently the safe version does this too.
>>> print('---')
---
>>> prog = ProgIter(range(3), show_times=False, freq=2, adjust=False,
...                 time_thresh=0)
>>> for n in prog:
...     prog.ensure_newline()
...     print('safe message')
 0.00% 0/3...
safe message
safe message
 66.67% 2/3...
safe message
 100.00% 3/3...
display_message()[source]

Writes current progress to the output stream

_tryflush()[source]

flush to the internal stream

_write(msg)[source]

write to the internal stream

Parameters:

msg (str) – message to write

class ubelt.ReprExtensions[source]

Bases: object

Helper class for managing non-builtin (e.g. numpy) format types.

This module (ubelt.util_repr) maintains a global set of basic extensions, but it is also possible to create a locally scoped set of extensions and explicitly pass it to urepr. The following example demonstrates this.

Example

>>> import ubelt as ub
>>> class MyObject(object):
>>>     pass
>>> data = {'a': [1, 2.2222, MyObject()], 'b': MyObject()}
>>> # Create a custom set of extensions
>>> extensions = ub.ReprExtensions()
>>> # Register a function to format your specific type
>>> @extensions.register(MyObject)
>>> def format_myobject(data, **kwargs):
>>>     return 'I can do anything here'
>>> # urepr will now respect the passed custom extensions
>>> # Note that the global extensions will still be respected
>>> # unless they are overloaded.
>>> print(ub.urepr(data, nl=-1, precision=1, extensions=extensions))
{
    'a': [1, 2.2, I can do anything here],
    'b': I can do anything here
}
>>> # Overload the formatter for float and int
>>> @extensions.register((float, int))
>>> def format_myobject(data, **kwargs):
>>>     return str((data + 10) // 2)
>>> print(ub.urepr(data, nl=-1, precision=1, extensions=extensions))
{
    'a': [5, 6.0, I can do anything here],
    'b': I can do anything here
}
register(key)[source]

Registers a custom formatting function with ub.urepr

Parameters:

key (Type | Tuple[Type] | str) – indicator of the type

Returns:

decorator function

Return type:

Callable

lookup(data)[source]

Returns an appropriate function to format data if one has been registered.

Parameters:

data (Any) – an instance that may have a registered formatter

Returns:

the formatter for the given type

Return type:

Callable

_register_pandas_extensions()[source]

Example

>>> # xdoctest: +REQUIRES(module:pandas)
>>> # xdoctest: +IGNORE_WHITESPACE
>>> import pandas as pd
>>> import numpy as np
>>> import ubelt as ub
>>> rng = np.random.RandomState(0)
>>> data = pd.DataFrame(rng.rand(3, 3))
>>> print(ub.urepr(data))
>>> print(ub.urepr(data, precision=2))
>>> print(ub.urepr({'akeyfdfj': data}, precision=2))
_register_numpy_extensions()[source]

Example

>>> # xdoctest: +REQUIRES(module:numpy)
>>> import sys
>>> import pytest
>>> import ubelt as ub
>>> if not ub.modname_to_modpath('numpy'):
...     raise pytest.skip()
>>> # xdoctest: +IGNORE_WHITESPACE
>>> import numpy as np
>>> data = np.array([[.2, 42, 5], [21.2, 3, .4]])
>>> print(ub.urepr(data))
np.array([[ 0.2, 42. ,  5. ],
          [21.2,  3. ,  0.4]], dtype=np.float64)
>>> print(ub.urepr(data, with_dtype=False))
np.array([[ 0.2, 42. ,  5. ],
          [21.2,  3. ,  0.4]])
>>> print(ub.urepr(data, strvals=True))
[[ 0.2, 42. ,  5. ],
 [21.2,  3. ,  0.4]]
>>> data = np.empty((0, 10), dtype=np.float64)
>>> print(ub.urepr(data, strvals=False))
np.empty((0, 10), dtype=np.float64)
>>> print(ub.urepr(data, strvals=True))
[]
>>> data = np.ma.empty((0, 10), dtype=np.float64)
>>> print(ub.urepr(data, strvals=False))
np.ma.empty((0, 10), dtype=np.float64)
_register_builtin_extensions()[source]
class ubelt.SetDict[source]

Bases: dict

A dictionary subclass where all set operations are defined.

All of the set operations are defined in a key-wise fashion; that is, it is like performing the operation on sets of keys. Value conflicts are handled with left-most priority (default for intersection and difference), right-most priority (default for union and symmetric_difference), or via a custom merge callable similar to [RubyMerge].

The set operations are:

  • union (or the | operator) combines multiple dictionaries into one. This is nearly identical to the update operation. Rightmost values take priority.

  • intersection (or the & operator). Takes the items from the first dictionary that share keys with the following dictionaries (or lists or sets of keys). Leftmost values take priority.

  • difference (or the - operator). Takes only items from the first dictionary that do not share keys with following dictionaries. Leftmost values take priority.

  • symmetric_difference (or the ^ operator). Takes the items from all dictionaries where the key appears an odd number of times. Rightmost values take priority.

The full set of set operations was originally determined to be beyond the scope of [Pep584], but there was discussion of these additional operations. Some choices were ambiguous, but we believe this design could be considered “natural”.

Note

By default the right-most values take priority in union / symmetric_difference and left-most values take priority in intersection / difference. In summary this is because we consider intersection / difference to be “subtractive” operations, and union / symmetric_difference to be “additive” operations. We expand on this in the following points:

1. intersection / difference is for removing keys — i.e. is used to find values in the first (main) dictionary that are also in some other dictionary (or set or list of keys), whereas

2. union is for adding keys — i.e. it is basically just an alias for dict.update, so the new (rightmost) keys clobber the old.

3. symmetric_difference is somewhat strange if you aren’t familiar with it. At a pure-set level it’s not really a difference, it’s a parity operation (think of it more like xor or addition modulo 2). You only keep items where the key appears an odd number of times. Unlike intersection and difference, the results may not be a subset of either input. The union has the same property. This symmetry motivates having the newest (rightmost) keys clobber the old.

Also, union / symmetric_difference does not make sense if the arguments on the right are lists/sets, whereas difference / intersection does.

Note

The SetDict class only defines key-wise set operations. Value-wise or item-wise operations are in general not hashable and therefore not supported. A heavier extension would be needed for that.

Todo

  • [ ] implement merge callables so the user can specify how to resolve value conflicts / combine values.

References

CommandLine

xdoctest -m ubelt.util_dict SetDict

Example

>>> import ubelt as ub
>>> a = ub.SetDict({'A': 'Aa', 'B': 'Ba',            'D': 'Da'})
>>> b = ub.SetDict({'A': 'Ab', 'B': 'Bb', 'C': 'Cb',          })
>>> print(a.union(b))
>>> print(a.intersection(b))
>>> print(a.difference(b))
>>> print(a.symmetric_difference(b))
{'A': 'Ab', 'B': 'Bb', 'D': 'Da', 'C': 'Cb'}
{'A': 'Aa', 'B': 'Ba'}
{'D': 'Da'}
{'D': 'Da', 'C': 'Cb'}
>>> print(a | b)  # union
>>> print(a & b)  # intersection
>>> print(a - b)  # difference
>>> print(a ^ b)  # symmetric_difference
{'A': 'Ab', 'B': 'Bb', 'D': 'Da', 'C': 'Cb'}
{'A': 'Aa', 'B': 'Ba'}
{'D': 'Da'}
{'D': 'Da', 'C': 'Cb'}

Example

>>> import ubelt as ub
>>> a = ub.SetDict({'A': 'Aa', 'B': 'Ba',            'D': 'Da'})
>>> b = ub.SetDict({'A': 'Ab', 'B': 'Bb', 'C': 'Cb',          })
>>> c = ub.SetDict({'A': 'Ac', 'B': 'Bc',                       'E': 'Ec'})
>>> d = ub.SetDict({'A': 'Ad',            'C': 'Cd', 'D': 'Dd'})
>>> # 3-ary operations
>>> print(a.union(b, c))
>>> print(a.intersection(b, c))
>>> print(a.difference(b, c))
>>> print(a.symmetric_difference(b, c))
{'A': 'Ac', 'B': 'Bc', 'D': 'Da', 'C': 'Cb', 'E': 'Ec'}
{'A': 'Aa', 'B': 'Ba'}
{'D': 'Da'}
{'D': 'Da', 'C': 'Cb', 'A': 'Ac', 'B': 'Bc', 'E': 'Ec'}
>>> # 4-ary operations
>>> print(ub.UDict.union(a, b, c, c))
>>> print(ub.UDict.intersection(a, b, c, c))
>>> print(ub.UDict.difference(a, b, c, d))
>>> print(ub.UDict.symmetric_difference(a, b, c, d))
{'A': 'Ac', 'B': 'Bc', 'D': 'Da', 'C': 'Cb', 'E': 'Ec'}
{'A': 'Aa', 'B': 'Ba'}
{}
{'B': 'Bc', 'E': 'Ec'}

Example

>>> import ubelt as ub
>>> primes = ub.sdict({v: f'prime_{v}' for v in [2, 3, 5, 7, 11]})
>>> evens = ub.sdict({v: f'even_{v}' for v in [0, 2, 4, 6, 8, 10]})
>>> odds = ub.sdict({v: f'odd_{v}' for v in [1, 3, 5, 7, 9, 11]})
>>> squares = ub.sdict({v: f'square_{v}' for v in [0, 1, 4, 9]})
>>> div3 = ub.sdict({v: f'div3_{v}' for v in [0, 3, 6, 9]})
>>> # All of the set methods are defined
>>> results1 = {}
>>> results1['ints'] = ints = odds.union(evens)
>>> results1['composites'] = ints.difference(primes)
>>> results1['even_primes'] = evens.intersection(primes)
>>> results1['odd_nonprimes_and_two'] = odds.symmetric_difference(primes)
>>> print('results1 = {}'.format(ub.repr2(results1, nl=2, sort=True)))
results1 = {
    'composites': {
        0: 'even_0',
        1: 'odd_1',
        4: 'even_4',
        6: 'even_6',
        8: 'even_8',
        9: 'odd_9',
        10: 'even_10',
    },
    'even_primes': {
        2: 'even_2',
    },
    'ints': {
        0: 'even_0',
        1: 'odd_1',
        2: 'even_2',
        3: 'odd_3',
        4: 'even_4',
        5: 'odd_5',
        6: 'even_6',
        7: 'odd_7',
        8: 'even_8',
        9: 'odd_9',
        10: 'even_10',
        11: 'odd_11',
    },
    'odd_nonprimes_and_two': {
        1: 'odd_1',
        2: 'prime_2',
        9: 'odd_9',
    },
}
>>> # As well as their corresponding binary operators
>>> assert results1['ints'] == odds | evens
>>> assert results1['composites'] == ints - primes
>>> assert results1['even_primes'] == evens & primes
>>> assert results1['odd_nonprimes_and_two'] == odds ^ primes
>>> # These can also be used as classmethods
>>> assert results1['ints'] == ub.sdict.union(odds, evens)
>>> assert results1['composites'] == ub.sdict.difference(ints, primes)
>>> assert results1['even_primes'] == ub.sdict.intersection(evens, primes)
>>> assert results1['odd_nonprimes_and_two'] == ub.sdict.symmetric_difference(odds, primes)
>>> # The n-ary variants are also implemented
>>> results2 = {}
>>> results2['nary_union'] = ub.sdict.union(primes, div3, odds)
>>> results2['nary_difference'] = ub.sdict.difference(primes, div3, odds)
>>> results2['nary_intersection'] = ub.sdict.intersection(primes, div3, odds)
>>> # Note that the definition of symmetric difference might not be what you think in the n-ary case.
>>> results2['nary_symmetric_difference'] = ub.sdict.symmetric_difference(primes, div3, odds)
>>> print('results2 = {}'.format(ub.repr2(results2, nl=2, sort=True)))
results2 = {
    'nary_difference': {
        2: 'prime_2',
    },
    'nary_intersection': {
        3: 'prime_3',
    },
    'nary_symmetric_difference': {
        0: 'div3_0',
        1: 'odd_1',
        2: 'prime_2',
        3: 'odd_3',
        6: 'div3_6',
    },
    'nary_union': {
        0: 'div3_0',
        1: 'odd_1',
        2: 'prime_2',
        3: 'odd_3',
        5: 'odd_5',
        6: 'div3_6',
        7: 'odd_7',
        9: 'odd_9',
        11: 'odd_11',
    },
}

Example

>>> # A neat thing about our implementation is that often the right
>>> # hand side is not required to be a dictionary, just something
>>> # that can be cast to a set.
>>> import ubelt as ub
>>> primes = ub.sdict({2: 'a', 3: 'b', 5: 'c', 7: 'd', 11: 'e'})
>>> assert primes - {2, 3} == {5: 'c', 7: 'd', 11: 'e'}
>>> assert primes & {2, 3} == {2: 'a', 3: 'b'}
>>> # Union does need to have a second dictionary
>>> import pytest
>>> with pytest.raises(AttributeError):
>>>     primes | {2, 3}
copy()[source]

Example

>>> import ubelt as ub
>>> a = ub.sdict({1: 1, 2: 2, 3: 3})
>>> b = ub.udict({1: 1, 2: 2, 3: 3})
>>> c = a.copy()
>>> d = b.copy()
>>> assert c is not a
>>> assert d is not b
>>> assert d == b
>>> assert c == a
>>> list(map(type, [a, b, c, d]))
>>> assert isinstance(c, ub.sdict)
>>> assert isinstance(d, ub.udict)
union(*others, cls=None, merge=None)[source]

Return the key-wise union of two or more dictionaries.

Values chosen with right-most priority. I.e. for items with intersecting keys, dictionaries towards the end of the sequence are given precedence.

Parameters:
  • self (SetDict | dict) – if called as a static method this must be provided.

  • *others – other dictionary like objects that have an items method. (i.e. it must return an iterable of 2-tuples where the first item is hashable.)

  • cls (type | None) – the desired return dictionary type.

  • merge (None | Callable) – if specified this function must accept an iterable of values and return a new value to use (which typically is derived from input values). NotImplemented, help wanted.

Returns:

items from all input dictionaries. Conflicts are resolved with right-most priority unless merge is specified. Specific return type is specified by cls or defaults to the leftmost input.

Return type:

dict

Example

>>> import ubelt as ub
>>> a = ub.SetDict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> b = ub.SetDict({k: 'B_' + chr(97 + k) for k in [2, 4, 0, 7]})
>>> c = ub.SetDict({k: 'C_' + chr(97 + k) for k in [2, 8, 3]})
>>> d = ub.SetDict({k: 'D_' + chr(97 + k) for k in [9, 10, 11]})
>>> e = ub.SetDict({k: 'E_' + chr(97 + k) for k in []})
>>> assert a | b == {2: 'B_c', 3: 'A_d', 5: 'A_f', 7: 'B_h', 4: 'B_e', 0: 'B_a'}
>>> a.union(b)
>>> a | b | c
>>> res = ub.SetDict.union(a, b, c, d, e)
>>> print(ub.repr2(res, sort=1, nl=0, si=1))
{0: B_a, 2: C_c, 3: C_d, 4: B_e, 5: A_f, 7: B_h, 8: C_i, 9: D_j, 10: D_k, 11: D_l}
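
For the two-argument case, the key-wise union mirrors a plain-dict merge with right-most priority. The following is a rough stdlib sketch of the semantics, not ubelt's actual implementation:

```python
# Plain-dict sketch of key-wise union (right-most priority), mirroring
# the binary `a | b` behavior described above. Not ubelt's real code.
a = {2: 'A_c', 3: 'A_d', 5: 'A_f', 7: 'A_h'}
b = {2: 'B_c', 4: 'B_e', 0: 'B_a', 7: 'B_h'}
merged = {**a, **b}  # keys from both inputs; b wins on conflicting keys
assert merged == {2: 'B_c', 3: 'A_d', 5: 'A_f', 7: 'B_h', 4: 'B_e', 0: 'B_a'}
```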
intersection(*others, cls=None, merge=None)[source]

Return the key-wise intersection of two or more dictionaries.

Values returned with left-most priority. I.e. all items returned will be from the first dictionary for keys that exist in all other dictionaries / sets provided.

Parameters:
  • self (SetDict | dict) – if called as a static method this must be provided.

  • *others – other dictionary or set like objects that can be coerced into a set of keys.

  • cls (type | None) – the desired return dictionary type.

  • merge (None | Callable) – if specified this function must accept an iterable of values and return a new value to use (which typically is derived from input values). NotImplemented, help wanted.

Returns:

items with keys shared by all the inputs. Values take left-most priority unless merge is specified. Specific return type is specified by cls or defaults to the leftmost input.

Return type:

dict

Example

>>> import ubelt as ub
>>> a = ub.SetDict({'a': 1, 'b': 2, 'd': 4})
>>> b = ub.SetDict({'a': 10, 'b': 20, 'c': 30})
>>> a.intersection(b)
{'a': 1, 'b': 2}
>>> a & b
{'a': 1, 'b': 2}

Example

>>> import ubelt as ub
>>> a = ub.SetDict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> b = ub.SetDict({k: 'B_' + chr(97 + k) for k in [2, 4, 0, 7]})
>>> c = ub.SetDict({k: 'C_' + chr(97 + k) for k in [2, 8, 3]})
>>> d = ub.SetDict({k: 'D_' + chr(97 + k) for k in [9, 10, 11]})
>>> e = ub.SetDict({k: 'E_' + chr(97 + k) for k in []})
>>> assert a & b == {2: 'A_c', 7: 'A_h'}
>>> a.intersection(b)
>>> a & b & c
>>> res = ub.SetDict.intersection(a, b, c, d, e)
>>> print(ub.repr2(res, sort=1, nl=0, si=1))
{}
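
Because dict key views support set operators, the binary intersection has a short stdlib analogue. This is a sketch of the semantics under the left-most-priority rule, not ubelt's implementation:

```python
# Plain-dict sketch of key-wise intersection with left-most priority.
a = {'a': 1, 'b': 2, 'd': 4}
b = {'a': 10, 'b': 20, 'c': 30}
common = {k: a[k] for k in a.keys() & b.keys()}  # keep values from `a`
assert common == {'a': 1, 'b': 2}
```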
difference(*others, cls=None, merge=None)[source]

Return the key-wise difference between this dictionary and one or more other dictionary / keys.

Values returned with left-most priority. I.e. the returned items will be from the first dictionary, and will only contain keys that do not appear in any of the other dictionaries / sets.

Parameters:
  • self (SetDict | dict) – if called as a static method this must be provided.

  • *others – other dictionary or set like objects that can be coerced into a set of keys.

  • cls (type | None) – the desired return dictionary type.

  • merge (None | Callable) – if specified this function must accept an iterable of values and return a new value to use (which typically is derived from input values). NotImplemented, help wanted.

Returns:

items from the first dictionary with keys not in any of the following inputs. Values take left-most priority unless merge is specified. Specific return type is specified by cls or defaults to the leftmost input.

Return type:

dict

Example

>>> import ubelt as ub
>>> a = ub.SetDict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> b = ub.SetDict({k: 'B_' + chr(97 + k) for k in [2, 4, 0, 7]})
>>> c = ub.SetDict({k: 'C_' + chr(97 + k) for k in [2, 8, 3]})
>>> d = ub.SetDict({k: 'D_' + chr(97 + k) for k in [9, 10, 11]})
>>> e = ub.SetDict({k: 'E_' + chr(97 + k) for k in []})
>>> assert a - b == {3: 'A_d', 5: 'A_f'}
>>> a.difference(b)
>>> a - b - c
>>> res = ub.SetDict.difference(a, b, c, d, e)
>>> print(ub.repr2(res, sort=1, nl=0, si=1))
{5: A_f}
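
The n-ary difference keeps only items of the first input whose keys appear in none of the others. A plain-stdlib sketch of that rule (not ubelt's implementation), using the same a, b, c values as above:

```python
# Sketch of key-wise difference: keep items of `a` whose keys do not
# appear in any of the other inputs.
a = {2: 'A_c', 3: 'A_d', 5: 'A_f', 7: 'A_h'}
b = {2: 'B_c', 4: 'B_e', 0: 'B_a', 7: 'B_h'}
c = {2: 'C_c', 8: 'C_i', 3: 'C_d'}
removed = b.keys() | c.keys()
remaining = {k: v for k, v in a.items() if k not in removed}
assert remaining == {5: 'A_f'}
```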
symmetric_difference(*others, cls=None, merge=None)[source]

Return the key-wise symmetric difference between this dictionary and one or more other dictionaries.

Values chosen with right-most priority. Returns items that are (key-wise) in an odd number of the given dictionaries. This is consistent with the standard n-ary definition of symmetric difference [WikiSymDiff] and corresponds with the xor operation.

Parameters:
  • self (SetDict | dict) – if called as a static method this must be provided.

  • *others – other dictionary or set like objects that can be coerced into a set of keys.

  • cls (type | None) – the desired return dictionary type.

  • merge (None | Callable) – if specified this function must accept an iterable of values and return a new value to use (which typically is derived from input values). NotImplemented, help wanted.

Returns:

items from input dictionaries where the key appears an odd number of times. Values take right-most priority unless merge is specified. Specific return type is specified by cls or defaults to the leftmost input.

Return type:

dict

References

Example

>>> import ubelt as ub
>>> a = ub.SetDict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> b = ub.SetDict({k: 'B_' + chr(97 + k) for k in [2, 4, 0, 7]})
>>> c = ub.SetDict({k: 'C_' + chr(97 + k) for k in [2, 8, 3]})
>>> d = ub.SetDict({k: 'D_' + chr(97 + k) for k in [9, 10, 11]})
>>> e = ub.SetDict({k: 'E_' + chr(97 + k) for k in []})
>>> a ^ b
{3: 'A_d', 5: 'A_f', 4: 'B_e', 0: 'B_a'}
>>> a.symmetric_difference(b)
>>> a ^ b ^ c
>>> res = ub.SetDict.symmetric_difference(a, b, c, d, e)
>>> print(ub.repr2(res, sort=1, nl=0, si=1))
{0: B_a, 2: C_c, 4: B_e, 5: A_f, 8: C_i, 9: D_j, 10: D_k, 11: D_l}
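
The "odd number of appearances" rule from [WikiSymDiff] can be sketched with a Counter. This is an illustrative stdlib approximation of the semantics, not ubelt's implementation:

```python
# Sketch of n-ary symmetric difference: keep keys appearing an odd
# number of times across the inputs; later dicts win on value.
from collections import Counter

a = {2: 'A_c', 3: 'A_d', 5: 'A_f', 7: 'A_h'}
b = {2: 'B_c', 4: 'B_e', 0: 'B_a', 7: 'B_h'}
counts = Counter(k for d in (a, b) for k in d)
result = {}
for d in (a, b):  # iterating left to right gives right-most priority
    for k, v in d.items():
        if counts[k] % 2 == 1:
            result[k] = v
assert result == {3: 'A_d', 5: 'A_f', 4: 'B_e', 0: 'B_a'}
```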
class ubelt.TeeStringIO(redirect=None)[source]

Bases: StringIO

An IO object that writes to itself and another IO stream.

Variables:

redirect (io.IOBase | None) – The other stream to write to.

Example

>>> import ubelt as ub
>>> import io
>>> redirect = io.StringIO()
>>> self = ub.TeeStringIO(redirect)
>>> self.write('spam')
>>> assert self.getvalue() == 'spam'
>>> assert redirect.getvalue() == 'spam'
Parameters:

redirect (io.IOBase) – The other stream to write to.

isatty()[source]

Returns True if the redirect is a terminal.

Note

Needed for IPython.embed to work properly when this class is used to override stdout / stderr.

SeeAlso:

io.IOBase.isatty()

Returns:

bool

fileno()[source]

Returns the underlying file descriptor of the redirected IOBase object if one exists.

Returns:

the integer corresponding to the file descriptor

Return type:

int

SeeAlso:

io.IOBase.fileno()

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt/tests/util_stream').ensuredir()
>>> fpath = dpath / 'fileno-test.txt'
>>> with open(fpath, 'w') as file:
>>>     self = ub.TeeStringIO(file)
>>>     descriptor = self.fileno()
>>>     print(f'descriptor={descriptor}')
>>>     assert isinstance(descriptor, int)

Example

>>> # Test errors
>>> # Not sure the best way to test, this func is important for
>>> # capturing stdout when ipython embedding
>>> import io
>>> import pytest
>>> import ubelt as ub
>>> with pytest.raises(io.UnsupportedOperation):
>>>     ub.TeeStringIO(redirect=io.StringIO()).fileno()
>>> with pytest.raises(io.UnsupportedOperation):
>>>     ub.TeeStringIO(None).fileno()
property encoding

Gets the encoding of the redirect IO object

FIXME:

Mypy complains that this violates the Liskov substitution principle because the return type can be str or None, whereas the parent class always returns None. In the future we may raise an exception instead of returning None.

SeeAlso:

io.TextIOBase.encoding

Example

>>> import io
>>> import sys
>>> import ubelt as ub
>>> redirect = io.StringIO()
>>> assert ub.TeeStringIO(redirect).encoding is None
>>> assert ub.TeeStringIO(None).encoding is None
>>> assert ub.TeeStringIO(sys.stdout).encoding is sys.stdout.encoding
>>> redirect = io.TextIOWrapper(io.StringIO())
>>> assert ub.TeeStringIO(redirect).encoding is redirect.encoding
write(msg)[source]

Write to this and the redirected stream

Parameters:

msg (str) – the data to write

SeeAlso:

io.TextIOBase.write()

Example

>>> import ubelt as ub
>>> dpath = ub.Path.appdir('ubelt/tests/util_stream').ensuredir()
>>> fpath = dpath / 'write-test.txt'
>>> with open(fpath, 'w') as file:
>>>     self = ub.TeeStringIO(file)
>>>     n = self.write('hello world')
>>>     assert n == 11
>>> assert self.getvalue() == 'hello world'
>>> assert fpath.read_text() == 'hello world'
flush()[source]

Flush to this and the redirected stream

SeeAlso:

io.IOBase.flush()

class ubelt.TempDir[source]

Bases: object

Context for creating and cleaning up temporary directories.

Warning

DEPRECATED. Use tempfile instead.

Note

This exists because tempfile.TemporaryDirectory was introduced in Python 3.2. Now that ubelt no longer supports Python 2.7, this class is deprecated.
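
As the Warning above says, new code should use the stdlib directly. A minimal replacement for the TempDir context manager looks like:

```python
# tempfile.TemporaryDirectory gives the same create-and-cleanup behavior
# that TempDir provided.
import os.path
import tempfile

with tempfile.TemporaryDirectory() as dpath:
    assert os.path.isdir(dpath)
# the directory is removed when the context exits
assert not os.path.exists(dpath)
```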

Variables:

dpath (str | None) – the temporary path

Note

# WE MAY WANT TO KEEP THIS FOR WINDOWS.

Example

>>> from ubelt.util_path import *  # NOQA
>>> with TempDir() as self:
>>>     dpath = self.dpath
>>>     assert exists(dpath)
>>> assert not exists(dpath)

Example

>>> from ubelt.util_path import *  # NOQA
>>> self = TempDir()
>>> dpath = self.ensure()
>>> assert exists(dpath)
>>> self.cleanup()
>>> assert not exists(dpath)
ensure()[source]
Returns:

the path

Return type:

str

cleanup()[source]
start()[source]
Returns:

self

Return type:

TempDir

class ubelt.Timer(label='', verbose=None, newline=True, ns=False)[source]

Bases: object

Measures time elapsed between a start and end point. Can be used as a with-statement context manager, or using the tic/toc api.

Variables:
  • elapsed (float) – number of seconds measured by the context manager

  • tstart (float) – time of last tic reported by self._time()

  • write (Callable) – function used to write

  • flush (Callable) – function used to flush

Example

>>> # Create and start the timer using the context manager
>>> import math
>>> import ubelt as ub
>>> timer = ub.Timer('Timer test!', verbose=1)
>>> with timer:
>>>     math.factorial(10)
>>> assert timer.elapsed > 0
tic('Timer test!')
...toc('Timer test!')=...

Example

>>> # Create and start the timer using the tic/toc interface
>>> import ubelt as ub
>>> timer = ub.Timer().tic()
>>> elapsed1 = timer.toc()
>>> elapsed2 = timer.toc()
>>> elapsed3 = timer.toc()
>>> assert elapsed1 <= elapsed2
>>> assert elapsed2 <= elapsed3
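
The tic/toc pattern amounts to differencing two readings of a monotonic clock. A stdlib sketch of the same idea, using time.perf_counter (which _default_time below documents as the default clock), rather than the Timer internals themselves:

```python
# Minimal tic/toc sketch with the stdlib performance counter.
import math
import time

tstart = time.perf_counter()            # tic: record the start time
math.factorial(1000)                    # some work to measure
elapsed = time.perf_counter() - tstart  # toc: seconds since tic
assert elapsed >= 0
```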

Example

>>> # In Python 3.7+ nanosecond resolution can be enabled
>>> import ubelt as ub
>>> import sys
>>> if sys.version_info[0:2] <= (3, 6):
>>>     import pytest
>>>     pytest.skip()
>>> # xdoctest +REQUIRES(Python>=3.7)  # fixme: directive doesn't exist yet
>>> timer = ub.Timer(label='perf_counter_ns', ns=True).tic()
>>> elapsed1 = timer.toc()
>>> elapsed2 = timer.toc()
>>> assert elapsed1 <= elapsed2
>>> assert isinstance(elapsed1, int)
Parameters:
  • label (str) – identifier for printing. Default to ‘’.

  • verbose (int | None) – verbosity flag, defaults to True if label is given, otherwise 0.

  • newline (bool) – if False and verbose, print tic and toc on the same line. Defaults to True.

  • ns (bool) – if True, use a nanosecond-resolution timer to avoid precision loss caused by the float type. Defaults to False.

_default_time()

perf_counter() -> float

Performance counter for benchmarking.

tic()[source]

starts the timer

Returns:

self

Return type:

Timer

toc()[source]

stops the timer

Returns:

number of seconds or nanoseconds

Return type:

float | int

class ubelt.UDict[source]

Bases: SetDict

A subclass of dict with ubelt enhancements

This builds on top of SetDict which itself is a simple extension that contains only that extra functionality. The extra invert, map, sorted, and peek functions are less fundamental and there are at least reasonable workarounds when they are not available.

The UDict class is a simple subclass of dict that provides the following upgrades:

  • set operations - inherited from SetDict
    • intersection - find items in common

    • union - merge dicts

    • difference - find items in one but not the other

    • symmetric_difference - find items that appear an odd number of times

  • subdict - take a subset with optional default values. (similar to intersection, but the latter ignores non-common keys)

  • inversion -
    • invert - swaps a dictionary keys and values (with options for dealing with duplicates).

  • mapping -
    • map_keys - applies a function over each key and keeps the values the same

    • map_values - applies a function over each value and keeps the keys the same

  • sorting -
    • sorted_keys - returns a dictionary ordered by the keys

    • sorted_values - returns a dictionary ordered by the values

IMO key-wise set operations on dictionaries are fundamentally and sorely missing from the stdlib, mapping is super convenient, sorting and inversion are less common, but still useful to have.

Todo

  • [ ] UbeltDict, UltraDict, not sure what the name is. We may just rename this to Dict,

Example

>>> import ubelt as ub
>>> a = ub.udict({1: 20, 2: 20, 3: 30, 4: 40})
>>> b = ub.udict({0: 0, 2: 20, 4: 42})
>>> c = ub.udict({3: -1, 5: -1})
>>> # Demo key-wise set operations
>>> assert a & b == {2: 20, 4: 40}
>>> assert a - b == {1: 20, 3: 30}
>>> assert a ^ b == {1: 20, 3: 30, 0: 0}
>>> assert a | b == {1: 20, 2: 20, 3: 30, 4: 42, 0: 0}
>>> # Demo new n-ary set methods
>>> assert a.union(b, c) == {1: 20, 2: 20, 3: -1, 4: 42, 0: 0, 5: -1}
>>> assert a.intersection(b, c) == {}
>>> assert a.difference(b, c) == {1: 20}
>>> assert a.symmetric_difference(b, c) == {1: 20, 0: 0, 5: -1}
>>> # Demo new quality of life methods
>>> assert a.subdict({2, 4, 6, 8}, default=None) == {8: None, 2: 20, 4: 40, 6: None}
>>> assert a.invert() == {20: 2, 30: 3, 40: 4}
>>> assert a.invert(unique_vals=0) == {20: {1, 2}, 30: {3}, 40: {4}}
>>> assert a.peek_key() == ub.peek(a.keys())
>>> assert a.peek_value() == ub.peek(a.values())
>>> assert a.map_keys(lambda x: x * 10) == {10: 20, 20: 20, 30: 30, 40: 40}
>>> assert a.map_values(lambda x: x * 10) == {1: 200, 2: 200, 3: 300, 4: 400}
subdict(keys, default=NoParam)[source]

Get a subset of a dictionary

Parameters:
  • self (Dict[KT, VT]) – dictionary or the implicit instance

  • keys (Iterable[KT]) – keys to take from self

  • default (Optional[object] | NoParamType) – if specified uses default if keys are missing.

Raises:

KeyError – if a key does not exist and default is not specified

SeeAlso:

ubelt.util_dict.dict_subset() ubelt.UDict.take()

Example

>>> import ubelt as ub
>>> a = ub.udict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> s = a.subdict({2, 5})
>>> print('s = {}'.format(ub.repr2(s, nl=0, sort=1)))
s = {2: 'A_c', 5: 'A_f'}
>>> import pytest
>>> with pytest.raises(KeyError):
>>>     s = a.subdict({2, 5, 100})
>>> s = a.subdict({2, 5, 100}, default='DEF')
>>> print('s = {}'.format(ub.repr2(s, nl=0, sort=1)))
s = {2: 'A_c', 5: 'A_f', 100: 'DEF'}
take(keys, default=NoParam)[source]

Get values of an iterable of keys.

Parameters:
  • self (Dict[KT, VT]) – dictionary or the implicit instance

  • keys (Iterable[KT]) – keys to take from self

  • default (Optional[object] | NoParamType) – if specified uses default if keys are missing.

Yields:

VT – a selected value within the dictionary

Raises:

KeyError – if a key does not exist and default is not specified

SeeAlso:

ubelt.util_list.take() ubelt.UDict.subdict()

Example

>>> import ubelt as ub
>>> a = ub.udict({k: 'A_' + chr(97 + k) for k in [2, 3, 5, 7]})
>>> s = list(a.take({2, 5}))
>>> print('s = {}'.format(ub.repr2(s, nl=0, sort=1)))
s = ['A_c', 'A_f']
>>> import pytest
>>> with pytest.raises(KeyError):
>>>     s = a.subdict({2, 5, 100})
>>> s = list(a.take({2, 5, 100}, default='DEF'))
>>> print('s = {}'.format(ub.repr2(s, nl=0, sort=1)))
s = ['A_c', 'A_f', 'DEF']
invert(unique_vals=True)[source]

Swaps the keys and values in a dictionary.

Parameters:
  • self (Dict[KT, VT]) – dictionary or the implicit instance to invert

  • unique_vals (bool, default=True) – if False, the values of the new dictionary are sets of the original keys.

  • cls (type | None) – specifies the dict subclass of the result. If unspecified, dict or OrderedDict will be used. This behavior may change.

Returns:

the inverted dictionary

Return type:

Dict[VT, KT] | Dict[VT, Set[KT]]

Note

The values must be hashable.

If the original dictionary contains duplicate values, then only one of the corresponding keys will be returned and the others will be discarded. This can be prevented by setting unique_vals=False, causing the inverted keys to be returned in a set.

Example

>>> import ubelt as ub
>>> inverted = ub.udict({'a': 1, 'b': 2}).invert()
>>> assert inverted == {1: 'a', 2: 'b'}
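
When values are duplicated, unique_vals=False collects the colliding keys into sets instead of discarding all but one. A plain-dict sketch of that grouping behavior (not ubelt's implementation):

```python
# Sketch of invert(unique_vals=False): group keys by their shared value.
original = {'a': 1, 'b': 1, 'c': 2}
inverted = {}
for k, v in original.items():
    inverted.setdefault(v, set()).add(k)
assert inverted == {1: {'a', 'b'}, 2: {'c'}}
```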
map_keys(func)[source]

Apply a function to every key in a dictionary.

Creates a new dictionary with the same values and modified keys.

Parameters:
  • self (Dict[KT, VT]) – a dictionary or the implicit instance.

  • func (Callable[[VT], T] | Mapping[VT, T]) – a function or indexable object

Returns:

transformed dictionary

Return type:

Dict[KT, T]

Example

>>> import ubelt as ub
>>> new = ub.udict({'a': [1, 2, 3], 'b': []}).map_keys(ord)
>>> assert new == {97: [1, 2, 3], 98: []}
map_values(func)[source]

Apply a function to every value in a dictionary.

Creates a new dictionary with the same keys and modified values.

Parameters:
  • self (Dict[KT, VT]) – a dictionary or the implicit instance.

  • func (Callable[[VT], T] | Mapping[VT, T]) – a function or indexable object

Returns:

transformed dictionary

Return type:

Dict[KT, T]

Example

>>> import ubelt as ub
>>> newdict = ub.udict({'a': [1, 2, 3], 'b': []}).map_values(len)
>>> assert newdict ==  {'a': 3, 'b': 0}
sorted_keys(key=None, reverse=False)[source]

Return an ordered dictionary sorted by its keys

Parameters:
  • self (Dict[KT, VT]) – dictionary to sort or the implicit instance. The keys must be of comparable types.

  • key (Callable[[KT], Any] | None) – If given as a callable, customizes the sorting by ordering using transformed keys.

  • reverse (bool, default=False) – if True returns in descending order

Returns:

new dictionary where the keys are ordered

Return type:

OrderedDict[KT, VT]

Example

>>> import ubelt as ub
>>> new = ub.udict({'spam': 2.62, 'eggs': 1.20, 'jam': 2.92}).sorted_keys()
>>> assert new == ub.odict([('eggs', 1.2), ('jam', 2.92), ('spam', 2.62)])
sorted_values(key=None, reverse=False)[source]

Return an ordered dictionary sorted by its values

Parameters:
  • self (Dict[KT, VT]) – dictionary to sort or the implicit instance. The values must be of comparable types.

  • key (Callable[[VT], Any] | None) – If given as a callable, customizes the sorting by ordering using transformed values.

  • reverse (bool, default=False) – if True returns in descending order

Returns:

new dictionary where the values are ordered

Return type:

OrderedDict[KT, VT]

Example

>>> import ubelt as ub
>>> new = ub.udict({'spam': 2.62, 'eggs': 1.20, 'jam': 2.92}).sorted_values()
>>> assert new == ub.odict([('eggs', 1.2), ('spam', 2.62), ('jam', 2.92)])
peek_key(default=NoParam)[source]

Get the first key in the dictionary

Parameters:
  • self (Dict) – a dictionary or the implicit instance

  • default (KT | NoParamType) – default item to return if the iterable is empty, otherwise a StopIteration error is raised

Returns:

the first key or the default

Return type:

KT

Example

>>> import ubelt as ub
>>> assert ub.udict({1: 2}).peek_key() == 1
peek_value(default=NoParam)[source]

Get the first value in the dictionary

Parameters:
  • self (Dict[KT, VT]) – a dictionary or the implicit instance

  • default (VT | NoParamType) – default item to return if the iterable is empty, otherwise a StopIteration error is raised

Returns:

the first value or the default

Return type:

VT

Example

>>> import ubelt as ub
>>> assert ub.udict({1: 2}).peek_value() == 2
ubelt.allsame(iterable, eq=<built-in function eq>)[source]

Determine if all items in a sequence are the same

Parameters:
  • iterable (Iterable[T]) – items to determine if they are all the same

  • eq (Callable[[T, T], bool], default=operator.eq) – function used to test for equality

Returns:

True if all items are equal, otherwise False

Return type:

bool

Notes

Similar to more_itertools.all_equal()

Example

>>> import ubelt as ub
>>> ub.allsame([1, 1, 1, 1])
True
>>> ub.allsame([])
True
>>> ub.allsame([0, 1])
False
>>> iterable = iter([0, 1, 1, 1])
>>> next(iterable)
>>> ub.allsame(iterable)
True
>>> ub.allsame(range(10))
False
>>> ub.allsame(range(10), lambda a, b: True)
True
ubelt.argflag(key, argv=None)[source]

Determines if a key is specified on the command line.

This is a functional alternative to key in sys.argv, but it also allows for multiple aliases of the same flag to be specified.

Parameters:
  • key (str | Tuple[str, …]) – string or tuple of strings. Each key should be prefixed with two hyphens (i.e. --).

  • argv (List[str] | None, default=None) – overrides sys.argv if specified

Returns:

flag - True if the key (or any of the keys) was specified

Return type:

bool

CommandLine

xdoctest -m ubelt.util_arg argflag:0
xdoctest -m ubelt.util_arg argflag:0 --devflag
xdoctest -m ubelt.util_arg argflag:0 -df
xdoctest -m ubelt.util_arg argflag:0 --devflag2
xdoctest -m ubelt.util_arg argflag:0 -df2

Example

>>> # Everyday usage of this function might look like this
>>> import ubelt as ub
>>> # Check if either of these strings are in sys.argv
>>> flag = ub.argflag(('-df', '--devflag'))
>>> if flag:
>>>     print(ub.color_text(
>>>         'A hidden developer flag was given!', 'blue'))
>>> print('Pass the hidden CLI flag to see a secret message')

Example

>>> import ubelt as ub
>>> argv = ['--spam', '--eggs', 'foo']
>>> assert ub.argflag('--eggs', argv=argv) is True
>>> assert ub.argflag('--ans', argv=argv) is False
>>> assert ub.argflag('foo', argv=argv) is True
>>> assert ub.argflag(('bar', '--spam'), argv=argv) is True
ubelt.argmax(indexable, key=None)[source]

Returns index / key of the item with the largest value.

This is similar to numpy.argmax(), but it is written in pure python and works on both lists and dictionaries.

Parameters:
  • indexable (Iterable[VT] | Mapping[KT, VT]) – indexable to sort by

  • key (Callable[[VT], Any] | None, default=None) – customizes the ordering of the indexable

Returns:

the index of the item with the maximum value.

Return type:

int | KT

Example

>>> import ubelt as ub
>>> assert ub.argmax({'a': 3, 'b': 2, 'c': 100}) == 'c'
>>> assert ub.argmax(['a', 'c', 'b', 'z', 'f']) == 3
>>> assert ub.argmax([[0, 1], [2, 3, 4], [5]], key=len) == 1
>>> assert ub.argmax({'a': 3, 'b': 2, 3: 100, 4: 4}) == 3
>>> assert ub.argmax(iter(['a', 'c', 'b', 'z', 'f'])) == 3
ubelt.argmin(indexable, key=None)[source]

Returns index / key of the item with the smallest value.

This is similar to numpy.argmin(), but it is written in pure python and works on both lists and dictionaries.

Parameters:
  • indexable (Iterable[VT] | Mapping[KT, VT]) – indexable to sort by

  • key (Callable[[VT], VT] | None, default=None) – customizes the ordering of the indexable

Returns:

the index of the item with the minimum value.

Return type:

int | KT

Example

>>> import ubelt as ub
>>> assert ub.argmin({'a': 3, 'b': 2, 'c': 100}) == 'b'
>>> assert ub.argmin(['a', 'c', 'b', 'z', 'f']) == 0
>>> assert ub.argmin([[0, 1], [2, 3, 4], [5]], key=len) == 2
>>> assert ub.argmin({'a': 3, 'b': 2, 3: 100, 4: 4}) == 'b'
>>> assert ub.argmin(iter(['a', 'c', 'A', 'z', 'f'])) == 2
ubelt.argsort(indexable, key=None, reverse=False)[source]

Returns the indices that would sort a indexable object.

This is similar to numpy.argsort(), but it is written in pure python and works on both lists and dictionaries.

Parameters:
  • indexable (Iterable[VT] | Mapping[KT, VT]) – indexable to sort by

  • key (Callable[[VT], VT] | None, default=None) – customizes the ordering of the indexable

  • reverse (bool, default=False) – if True returns in descending order

Returns:

indices - list of indices that sorts the indexable

Return type:

List[int] | List[KT]

Example

>>> import ubelt as ub
>>> # argsort works on dicts by returning keys
>>> dict_ = {'a': 3, 'b': 2, 'c': 100}
>>> indices = ub.argsort(dict_)
>>> assert list(ub.take(dict_, indices)) == sorted(dict_.values())
>>> # argsort works on lists by returning indices
>>> indexable = [100, 2, 432, 10]
>>> indices = ub.argsort(indexable)
>>> assert list(ub.take(indexable, indices)) == sorted(indexable)
>>> # Can use iterators, but be careful. It exhausts them.
>>> indexable = reversed(range(100))
>>> indices = ub.argsort(indexable)
>>> assert indices[0] == 99
>>> # Can use key just like sorted
>>> indexable = [[0, 1, 2], [3, 4], [5]]
>>> indices = ub.argsort(indexable, key=len)
>>> assert indices == [2, 1, 0]
>>> # Can use reverse just like sorted
>>> indexable = [0, 2, 1]
>>> indices = ub.argsort(indexable, reverse=True)
>>> assert indices == [1, 2, 0]
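
For plain lists, the sort-by-index trick has a compact stdlib equivalent, which may help clarify what argsort returns; this is an illustrative sketch, not ubelt's implementation:

```python
# Pure-stdlib argsort for the list case: sort the indices by the value
# they point at.
indexable = [100, 2, 432, 10]
indices = sorted(range(len(indexable)), key=indexable.__getitem__)
assert indices == [1, 3, 0, 2]
assert [indexable[i] for i in indices] == sorted(indexable)
```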
ubelt.argunique(items, key=None)[source]

Returns indices corresponding to the first instance of each unique item.

Parameters:
  • items (Sequence[VT]) – indexable collection of items

  • key (Callable[[VT], Any] | None, default=None) – custom normalization function. If specified returns items where key(item) is unique.

Returns:

indices of the unique items

Return type:

Iterator[int]

Example

>>> import ubelt as ub
>>> items = [0, 2, 5, 1, 1, 0, 2, 4]
>>> indices = list(ub.argunique(items))
>>> assert indices == [0, 1, 2, 3, 7]
>>> indices = list(ub.argunique(items, key=lambda x: x % 2 == 0))
>>> assert indices == [0, 2]
ubelt.argval(key, default=NoParam, argv=None)[source]

Get the value of a keyword argument specified on the command line.

Values can be specified as <key> <value> or <key>=<value>

The use-case for this function is to add a hidden command line feature where a developer can pass in a special value. This can be used to prototype a command line interface, provide an easter egg, or add some other command line parsing that won't be exposed in CLI help docs.

Parameters:
  • key (str | Tuple[str, …]) – string or tuple of strings. Each key should be prefixed with two hyphens (i.e. --)

  • default (T | NoParamType, default=NoParam) – a value to return if not specified.

  • argv (List[str] | None, default=None) – uses sys.argv if unspecified

Returns:

value - the value specified after the key. If the key is specified multiple times, then the first value is returned.

Return type:

str | T

Todo

  • [x] Can we handle the case where the value is a list of long paths? - No

  • [ ] Should we default to the first or the last specified instance of the flag?

CommandLine

xdoctest -m ubelt.util_arg argval:0
xdoctest -m ubelt.util_arg argval:0 --devval
xdoctest -m ubelt.util_arg argval:0 --devval=1
xdoctest -m ubelt.util_arg argval:0 --devval=2
xdoctest -m ubelt.util_arg argval:0 --devval 3
xdoctest -m ubelt.util_arg argval:0 --devval "4 5 6"

Example

>>> # Everyday usage of this function might look like this where
>>> import ubelt as ub
>>> # grab a key/value pair if is given on the command line
>>> value = ub.argval('--devval', default='1')
>>> print('Checking if the hidden CLI key/value pair is given')
>>> if value != '1':
>>>     print(ub.color_text(
>>>         'A hidden developer secret: {!r}'.format(value), 'yellow'))
>>> print('Pass the hidden CLI key/value pair to see a secret message')

Example

>>> import ubelt as ub
>>> argv = ['--ans', '42', '--quest=the grail', '--ans=6', '--bad']
>>> assert ub.argval('--spam', argv=argv) == ub.NoParam
>>> assert ub.argval('--quest', argv=argv) == 'the grail'
>>> assert ub.argval('--ans', argv=argv) == '42'
>>> assert ub.argval('--bad', argv=argv) == ub.NoParam
>>> assert ub.argval(('--bad', '--bar'), argv=argv) == ub.NoParam

Example

>>> # Test fix for GH Issue #41
>>> import ubelt as ub
>>> argv = ['--path=/path/with/k=3']
>>> assert ub.argval('--path', argv=argv) == '/path/with/k=3'
ubelt.augpath(path, suffix='', prefix='', ext=None, tail='', base=None, dpath=None, relative=None, multidot=False)[source]

Create a new path with a different extension, basename, directory, prefix, and/or suffix.

A prefix is inserted before the basename. A suffix is inserted between the basename and the extension. The basename and extension can be replaced with a new one. Essentially a path is broken down into components (dpath, base, ext), and then recombined as (dpath, prefix, base, suffix, ext) after replacing any specified component.

Parameters:
  • path (str | PathLike) – a path to augment

  • suffix (str) – placed between the basename and extension Note: this is referred to as stemsuffix in ub.Path.augment().

  • prefix (str) – placed in front of the basename

  • ext (str | None) – if specified, replaces the extension

  • tail (str | None) – If specified, appends this text to the extension

  • base (str | None) – if specified, replaces the basename without extension. Note: this is referred to as stem in ub.Path.augment().

  • dpath (str | PathLike | None) – if specified, replaces the specified “relative” directory, which by default is the parent directory.

  • relative (str | PathLike | None) – Replaces relative with dpath in path. Has no effect if dpath is not specified. Defaults to the dirname of the input path. Experimental; not currently implemented.

  • multidot (bool) – Allows extensions to contain multiple dots. Specifically, if False, everything after the last dot in the basename is the extension. If True, everything after the first dot in the basename is the extension.

Returns:

augmented path

Return type:

str

Example

>>> import ubelt as ub
>>> path = 'foo.bar'
>>> suffix = '_suff'
>>> prefix = 'pref_'
>>> ext = '.baz'
>>> newpath = ub.augpath(path, suffix, prefix, ext=ext, base='bar')
>>> print('newpath = %s' % (newpath,))
newpath = pref_bar_suff.baz

Example

>>> from ubelt.util_path import *  # NOQA
>>> augpath('foo.bar')
'foo.bar'
>>> augpath('foo.bar', ext='.BAZ')
'foo.BAZ'
>>> augpath('foo.bar', suffix='_')
'foo_.bar'
>>> augpath('foo.bar', prefix='_')
'_foo.bar'
>>> augpath('foo.bar', base='baz')
'baz.bar'
>>> augpath('foo.tar.gz', ext='.zip', multidot=True)
'foo.zip'
>>> augpath('foo.tar.gz', ext='.zip', multidot=False)
'foo.tar.zip'
>>> augpath('foo.tar.gz', suffix='_new', multidot=True)
'foo_new.tar.gz'
>>> augpath('foo.tar.gz', suffix='_new', tail='.cache', multidot=True)
'foo_new.tar.gz.cache'
ubelt.boolmask(indices, maxval=None)[source]

Constructs a list of booleans where an item is True if its position is in indices otherwise it is False.

Parameters:
  • indices (List[int]) – list of integer indices

  • maxval (int | None) – length of the returned list. If not specified this is inferred using max(indices)

Returns:

mask - a list of booleans. mask[idx] is True if idx in indices

Return type:

List[bool]

Note

In the future the arg maxval may change its name to shape

Example

>>> import ubelt as ub
>>> indices = [0, 1, 4]
>>> mask = ub.boolmask(indices, maxval=6)
>>> assert mask == [True, True, False, False, True, False]
>>> mask = ub.boolmask(indices)
>>> assert mask == [True, True, False, False, True]
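The behavior above amounts to a set-membership test over a range; a minimal pure-Python sketch (not ubelt's actual implementation, and `boolmask_sketch` is a hypothetical name):

```python
def boolmask_sketch(indices, maxval=None):
    # Infer the mask length from the largest index when maxval is not given
    if maxval is None:
        maxval = max(indices) + 1
    index_set = set(indices)  # set gives O(1) membership tests
    return [i in index_set for i in range(maxval)]

print(boolmask_sketch([0, 1, 4], maxval=6))
# [True, True, False, False, True, False]
```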
class ubelt.chunks(items, chunksize=None, nchunks=None, total=None, bordermode='none', legacy=False)[source]

Bases: object

Generates successive n-sized chunks from items.

If the last chunk has fewer than n elements, bordermode is used to determine the fill values.

Note

FIXME:

When nchunks is given, that is how many chunks we should get, but chunksize is not well defined in that instance. For instance, how do we turn a list with 4 elements into 3 chunks? Where does the extra item go?

In ubelt <= 0.10.3 there was a bug when specifying nchunks, where it chose a chunksize that was too large. Specify legacy=True to get the old buggy behavior if needed.

Notes

This is similar to functionality provided by

more_itertools.chunked(), more_itertools.chunked_even(), more_itertools.sliced(), and more_itertools.divide().

Yields:

List[T] – subsequent non-overlapping chunks of the input items

Variables:

remainder (int) – number of leftover items that don’t divide cleanly

Example

>>> import ubelt as ub
>>> items = '1234567'
>>> genresult = ub.chunks(items, chunksize=3)
>>> list(genresult)
[['1', '2', '3'], ['4', '5', '6'], ['7']]

Example

>>> import ubelt as ub
>>> items = [1, 2, 3, 4, 5, 6, 7]
>>> genresult = ub.chunks(items, chunksize=3, bordermode='none')
>>> assert list(genresult) == [[1, 2, 3], [4, 5, 6], [7]]
>>> genresult = ub.chunks(items, chunksize=3, bordermode='cycle')
>>> assert list(genresult) == [[1, 2, 3], [4, 5, 6], [7, 1, 2]]
>>> genresult = ub.chunks(items, chunksize=3, bordermode='replicate')
>>> assert list(genresult) == [[1, 2, 3], [4, 5, 6], [7, 7, 7]]
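The three border modes shown above can be emulated with itertools. This is a hedged sketch of the padding logic for the chunksize case only, not ubelt's implementation, and `chunks_sketch` is a hypothetical name:

```python
import itertools

def chunks_sketch(items, chunksize, bordermode='none'):
    items = list(items)
    if bordermode == 'cycle':
        # Pad the final chunk by cycling back to the start of the input
        pad = itertools.cycle(items)
    elif bordermode == 'replicate':
        # Pad the final chunk by repeating the last item
        pad = itertools.repeat(items[-1]) if items else iter(())
    else:
        pad = iter(())  # 'none': leave the final chunk short
    n_chunks = -(-len(items) // chunksize)  # ceiling division
    if bordermode == 'none':
        return [items[i * chunksize:(i + 1) * chunksize]
                for i in range(n_chunks)]
    stream = itertools.chain(items, pad)
    return [[next(stream) for _ in range(chunksize)]
            for _ in range(n_chunks)]

print(chunks_sketch([1, 2, 3, 4, 5, 6, 7], 3, 'cycle'))
# [[1, 2, 3], [4, 5, 6], [7, 1, 2]]
```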

Example

>>> import ubelt as ub
>>> assert len(list(ub.chunks(range(2), nchunks=2))) == 2
>>> assert len(list(ub.chunks(range(3), nchunks=2))) == 2
>>> # Note: ub.chunks will not do the 2,1,1 split
>>> assert len(list(ub.chunks(range(4), nchunks=3))) == 3
>>> assert len(list(ub.chunks([], 2, bordermode='none'))) == 0
>>> assert len(list(ub.chunks([], 2, bordermode='cycle'))) == 0
>>> assert len(list(ub.chunks([], 2, None, bordermode='replicate'))) == 0

Example

>>> from ubelt.util_list import *  # NOQA
>>> def _check_len(self):
...     assert len(self) == len(list(self))
>>> _check_len(chunks(list(range(3)), nchunks=2))
>>> _check_len(chunks(list(range(2)), nchunks=2))
>>> _check_len(chunks(list(range(2)), nchunks=3))

Example

>>> from ubelt.util_list import *  # NOQA
>>> import pytest
>>> assert pytest.raises(ValueError, chunks, range(9))
>>> assert pytest.raises(ValueError, chunks, range(9), chunksize=2, nchunks=2)
>>> assert pytest.raises(TypeError, len, chunks((_ for _ in range(2)), 2))

Example

>>> from ubelt.util_list import *  # NOQA
>>> import ubelt as ub
>>> basis = {
>>>     'legacy': [False, True],
>>>     'chunker': [{'nchunks': 3}, {'nchunks': 4}, {'nchunks': 5}, {'nchunks': 7}, {'chunksize': 3}],
>>>     'items': [range(2), range(4), range(5), range(7), range(9)],
>>>     'bordermode': ['none', 'cycle', 'replicate'],
>>> }
>>> grid_items = list(ub.named_product(basis))
>>> rows = []
>>> for grid_item in ub.ProgIter(grid_items):
>>>     chunker = grid_item.get('chunker')
>>>     grid_item.update(chunker)
>>>     kw = ub.dict_diff(grid_item, {'chunker'})
>>>     self = chunk_iter = ub.chunks(**kw)
>>>     chunked = list(chunk_iter)
>>>     chunk_lens = list(map(len, chunked))
>>>     row = ub.dict_union(grid_item, {'chunk_lens': chunk_lens, 'chunks': chunked})
>>>     row['chunker'] = str(row['chunker'])
>>>     if not row['legacy'] and 'nchunks' in kw:
>>>         assert kw['nchunks'] == row['nchunks']
>>>     row.update(chunk_iter.__dict__)
>>>     rows.append(row)
>>> # xdoctest: +SKIP
>>> import pandas as pd
>>> df = pd.DataFrame(rows)
>>> for _, subdf in df.groupby('chunker'):
>>>     print(subdf)
Parameters:
  • items (Iterable) – input to iterate over

  • chunksize (int | None) – size of each sublist yielded

  • nchunks (int | None) – number of chunks to create (cannot be specified if chunksize is specified)

  • bordermode (str) – determines how to handle the last chunk if the length of the input is not divisible by chunksize. Valid values are: {'none', 'cycle', 'replicate'}

  • total (int | None) – a hint about the length of the input

  • legacy (bool) – if True use old behavior, defaults to False. This will be removed in the future.

_new_iterator()[source]
static noborder(items, chunksize)[source]
static cycle(items, chunksize)[source]
static replicate(items, chunksize)[source]
ubelt.cmd(command, shell=False, detach=False, verbose=0, tee=None, cwd=None, env=None, tee_backend='auto', check=False, system=False, timeout=None, capture=True)[source]

Executes a command in a subprocess.

The advantages of this wrapper around subprocess are that (1) you control whether the subprocess prints to stdout, (2) the text written to stdout and stderr is returned for parsing, (3) cross-platform behavior lets you specify the command as a string or tuple regardless of whether shell=True, and (4) you can detach, return the process object, and allow the process to run in the background (eventually we may return a Future object instead).

Parameters:
  • command (str | List[str]) – command string, tuple of executable and args, or shell command.

  • shell (bool) – if True, process is run in shell. Defaults to False.

  • detach (bool) – if True, process is detached and run in background. Defaults to False.

  • verbose (int) – verbosity mode. Can be 0, 1, 2, or 3. Defaults to 0.

  • tee (bool | None) – if True, simultaneously writes to stdout while capturing output from the command. If not specified, defaults to True if verbose > 0. If detach is True, then this argument is ignored.

  • cwd (str | PathLike | None) – Path to run command. Defaults to current working directory if unspecified.

  • env (Dict[str, str] | None) – environment passed to Popen

  • tee_backend (str) – backend for tee output. Valid choices are: “auto”, “select” (POSIX only), and “thread”. Defaults to “auto”.

  • check (bool) – if True, check that the return code was zero before returning, otherwise raise a subprocess.CalledProcessError. Does nothing if detach is True. Defaults to False.

  • system (bool) – if True, most other considerations are dropped, and os.system() is used to execute the command in a platform-dependent way. Other arguments such as env, tee, timeout, and shell are all ignored. Defaults to False. (New in version 1.1.0)

  • timeout (float | None) – If the process does not complete in timeout seconds, raise a subprocess.TimeoutExpired. (New in version 1.1.0).

  • capture (bool) – if True, the stdout/stderr are captured and returned in the information dictionary. Ignored if detach or system is True.

Returns:

info - information about command status. If detach is False, info contains the captured standard out, standard error, and the return code. If detach is True, info contains a reference to the process.

Return type:

dict | CmdOutput

Raises:
  • ValueError - on an invalid configuration

  • subprocess.TimeoutExpired - if the timeout limit is exceeded

  • subprocess.CalledProcessError - if check and the return value is non zero

Note

When using the tee output, the stdout and stderr streams may be interleaved differently than they would be on the command line.
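For readers without ubelt at hand, the capture-and-check behavior described above roughly corresponds to subprocess.run in the standard library. This is an analogy rather than ub.cmd's implementation; ub.cmd additionally offers tee, detach, and system modes:

```python
import subprocess
import sys

# Roughly analogous to: info = ub.cmd([sys.executable, '-c', 'print("hi")'], check=True)
proc = subprocess.run(
    [sys.executable, '-c', 'print("hi")'],
    capture_output=True,  # like capture=True: collect stdout/stderr
    text=True,            # decode bytes to str
    check=True,           # like check=True: raise CalledProcessError on nonzero exit
)
info = {'out': proc.stdout, 'err': proc.stderr, 'ret': proc.returncode}
print(info['out'].strip())
# hi
```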

Related Work:

Similar to other libraries: [SubprocTee], [ShellJob], [CmdRunner], [PyInvoke].

References