ubelt.util_futures module

Introduces the Executor class that wraps the standard ThreadPoolExecutor, ProcessPoolExecutor, and the new SerialExecutor with a common interface and a backend that changes dynamically. This makes it easy to test if your code benefits from parallelism, how much it benefits, and gives you the ability to disable it if you need to.

Example

>>> # xdoctest: +SKIP
>>> # Note: while this works in IPython, this does not work when running
>>> # in xdoctest. https://github.com/Erotemic/xdoctest/issues/101
>>> # xdoctest: +REQUIRES(module:timerit)
>>> # Does my function benefit from parallelism?
>>> def my_function(arg1, arg2):
...     return (arg1 + arg2) * 3
>>> #
>>> def run_process(inputs, mode='serial', max_workers=0):
...     from concurrent.futures import as_completed
...     import ubelt as ub
...     # The executor interface is the same regardless of mode
...     executor = ub.Executor(mode=mode, max_workers=max_workers)
...     # submit returns a Future object
...     jobs = [executor.submit(my_function, *args) for args in inputs]
...     # future objects will contain results when they are done
...     results = [job.result() for job in as_completed(jobs)]
...     return results
>>> # The same code tests our method in serial, thread, or process mode
>>> import timerit
>>> ti = timerit.Timerit(100, bestof=10, verbose=2)
>>> # Setup test data
>>> import random
>>> rng = random.Random(0)
>>> max_workers = 4
>>> inputs = [(rng.random(), rng.random()) for _ in range(100)]
>>> for mode in ['serial', 'process', 'thread']:
>>>     for timer in ti.reset('mode={} max_workers={}'.format(mode, max_workers)):
>>>         with timer:
>>>             run_process(inputs, mode=mode, max_workers=max_workers)
>>> print(ub.repr2(ti))
class ubelt.util_futures.Executor(mode='thread', max_workers=0)[source]

Bases: object

Wrapper around a specific executor.

Abstracts Serial, Thread, and Process Executor via arguments.

Parameters
  • mode (str, default='thread') – either thread, serial, or process

  • max_workers (int, default=0) – number of workers. If 0, serial is forced.

Example

>>> import platform
>>> import sys
>>> # The process backend breaks pypy3 when using coverage
>>> if 'pypy' in platform.python_implementation().lower():
...     import pytest
...     pytest.skip('not testing process on pypy')
>>> if sys.platform.startswith('win32'):
...     import pytest
...     pytest.skip('not running this test on win32 for now')
>>> import ubelt as ub
>>> # Fork before threading!
>>> # https://pybay.com/site_media/slides/raymond2017-keynote/combo.html
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> self1.__enter__()
>>> self2 = ub.Executor(mode='process', max_workers=2)
>>> self2.__enter__()
>>> self3 = ub.Executor(mode='thread', max_workers=2)
>>> self3.__enter__()
>>> jobs = []
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self1.submit(sum, [1, 2, 3]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self2.submit(sum, [10, 20, 30]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> jobs.append(self3.submit(sum, [4, 5, 5]))
>>> for job in jobs:
>>>     result = job.result()
>>>     print('result = {!r}'.format(result))
>>> self1.__exit__(None, None, None)
>>> self2.__exit__(None, None, None)
>>> self3.__exit__(None, None, None)

Example

>>> import ubelt as ub
>>> self1 = ub.Executor(mode='serial', max_workers=0)
>>> with self1:
>>>     jobs = []
>>>     for i in range(10):
>>>         jobs.append(self1.submit(sum, [i + 1, i]))
>>>     for job in jobs:
>>>         job.add_done_callback(lambda x: print('done callback got x = {}'.format(x)))
>>>         result = job.result()
>>>         print('result = {!r}'.format(result))
submit(func, *args, **kw)[source]

Calls the submit function of the underlying backend.

Returns

a future representing the job

Return type

concurrent.futures.Future

shutdown()[source]

Calls the shutdown function of the underlying backend.

map(fn, *iterables, **kwargs)[source]

Calls the map function of the underlying backend.

CommandLine

xdoctest -m /home/joncrall/code/ubelt/ubelt/util_futures.py Executor.map

Example

>>> import ubelt as ub
>>> import concurrent.futures
>>> import string
>>> with ub.Executor(mode='serial') as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> with ub.Executor(mode='thread', max_workers=2) as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> # xdoctest: +IGNORE_WANT
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
class ubelt.util_futures.JobPool(mode='thread', max_workers=0)[source]

Bases: object

Abstracts away boilerplate of submitting and collecting jobs

Example

>>> import ubelt as ub
>>> def worker(data):
>>>     return data + 1
>>> pool = ub.JobPool('thread', max_workers=16)
>>> with pool:
>>>     for data in ub.ProgIter(range(10), desc='submit jobs'):
>>>         job = pool.submit(worker, data)
>>>     final = []
>>>     for job in ub.ProgIter(pool.as_completed(), total=len(pool), desc='collect jobs'):
>>>         info = job.result()
>>>         final.append(info)
>>> print('final = {!r}'.format(final))
>>> pool.shutdown()
submit(func, *args, **kwargs)[source]

Submit a job managed by the pool

Returns

a future representing the job

Return type

concurrent.futures.Future

shutdown()[source]
as_completed()[source]

Generates completed jobs in an arbitrary order

Yields

concurrent.futures.Future

CommandLine

xdoctest -m ubelt.util_futures JobPool.as_completed

Example

>>> import ubelt as ub
>>> pool = ub.JobPool('thread', max_workers=8)
>>> text = ub.paragraph(
...     '''
...     UDP is a cool protocol, check out the wiki:
...
...     UDP-based Data Transfer Protocol (UDT), is a high-performance
...     data transfer protocol designed for transferring large
...     volumetric datasets over high-speed wide area networks. Such
...     settings are typically disadvantageous for the more common TCP
...     protocol.
...     ''')
>>> for word in text.split(' '):
...     pool.submit(print, word)
>>> for _ in pool.as_completed():
...     pass
>>> pool.shutdown()