ubelt.util_futures module¶
Introduces the Executor
class that wraps the standard
ThreadPoolExecutor, ProcessPoolExecutor, and the new SerialExecutor with a
common interface and a configurable backend. This makes it easy to test whether your
code benefits from parallelism, how much it benefits, and gives you the ability
to disable it if you need to.
The Executor
class lets you choose the right level of concurrency
(which might be no concurrency). An excellent blog post on when to use
threads, processes, or asyncio is given in [ChooseTheRightConcurrency].
Note that Executor does not currently support asyncio; this might be added in the future, but it is unclear how interoperable it would be.
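To make the common interface concrete before the benchmark below, here is a minimal sketch (not part of the original docstring); work is a hypothetical stand-in for your own function, and the __main__ guard is only there so the 'process' backend can import the worker on spawn-based platforms.

import ubelt as ub

def work(x):
    # A module-level (and therefore picklable) function, which the
    # 'process' backend requires.
    return x * x

if __name__ == '__main__':
    # The submission and collection code is identical for every backend;
    # only the mode string changes. mode='serial' disables concurrency
    # entirely, which is convenient for debugging.
    for mode in ['serial', 'thread', 'process']:
        with ub.Executor(mode=mode, max_workers=2) as executor:
            jobs = [executor.submit(work, i) for i in range(5)]
            results = [job.result() for job in jobs]
        print(mode, results)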
References
Example
>>> # xdoctest: +SKIP
>>> # Note: while this works in IPython, this does not work when running
>>> # in xdoctest. https://github.com/Erotemic/xdoctest/issues/101
>>> # xdoctest: +REQUIRES(module:timerit)
>>> # Does my function benefit from parallelism?
>>> def my_function(arg1, arg2):
... return (arg1 + arg2) * 3
>>> #
>>> def run_process(inputs, mode='serial', max_workers=0):
... from concurrent.futures import as_completed
... import ubelt as ub
... # The executor interface is the same regardless of modes
... executor = ub.Executor(mode=mode, max_workers=max_workers)
... # submit returns a Future object
... jobs = [executor.submit(my_function, *args) for args in inputs]
... # future objects will contain results when they are done
... results = [job.result() for job in as_completed(jobs)]
... return results
>>> # The same code tests our method in serial, thread, or process mode
>>> import timerit
>>> import ubelt as ub
>>> ti = timerit.Timerit(100, bestof=10, verbose=2)
>>> # Setup test data
>>> import random
>>> rng = random.Random(0)
>>> max_workers = 4
>>> inputs = [(rng.random(), rng.random()) for _ in range(100)]
>>> for mode in ['serial', 'process', 'thread']:
>>> for timer in ti.reset('mode={} max_workers={}'.format(mode, max_workers)):
>>> with timer:
>>> run_process(inputs, mode=mode, max_workers=max_workers)
>>> print(ub.repr2(ti))
- class ubelt.util_futures.Executor(mode='thread', max_workers=0)[source]¶
Bases:
object
A concrete asynchronous executor with a configurable backend.
The type of parallelism (or lack thereof) is configured via the
mode
parameter, which can be: “process”, “thread”, or “serial”. This allows the user to easily enable / disable parallelism or switch between processes and threads without modifying the surrounding logic.
- SeeAlso:
In the case where you can't or don't want to use ubelt.Executor, you can get similar behavior with the following pure-Python snippet:
def Executor(max_workers):
    # Stdlib-only "ubelt.Executor"-like behavior
    if max_workers <= 1:
        # Serial case: return the result lazily when .result() is called.
        import contextlib
        def submit_partial(func, *args, **kwargs):
            def wrapper():
                return func(*args, **kwargs)
            wrapper.result = wrapper
            return wrapper
        executor = contextlib.nullcontext()
        executor.submit = submit_partial
    else:
        from concurrent.futures import ThreadPoolExecutor
        executor = ThreadPoolExecutor(max_workers=max_workers)
    return executor

executor = Executor(0)
with executor:
    jobs = []
    for arg in range(1000):
        job = executor.submit(chr, arg)
        jobs.append(job)

    results = []
    for job in jobs:
        result = job.result()
        results.append(result)

print('results = {}'.format(results))
- Variables:
backend (SerialExecutor | ThreadPoolExecutor | ProcessPoolExecutor)
Example
>>> import ubelt as ub
>>> # Prototype code using simple serial processing
>>> executor = ub.Executor(mode='serial', max_workers=0)
>>> jobs = [executor.submit(sum, [i + 1, i]) for i in range(10)]
>>> print([job.result() for job in jobs])
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
>>> # Enable parallelism by only changing one parameter
>>> executor = ub.Executor(mode='process', max_workers=0)
>>> jobs = [executor.submit(sum, [i + 1, i]) for i in range(10)]
>>> print([job.result() for job in jobs])
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
- Parameters:
mode (str) – The backend parallelism mechanism. Can be either thread, serial, or process. Defaults to ‘thread’.
max_workers (int) – number of workers. If 0, serial is forced. Defaults to 0.
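As a hedged illustration of the max_workers=0 note above, the following sketch (not from the docstring) inspects the documented backend attribute to show which backend was actually selected; no jobs are submitted, so no worker threads are started.

import ubelt as ub

# Requesting a threaded backend with max_workers=0 still forces the serial
# backend, so prototype code can default to single-threaded execution.
executor = ub.Executor(mode='thread', max_workers=0)
print(type(executor.backend).__name__)   # expected: SerialExecutor

# A nonzero worker count enables the requested backend.
executor = ub.Executor(mode='thread', max_workers=2)
print(type(executor.backend).__name__)   # expected: ThreadPoolExecutor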
- submit(func, *args, **kw)[source]¶
Calls the submit function of the underlying backend.
- Returns:
a future representing the job
- Return type:
concurrent.futures.Future
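A small hedged sketch (not from the docstring): positional and keyword arguments given to submit are forwarded to the callable, and the returned object is used like a standard concurrent.futures.Future.

import ubelt as ub

with ub.Executor(mode='serial') as executor:
    # Both positional and keyword arguments are forwarded to the callable.
    job = executor.submit(int, '1010', base=2)
    # The future is consumed with the usual result() call.
    print(job.result())   # expected: 10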
- map(fn, *iterables, **kwargs)[source]¶
Calls the map function of the underlying backend.
CommandLine
xdoctest -m ubelt.util_futures Executor.map
Example
>>> import ubelt as ub
>>> import concurrent.futures
>>> import string
>>> with ub.Executor(mode='serial') as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> with ub.Executor(mode='thread', max_workers=2) as executor:
...     result_iter = executor.map(int, string.digits)
...     results = list(result_iter)
>>> # xdoctest: +IGNORE_WANT
>>> print('results = {!r}'.format(results))
results = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- class ubelt.util_futures.JobPool(mode='thread', max_workers=0, transient=False)[source]¶
Bases:
object
Abstracts away boilerplate of submitting and collecting jobs
This is a basic wrapper around
ubelt.util_futures.Executor
that simplifies the most basic case by (1) keeping track of references to submitted futures for you and (2) providing an as_completed method to consume those futures as they are ready.
- Variables:
executor (Executor) – internal executor object
jobs (List[Future]) – internal job list. Note: do not rely on this attribute, it may change in the future.
Example
>>> import ubelt as ub
>>> def worker(data):
>>>     return data + 1
>>> pool = ub.JobPool('thread', max_workers=16)
>>> for data in ub.ProgIter(range(10), desc='submit jobs'):
>>>     pool.submit(worker, data)
>>> final = []
>>> for job in pool.as_completed(desc='collect jobs'):
>>>     info = job.result()
>>>     final.append(info)
>>> print('final = {!r}'.format(final))
- Parameters:
mode (str) – The backend parallelism mechanism. Can be either thread, serial, or process. Defaults to ‘thread’.
max_workers (int) – number of workers. If 0, serial is forced. Defaults to 0.
transient (bool) – if True, references to jobs will be discarded as they are returned by as_completed(). Otherwise the jobs attribute holds a reference to all jobs ever submitted. Defaults to False.
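To illustrate the transient flag, here is a hedged sketch (not from the docstring); work is a hypothetical worker, and the point is only that jobs consumed through as_completed() are not retained by the pool, which helps when streaming many jobs.

import ubelt as ub

def work(x):
    return x + 1

# With transient=True the pool drops its reference to each job as it is
# yielded by as_completed(), so long streams of jobs do not accumulate
# in the jobs attribute.
pool = ub.JobPool(mode='thread', max_workers=4, transient=True)
for i in range(1000):
    pool.submit(work, i)

total = 0
for job in pool.as_completed(desc='collect'):
    total += job.result()
pool.shutdown()
print(total)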
- submit(func, *args, **kwargs)[source]¶
Submit a job managed by the pool
- Parameters:
func (Callable[…, Any]) – The function to call; it will receive the given positional and keyword arguments.
*args – positional arguments to pass to the function
**kwargs – keyword arguments to pass to the function
- Returns:
a future representing the job
- Return type:
concurrent.futures.Future
- as_completed(timeout=None, desc=None, progkw=None)[source]¶
Generates completed jobs in an arbitrary order
- Parameters:
timeout (float | None) – Specify the maximum number of seconds to wait for a job. Note: this is ignored in serial mode.
desc (str | None) – if specified, reports progress with a ubelt.progiter.ProgIter object.
progkw (dict | None) – extra keyword arguments to ubelt.progiter.ProgIter.
- Yields:
concurrent.futures.Future – The completed future object containing the results of a job.
CommandLine
xdoctest -m ubelt.util_futures JobPool.as_completed
Example
>>> import ubelt as ub
>>> pool = ub.JobPool('thread', max_workers=8)
>>> text = ub.paragraph(
...     '''
...     UDP is a cool protocol, check out the wiki:
...
...     UDP-based Data Transfer Protocol (UDT), is a high-performance
...     data transfer protocol designed for transferring large
...     volumetric datasets over high-speed wide area networks. Such
...     settings are typically disadvantageous for the more common TCP
...     protocol.
...     ''')
>>> for word in text.split(' '):
...     pool.submit(print, word)
>>> for _ in pool.as_completed():
...     pass
>>> pool.shutdown()
- join(**kwargs)[source]¶
Like
JobPool.as_completed()
, but executes the result method of each future and returns only after all processes are complete. This allows for lower-boilerplate prototyping.
- Parameters:
**kwargs – passed to
JobPool.as_completed()
- Returns:
list of results
- Return type:
List[Any]
Example
>>> import ubelt as ub
>>> # We just want to try replacing our simple iterative algorithm
>>> # with the embarrassingly parallel version
>>> arglist = list(zip(range(1000), range(1000)))
>>> func = ub.identity
>>> #
>>> # Original version
>>> for args in arglist:
>>>     func(*args)
>>> #
>>> # Potentially parallel version
>>> jobs = ub.JobPool(max_workers=0)
>>> for args in arglist:
>>>     jobs.submit(func, *args)
>>> _ = jobs.join(desc='running')