Source code for ubelt.util_download_manager

"""
A simple download manager
"""

from __future__ import annotations

import typing

if typing.TYPE_CHECKING:
    import concurrent.futures
    import os
    from collections.abc import Iterable
    import ubelt as ub

__all__ = ['DownloadManager']


class DownloadManager:
    """
    Simple implementation of the download manager

    Each submitted url becomes a job on an internal :class:`ubelt.JobPool`.
    The job runs :func:`ubelt.grabdata` when ``cache`` is enabled and
    :func:`ubelt.download` otherwise. Iterating over the manager is the same
    as calling :func:`DownloadManager.as_completed` with default arguments,
    and ``len(manager)`` is the length of the internal pool.

    Example:
        >>> # xdoctest: +REQUIRES(--network)
        >>> import ubelt as ub
        >>> # Download a file with a known hash
        >>> manager = ub.DownloadManager()
        >>> job = manager.submit(
        >>>     'http://i.imgur.com/rqwaDag.png',
        >>>     hash_prefix='31a129618c87dd667103e7154182e3c39a605eefe90f84f2283f3c87efee8e40'
        >>> )
        >>> fpath = job.result()
        >>> print('fpath = {!r}'.format(fpath))

    Example:
        >>> # Does not require network
        >>> import ubelt as ub
        >>> manager = ub.DownloadManager()
        >>> for i in range(100):
        ...     job = manager.submit('localhost/might-not-exist-i-{}'.format(i))
        >>> file_paths = []
        >>> for job in manager.as_completed(prog=True):
        ...     try:
        ...         fpath = job.result()
        ...         file_paths += [fpath]
        ...     except Exception:
        ...         pass
        >>> print('file_paths = {!r}'.format(file_paths))

    Example:
        >>> # xdoctest: +REQUIRES(--network)
        >>> import pytest
        >>> import ubelt as ub
        >>> manager = ub.DownloadManager()
        >>> item1 = {
        >>>     'url': 'https://data.kitware.com/api/v1/item/5b4039308d777f2e6225994c/download',
        >>>     'dst': 'forgot_what_the_name_really_is',
        >>>     'hash_prefix': 'c98a46cb31205cf',
        >>>     'hasher': 'sha512',
        >>> }
        >>> item2 = {
        >>>     'url': 'http://i.imgur.com/rqwaDag.png',
        >>>     'hash_prefix': 'f79ea24571da6ddd2ba12e3d57b515249ecb8a35',
        >>>     'hasher': 'sha1',
        >>> }
        >>> item1 = item2  # hack around SSL error
        >>> manager.submit(**item1)
        >>> manager.submit(**item2)
        >>> for job in manager.as_completed(prog=True, verbose=3):
        >>>     fpath = job.result()
        >>>     print('fpath = {!r}'.format(fpath))
    """

    # Directory handed to the download function as ``dpath``.
    download_root: str | os.PathLike
    # Selects between ub.grabdata (True) and ub.download (False).
    cache: bool
    # Pool that executes the download jobs.
    _pool: ub.JobPool[typing.Any]
    # Callable that each submitted job executes.
    _dl_func: typing.Callable[..., object]

    def __init__(
        self,
        download_root: str | os.PathLike | None = None,
        mode: str = 'thread',
        max_workers: int = 0,
        cache: bool = True,
    ) -> None:
        """
        Args:
            download_root (str | PathLike | None):
                default download location. When None, the directory returned
                by ``ub.ensure_app_config_dir('ubelt', 'dlman')`` is used.

            mode (str): either thread, process, or serial

            max_workers (int): maximum concurrent tasks

            cache (bool): defaults to True. If True jobs run
                :func:`ubelt.grabdata`, otherwise :func:`ubelt.download`.

        TODO:
            - [ ] Will likely have to initialize and store some sort of
                "connection state" objects.
        """
        import ubelt as ub

        # The download manager is overscoped and doesn't provide enough value
        # over the simple download function. This is better suited for a
        # separate package rather than a utility library. A proper download
        # manager would be multiplexing connections and have many more
        # efficiency tricks that would bloat a ubelt implementation.
        ub.schedule_deprecation(
            modname='ubelt',
            name='DownloadManager',
            type='class',
            migration='Vendor the code if you need it.',
            deprecate='1.4.1',
            error='2.0.0',
            remove='2.1.0',
        )

        if download_root is None:
            download_root = ub.ensure_app_config_dir('ubelt', 'dlman')

        self._pool = ub.JobPool(mode=mode, max_workers=max_workers)
        self.download_root = download_root
        self.cache = cache
        self._dl_func = ub.grabdata if self.cache else ub.download

    def submit(
        self,
        url: str | os.PathLike,
        dst: str | None = None,
        hash_prefix: str | None = None,
        hasher: str = 'sha256',
    ) -> concurrent.futures.Future:
        """
        Add a job to the download Queue

        Args:
            url (str | PathLike): pointer to the data to download

            dst (str | None):
                The relative or absolute path to download to. If unspecified,
                the destination name is derived from the url. Forwarded to
                the download function as ``fname``.

            hash_prefix (str | None):
                If specified, verifies that the hash of the downloaded file
                starts with this.

            hasher (str):
                hashing algorithm to use if hash_prefix is specified.
                Defaults to ``'sha256'``.

        Returns:
            concurrent.futures.Future :
                a Future object that will point to the downloaded location.
        """
        # The download function always runs quietly (verbose=0) and always
        # targets this manager's download root.
        download_kwargs = {
            'fname': dst,
            'dpath': self.download_root,
            'hash_prefix': hash_prefix,
            'hasher': hasher,
            'verbose': 0,
        }
        return self._pool.submit(self._dl_func, url, **download_kwargs)

    def as_completed(
        self,
        prog: None | bool | type = None,
        desc: str | None = None,
        verbose: int = 1,
    ) -> typing.Iterable[typing.Any]:
        """
        Generate completed jobs as they become available

        Args:
            prog (None | bool | type):
                if True, uses a ub.ProgIter progress bar. Can also be a class
                with a compatible progiter API. If falsy, the pool's
                completed-job iterable is returned without a wrapper.

            desc (str | None):
                description passed to the progress wrapper; only used when
                ``prog`` is given.

            verbose (int):
                verbosity passed to the progress wrapper; only used when
                ``prog`` is given.

        Returns:
            Iterable: the pool's completed jobs, wrapped by ``prog`` when one
            is requested.

        Example:
            >>> import pytest
            >>> import ubelt as ub
            >>> download_root = ub.ensure_app_config_dir('ubelt', 'dlman')
            >>> manager = ub.DownloadManager(download_root=download_root,
            >>>                              cache=False)
            >>> for i in range(3):
            >>>     manager.submit('localhost')
            >>> results = list(manager)
            >>> print('results = {!r}'.format(results))
            >>> manager.shutdown()
        """
        if prog is True:
            # True is shorthand for the default ubelt progress iterator.
            import ubelt as ub

            prog = ub.ProgIter

        finished_jobs = self._pool.as_completed()
        if not prog:
            return finished_jobs
        return prog(finished_jobs, total=len(self), desc=desc, verbose=verbose)

    def shutdown(self) -> None:
        """
        Shut down the executor that backs the internal job pool.
        """
        executor = self._pool.executor
        executor.shutdown()

    def __iter__(self) -> Iterable:
        """
        Iterate over completed jobs using the defaults of
        :func:`DownloadManager.as_completed`.

        Returns:
            Iterable
        """
        return self.as_completed()

    def __len__(self) -> int:
        """
        Length of the internal job pool.

        Returns:
            int
        """
        return len(self._pool)