# Source code for ubelt.util_hash

# -*- coding: utf-8 -*-
"""
Wrappers around hashlib functions to generate hash signatures for common data.

The hashes should be deterministic across platforms.

NOTE:
    The exact hashes generated for data object and files may change in the
    future. When this happens the `HASH_VERSION` attribute will be incremented.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import hashlib
import six
import uuid
import math
from collections import OrderedDict
from six.moves import zip
# we will use NoParam instead of None because None is a valid hashlen setting
from ubelt.util_const import NoParam

__all__ = ['hash_data', 'hash_file']

HASH_VERSION = 1  # incremented when we make a change that modifies hashes

_ALPHABET_10 = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

_ALPHABET_16 = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
                'a', 'b', 'c', 'd', 'e', 'f']

_ALPHABET_26 = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
                'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't',
                'u', 'v', 'w', 'x', 'y', 'z']

# Tuples of types for isinstance checks that must work on both Python 2 and
# Python 3. The names referenced in the PY2 branch (basestring, long) only
# exist on Python 2, hence the NOQA markers.
if six.PY2:
    _stringlike = (basestring, bytes)  # NOQA
    _intlike = (int, long)  # NOQA
else:
    _stringlike = (str, bytes)  # NOQA
    _intlike = (int,)

# Values substituted when a caller passes NoParam (or the string 'default').
DEFAULT_ALPHABET = _ALPHABET_26
# Default to sha512 because it is often faster than sha256 on 64bit systems:
# Reference: https://crypto.stackexchange.com/questions/26336/faster
DEFAULT_HASHER = hashlib.sha512  # note: using sha1 is a bit faster
DEFAULT_HASHLEN = None  # None means the hash text is not truncated


if six.PY2:
    import codecs
    HASH = type(hashlib.sha1())  # python2 doesn't expose the hash type

    def _py2_to_bytes(int_, length, byteorder='big', signed=True):
        """
        Args:
            length (int) : number of bytes (not bits)

        References:
            https://bugs.python.org/issue16580
        """
        # convert nbytes to nbits
        bit_width = length * 8
        if int_ < 0:
            complement = (1 << bit_width) + int_
        else:
            complement = int_
        h = '%x' % complement
        p = ('0' * (len(h) % 2) + h).zfill(length * 2)
        s = p.decode('hex')
        bytes_ = s if byteorder == 'big' else s[::-1]
        return bytes_

    def _int_to_bytes(int_):
        """
        Serialize an integer as signed big-endian bytes (Python 2 variant).

        The output uses the smallest whole number of bytes that holds the
        magnitude of ``int_`` plus one sign bit.

        Args:
            int_ (int): the integer to serialize

        Returns:
            bytes: big-endian two's complement representation
        """
        # reserve one extra bit for the sign
        nbits = int_.bit_length() + 1
        nbytes = int(math.ceil(nbits / 8.0))
        return _py2_to_bytes(int_, nbytes, byteorder='big', signed=True)

    def _bytes_to_int(bytes_):
        nbytes = len(bytes_)
        nbits = nbytes * 8
        comp = int(codecs.encode(bytes_, 'hex'), 16)
        if comp > 1 << (nbits - 1):
            int_ = comp - (1 << (nbits))
        else:
            int_ = comp
        return int_
else:
    HASH = hashlib._hashlib.HASH
    codecs = None
    def _int_to_bytes(int_):
        r"""
        Converts an integer into its byte representation
        assumes int32 by default, but dynamically handles larger ints

        Example:
            >>> from ubelt.util_hash import _int_to_bytes, _bytes_to_int
            >>> int_ = 1
            >>> assert _bytes_to_int((_int_to_bytes(int_))) == int_
            >>> assert _int_to_bytes(int_) == b'\x01'
            >>> assert _bytes_to_int((_int_to_bytes(0))) == 0
            >>> assert _bytes_to_int((_int_to_bytes(-1))) == -1
            >>> assert _bytes_to_int((_int_to_bytes(-1000000))) == -1000000
            >>> assert _bytes_to_int((_int_to_bytes(1000000))) == 1000000
        """
        bit_length = int_.bit_length() + 1
        length = math.ceil(bit_length / 8.0)  # bytelength
        bytes_ = int_.to_bytes(length, byteorder='big', signed=True)
        return bytes_

    def _bytes_to_int(bytes_):
        r"""
        Converts a string of bytes into its integer representation (big-endian)

        Example:
            >>> bytes_ = b'\x01'
            >>> assert _int_to_bytes((_bytes_to_int(bytes_))) == bytes_
            >>> assert _bytes_to_int(bytes_) == 1
        """
        int_ = int.from_bytes(bytes_, 'big', signed=True)
        return int_


def _rectify_hasher(hasher):
    """
    Convert a hasher specification into a zero-argument hasher factory.

    Args:
        hasher (NoParam | str | HASH | callable): one of
            * ``NoParam`` or ``'default'``: selects ``DEFAULT_HASHER``
            * the name of an algorithm in ``hashlib.algorithms_available``
            * an existing hashlib hash instance, which is wrapped so that
              calling the result returns that same instance
            * anything else, which is returned unchanged and assumed to be
              callable by the caller

    Returns:
        callable: when called with no arguments returns a hash object

    Raises:
        KeyError: if a string name is not a known hashlib algorithm

    Example:
        >>> assert _rectify_hasher(NoParam) is DEFAULT_HASHER
        >>> assert _rectify_hasher('sha1') is hashlib.sha1
        >>> assert _rectify_hasher('sha256') is hashlib.sha256
        >>> assert _rectify_hasher('sha512') is hashlib.sha512
        >>> assert _rectify_hasher('md5') is hashlib.md5
        >>> assert _rectify_hasher(hashlib.sha1) is hashlib.sha1
        >>> assert _rectify_hasher(hashlib.sha1())().name == 'sha1'
        >>> import pytest
        >>> assert pytest.raises(KeyError, _rectify_hasher, '42')
        >>> #assert pytest.raises(TypeError, _rectify_hasher, object)
    """
    if hasher is NoParam or hasher == 'default':
        return DEFAULT_HASHER
    if isinstance(hasher, six.string_types):
        if hasher not in hashlib.algorithms_available:
            raise KeyError('unknown hasher: {}'.format(hasher))
        constructor = getattr(hashlib, hasher, None)
        if constructor is None:
            # Some available algorithms (e.g. ones supplied only by OpenSSL)
            # have no named constructor on the hashlib module and can only be
            # created through hashlib.new.
            algo_name = hasher
            return lambda: hashlib.new(algo_name)
        return constructor
    if isinstance(hasher, HASH):
        # by default the result of this function is a class we will make an
        # instance of, if we already have an instance, wrap it in a callable
        # so the external syntax does not need to change.
        return lambda: hasher
    return hasher


def _rectify_base(base):
    """
    Expand a base shorthand into the list of symbols it stands for.

    Args:
        base (NoParam | int | str | list | tuple): ``NoParam`` or
            ``'default'`` for the default alphabet, a known key (26,
            'alpha', 'abc', 16, 'hex', 10, 'dec'), or an explicit sequence
            of symbols which is returned unchanged.

    Returns:
        list | tuple: the symbols of the requested base

    Raises:
        TypeError: if ``base`` is neither a known key nor a list / tuple

    Example:
        >>> assert _rectify_base(NoParam) is _ALPHABET_26
        >>> assert _rectify_base('hex') is _ALPHABET_16
        >>> assert _rectify_base('abc') is _ALPHABET_26
        >>> assert _rectify_base(10) is _ALPHABET_10
        >>> assert _rectify_base(['1', '2']) == ['1', '2']
        >>> import pytest
        >>> assert pytest.raises(TypeError, _rectify_base, 'uselist')
    """
    if base is NoParam or base == 'default':
        return DEFAULT_ALPHABET

    # shorthand keys paired with the alphabet they select, checked in order
    shorthands = (
        ((26, 'alpha', 'abc'), _ALPHABET_26),
        ((16, 'hex'), _ALPHABET_16),
        ((10, 'dec'), _ALPHABET_10),
    )
    for keys, alphabet in shorthands:
        if base in keys:
            return alphabet

    if isinstance(base, (list, tuple)):
        # an explicit alphabet was given
        return base
    raise TypeError(
        'base must be a key, list, or tuple not {}'.format(type(base)))


def _rectify_hashlen(hashlen):
    """
    Substitute the default hash length when none was requested.

    Args:
        hashlen (NoParam | str | int | None): ``NoParam`` or ``'default'``
            select ``DEFAULT_HASHLEN``; any other value is passed through.

    Returns:
        int | None: the maximum number of symbols to keep

    Example:
        >>> assert _rectify_hashlen(NoParam) is DEFAULT_HASHLEN
        >>> assert _rectify_hashlen(8) == 8
    """
    use_default = hashlen is NoParam or hashlen == 'default'
    return DEFAULT_HASHLEN if use_default else hashlen


class HashableExtensions(object):
    """
    Singleton helper class for managing non-builtin (e.g. numpy) hash types

    Hash functions are registered per type and return a ``(prefix, hashable)``
    pair of bytes. Lookup is by the exact class of the queried object.
    """
    def __init__(self):
        # maps (module name, class name) -> (type, hash function)
        self.keyed_extensions = {}
        # predicates; an object for which any of them returns True is hashed
        # by iterating over it rather than directly
        self.iterable_checks = []

    def register(self, hash_types):
        """
        Decorator factory that registers a hash function for some types.

        Args:
            hash_types (type | list | tuple): the class, or classes, that the
                decorated function knows how to hash

        Returns:
            callable: a decorator that records the function for each type
                and returns the function unchanged
        """
        # ensure iterable
        if not isinstance(hash_types, (list, tuple)):
            hash_types = [hash_types]

        def _wrap(hash_func):
            for hash_type in hash_types:
                key = (hash_type.__module__, hash_type.__name__)
                self.keyed_extensions[key] = (hash_type, hash_func)
            return hash_func
        return _wrap

    def add_iterable_check(self, func):
        """
        Registers a function that detects when a type is iterable

        Args:
            func (callable): predicate taking the data and returning a bool

        Returns:
            callable: ``func`` itself, so this can be used as a decorator
        """
        self.iterable_checks.append(func)
        return func

    def lookup(self, data):
        """
        Returns an appropriate function to hash `data` if one has been
        registered.

        Only the exact class of `data` is consulted; subclasses of a
        registered type are not matched.

        Raises:
            TypeError : if data has no registered hash methods

        Example:
            >>> self = HashableExtensions()
            >>> self._register_numpy_extensions()
            >>> self._register_builtin_class_extensions()

            >>> data = np.array([1, 2, 3])
            >>> self.lookup(data[0])

            >>> class Foo(object):
            >>>     def __init__(f):
            >>>         f.attr = 1
            >>> data = Foo()
            >>> import pytest
            >>> assert pytest.raises(TypeError, self.lookup, data)

            >>> # If ub.hash_data doesnt support your object,
            >>> # then you can register it.
            >>> @self.register(Foo)
            >>> def _hashfoo(data):
            >>>     return b'FOO', data.attr
            >>> func = self.lookup(data)
            >>> assert func(data)[1] == 1

            >>> data = uuid.uuid4()
            >>> self.lookup(data)
        """
        # Maybe try using functools.singledispatch instead?
        # O(1) lookup keyed on the exact class of the data
        query_hash_type = data.__class__
        key = (query_hash_type.__module__, query_hash_type.__name__)
        try:
            # entries are (type, func) pairs; only the function is needed
            hash_func = self.keyed_extensions[key][1]
        except KeyError:
            raise TypeError('No registered hash func for hashable type=%r' % (
                    query_hash_type))
        return hash_func

    def _register_numpy_extensions(self):
        """
        Numpy extensions are builtin

        Raises:
            ImportError: if numpy is not installed
        """
        # Import locally so that this method does not depend on a module
        # level `np` having been bound before it is called.
        import numpy as np

        # system checks
        numpy_floating_types = (np.float16, np.float32, np.float64)
        if hasattr(np, 'float128'):  # nocover
            numpy_floating_types = numpy_floating_types + (np.float128,)

        @self.add_iterable_check
        def is_object_ndarray(data):
            # ndarrays of objects cannot be hashed directly.
            return isinstance(data, np.ndarray) and data.dtype.kind == 'O'

        @self.register(np.ndarray)
        def hash_numpy_array(data):
            """
            Example:
                >>> data_f64 = np.zeros((3, 3, 3), dtype=np.float64)
                >>> data_i64 = np.zeros((3, 3, 3), dtype=np.int64)
                >>> data_i32 = np.zeros((3, 3, 3), dtype=np.int32)
                >>> hash_f64 = _hashable_sequence(data_f64, use_prefix=True)
                >>> hash_i64 = _hashable_sequence(data_i64, use_prefix=True)
                >>> hash_i32 = _hashable_sequence(data_i32, use_prefix=True)
                >>> assert hash_i64 != hash_f64
                >>> assert hash_i64 != hash_i32
            """
            if data.dtype.kind == 'O':
                msg = 'directly hashing ndarrays with dtype=object is unstable'
                raise TypeError(msg)
            # tobytes() views the array in 1D (via ravel())
            # encode the shape as well
            header = b''.join(_hashable_sequence((len(data.shape), data.shape)))
            dtype = b''.join(_hashable_sequence(data.dtype.descr))
            hashable = header + dtype + data.tobytes()
            prefix = b'NDARR'
            return prefix, hashable

        @self.register((np.int64, np.int32, np.int16, np.int8) +
                       (np.uint64, np.uint32, np.uint16, np.uint8))
        def _hash_numpy_int(data):
            # numpy integer scalars hash the same as the equal python int
            return _convert_to_hashable(int(data))

        @self.register(numpy_floating_types)
        def _hash_numpy_float(data):
            # numpy float scalars hash the same as the converted python float
            return _convert_to_hashable(float(data))

        @self.register(np.random.RandomState)
        def _hash_numpy_random_state(data):
            """
            Example:
                >>> rng = np.random.RandomState(0)
                >>> _hashable_sequence(rng, use_prefix=True)
            """
            hashable = b''.join(_hashable_sequence(data.get_state()))
            prefix = b'RNG'
            return prefix, hashable

    def _register_builtin_class_extensions(self):
        """
        Register hashing extensions for a selection of classes included in
        python stdlib.

        Example:
            >>> data = uuid.UUID('7e9d206b-dc02-4240-8bdb-fffe858121d0')
            >>> print(hash_data(data)[0:8])
            cryarepd
            >>> data = OrderedDict([('a', 1), ('b', 2), ('c', [1, 2, 3]),
            >>>                     (4, OrderedDict())])
            >>> print(hash_data(data)[0:8])
            qjspicvv

            gpxtclct
        """
        @self.register(uuid.UUID)
        def _hash_uuid(data):
            hashable = data.bytes
            prefix = b'UUID'
            return prefix, hashable

        @self.register(OrderedDict)
        def _hash_ordered_dict(data):
            """
            Note, we should not be hashing dicts because they are unordered
            """
            hashable = b''.join(_hashable_sequence(list(data.items())))
            prefix = b'ODICT'
            return prefix, hashable

# Module-level registry consulted by the hashing helpers in this file.
_HASHABLE_EXTENSIONS = HashableExtensions()
_HASHABLE_EXTENSIONS._register_builtin_class_extensions()
try:
    # numpy is optional: its extensions are registered only if it imports
    import numpy as np
    _HASHABLE_EXTENSIONS._register_numpy_extensions()
except ImportError:  # nocover
    pass


class _HashTracer(object):
    """ helper class to extract hashed sequences """

    def __init__(self):
        self.sequence = []

    def update(self, bytes):
        self.sequence.append(bytes)


def _hashable_sequence(data, use_prefix=True):
    r"""
    List the chunks of bytes that hash_data would feed to its hasher.

    Args:
        data (object): ordered data with structure
        use_prefix (bool): include type prefixes in the chunks

    Returns:
        list: the byte chunks in the order they would be hashed

    Example:
        >>> data = [2, (3, 4)]
        >>> result1 = (b''.join(_hashable_sequence(data, use_prefix=False)))
        >>> result2 = (b''.join(_hashable_sequence(data, use_prefix=True)))
        >>> assert result1 == b'_[_\x02_,__[_\x03_,_\x04_,__]__]_'
        >>> assert result2 == b'_[_INT\x02_,__[_INT\x03_,_INT\x04_,__]__]_'
    """
    # a tracer records each update call instead of hashing it
    tracer = _HashTracer()
    _update_hasher(tracer, data, use_prefix=use_prefix)
    return tracer.sequence


def _convert_to_hashable(data, use_prefix=True):
    r"""
    Converts `data` into a hashable byte representation if an appropriate
    hashing function is known.

    Args:
        data (object): ordered data with structure
        use_prefix (bool): include type prefixes in the hash

    Returns:
        tuple(bytes, bytes): prefix, hashable:
            a prefix hinting the original data type and the byte representation
            of `data`. The prefix is empty when `use_prefix` is False.

    Raises:
        TypeError : if data has no registered hash methods

    Notes:
        Finite floats are encoded as their exact integer ratio. Non-finite
        floats (nan, inf, -inf) have no integer ratio and are encoded as
        their text representation instead.

    Example:
        >>> assert _convert_to_hashable(None) == (b'NULL', b'NONE')
        >>> assert _convert_to_hashable('string') == (b'TXT', b'string')
        >>> assert _convert_to_hashable(1) == (b'INT', b'\x01')
        >>> assert _convert_to_hashable(1.0) == (b'FLT', b'\x01/\x01')
        >>> assert _convert_to_hashable(_intlike[-1](1)) == (b'INT', b'\x01')
        >>> assert _convert_to_hashable(float('nan')) == (b'FLT', b'nan')
        >>> assert _convert_to_hashable(float('-inf')) == (b'FLT', b'-inf')
    """
    # HANDLE MOST COMMON TYPES FIRST
    if data is None:
        hashable = b'NONE'
        prefix = b'NULL'
    elif isinstance(data, six.binary_type):
        hashable = data
        prefix = b'TXT'
    elif isinstance(data, six.text_type):
        # convert unicode into bytes
        hashable = data.encode('utf-8')
        prefix = b'TXT'
    elif isinstance(data, _intlike):
        # warnings.warn('Hashing ints is slow, numpy is prefered')
        hashable = _int_to_bytes(data)
        # hashable = data.to_bytes(8, byteorder='big')
        prefix = b'INT'
    elif isinstance(data, float):
        data_ = float(data)  # normalize float subclasses to a plain float
        try:
            a, b = data_.as_integer_ratio()
        except (ValueError, OverflowError):
            # as_integer_ratio raises ValueError for nan and OverflowError
            # for +/- inf. The text form cannot collide with a finite float
            # because it never contains the b'/' separator.
            hashable = str(data_).encode('utf-8')
        else:
            hashable = _int_to_bytes(a) + b'/' + _int_to_bytes(b)
        prefix = b'FLT'
    else:
        # Then dynamically look up any other type
        hash_func = _HASHABLE_EXTENSIONS.lookup(data)
        prefix, hashable = hash_func(data)
    if use_prefix:
        return prefix, hashable
    else:
        return b'', hashable


def _update_hasher(hasher, data, use_prefix=True):
    """
    Converts `data` into a byte representation and calls update on the hasher
    `hashlib.HASH` algorithm.

    Tuples, lists, zip objects, and anything flagged by a registered
    iterable check are hashed item by item between structure markers; all
    other data is converted with `_convert_to_hashable` and hashed directly.

    Args:
        hasher (HASH): instance of a hashlib algorithm (any object with a
            compatible `update` method works, e.g. `_HashTracer`)
        data (object): ordered data with structure
        use_prefix (bool): include type prefixes in the hash

    Raises:
        TypeError : if `data`, or a nested item, is not iterated and has no
            registered hash method

    Example:
        >>> hasher = hashlib.sha512()
        >>> data = [1, 2, ['a', 2, 'c']]
        >>> _update_hasher(hasher, data)
        >>> print(hasher.hexdigest()[0:8])
        e2c67675

        2ba8d82b
    """
    # Determine if the data should be hashed directly or iterated through
    if isinstance(data, (tuple, list, zip)):
        needs_iteration = True
    else:
        needs_iteration = any(check(data) for check in
                              _HASHABLE_EXTENSIONS.iterable_checks)

    if needs_iteration:
        # Denote that we are hashing over an iterable
        # Multi-byte structure markers make accidental conflicts less likely
        SEP = b'_,_'
        ITER_PREFIX = b'_[_'
        ITER_SUFFIX = b'_]_'

        iter_ = iter(data)
        hasher.update(ITER_PREFIX)
        # first, try to nest quickly without recursive calls
        # (this works if all data in the sequence is a non-iterable)
        try:
            for item in iter_:
                prefix, hashable = _convert_to_hashable(item, use_prefix)
                binary_data = prefix + hashable + SEP
                hasher.update(binary_data)
        except TypeError:
            # The current item could not be converted directly, so it and
            # every remaining item of iter_ are handled with recursive calls.
            # NOTE(review): the item that triggered the fallback is not
            # followed by SEP, unlike the items in the loop below. Adding the
            # separator here would change previously generated hashes.
            _update_hasher(hasher, item, use_prefix)
            for item in iter_:
                # Ensure the items have a spacer between them
                _update_hasher(hasher, item, use_prefix)
                hasher.update(SEP)
        hasher.update(ITER_SUFFIX)
    else:
        # non-iterable data is hashed as a single chunk
        prefix, hashable = _convert_to_hashable(data, use_prefix)
        binary_data = prefix + hashable
        hasher.update(binary_data)


def _convert_hexstr_base(hexstr, base):
    r"""
    Re-encode a hexadecimal string using a different alphabet of symbols.

    Args:
        hexstr (str): string of hexadecimal symbols to convert, optionally
            with a leading minus sign
        base (list): symbols of the conversion base; the symbol at index
            ``i`` represents the digit ``i``

    Returns:
        str: the same number written with the symbols of ``base``. When
            ``base`` is the ``_ALPHABET_16`` object itself the input is
            returned untouched, and a value of zero is always written as
            ``'0'``.

    Notes:
        A lossless conversion of an ``N`` symbol hex string needs about
        ``N * log(16) / log(len(base))`` symbols in the new base.

    Example:
        >>> print(_convert_hexstr_base('ffffffff', _ALPHABET_26))
        nxmrlxv
        >>> print(_convert_hexstr_base('0', _ALPHABET_26))
        0
        >>> print(_convert_hexstr_base('-ffffffff', _ALPHABET_26))
        -nxmrlxv
        >>> print(_convert_hexstr_base('aafffff1', _ALPHABET_16))
        aafffff1
    """
    if base is _ALPHABET_16:
        # already in hex, no conversion needed
        return hexstr

    radix = len(base)
    value = int(hexstr, 16)  # parse the hex text into an integer
    if value == 0:
        return '0'

    is_negative = value < 0
    remaining = -value if is_negative else value

    # peel off digits from least significant to most significant
    symbols = []
    while remaining:
        remaining, digit = divmod(remaining, radix)
        symbols.append(base[digit])
    if is_negative:
        symbols.append('-')
    return ''.join(reversed(symbols))


def _digest_hasher(hasher, hashlen, base):
    """
    Produce the final hash text from a hasher (counterpart to _update_hasher)

    Args:
        hasher (HASH): hash object that has already been fed its data
        hashlen (int | None): maximum number of symbols to keep; None keeps
            them all
        base (list): symbols of the output base

    Returns:
        str: the digest written in ``base`` and truncated to ``hashlen``
    """
    # hex digest (128 characters for the default sha512 hasher)
    hexdigest = hasher.hexdigest()
    # a larger base shortens the text; then truncate to the requested length
    converted = _convert_hexstr_base(hexdigest, base)
    return converted[:hashlen]


def hash_data(data, hasher=NoParam, hashlen=NoParam, base=NoParam):
    r"""
    Get a unique hash depending on the state of the data.

    Args:
        data (object): any sort of loosely organized data
        hasher (HASH): hash algorithm from hashlib, defaults to `sha512`.
        hashlen (int): maximum number of symbols in the returned hash. If
            not specified, all are returned.
        base (list): list of symbols or shorthand key. Defaults to base 26

    Returns:
        str: text -  hash string

    Raises:
        TypeError : if data contains an object with no registered hash method

    Example:
        >>> print(hash_data([1, 2, (3, '4')], hashlen=8, hasher='sha512'))
        iugjngof

        frqkjbsq
    """
    base = _rectify_base(base)
    hashlen = _rectify_hashlen(hashlen)
    hasher = _rectify_hasher(hasher)()
    # Feed the data into the hasher
    _update_hasher(hasher, data)
    # Get the hashed representation
    text = _digest_hasher(hasher, hashlen, base)
    return text
def hash_file(fpath, blocksize=65536, stride=1, hasher=NoParam,
              hashlen=NoParam, base=NoParam):
    r"""
    Hashes the data in a file on disk.

    Args:
        fpath (str):  file path string
        blocksize (int): 2 ** 16. Affects speed of reading file
        stride (int): strides > 1 skip data to hash, useful for faster
            hashing, but less accurate, also makes hash dependant on
            blocksize.
        hasher (HASH): hash algorithm from hashlib, defaults to `sha512`.
        hashlen (int): maximum number of symbols in the returned hash. If
            not specified, all are returned.
        base (list): list of symbols or shorthand key. Defaults to base 26

    Returns:
        str: text - hash string

    Notes:
        For better hashes keep stride = 1
        For faster hashes set stride > 1
        blocksize matters when stride > 1

    References:
        http://stackoverflow.com/questions/3431825/md5-checksum-of-a-file
        http://stackoverflow.com/questions/5001893/when-to-use-sha-1-vs-sha-2

    Example:
        >>> import ubelt as ub
        >>> from os.path import join
        >>> fpath = join(ub.ensure_app_cache_dir('ubelt'), 'tmp.txt')
        >>> ub.writeto(fpath, 'foobar')
        >>> print(ub.hash_file(fpath, hasher='sha512', hashlen=8))
        vkiodmcj
    """
    base = _rectify_base(base)
    hashlen = _rectify_hashlen(hashlen)
    hasher = _rectify_hasher(hasher)()
    with open(fpath, 'rb') as file:
        buf = file.read(blocksize)
        while len(buf) > 0:
            hasher.update(buf)
            if stride > 1:
                # skip blocks when stride is greater than 1
                # (whence=1 seeks relative to the current position)
                file.seek(blocksize * (stride - 1), 1)
            buf = file.read(blocksize)
    # Get the hashed representation
    text = _digest_hasher(hasher, hashlen, base)
    return text
if __name__ == '__main__':
    r"""
    CommandLine:
        python -m ubelt.util_hash all
    """
    # run the doctests of this module when it is executed as a script
    import xdoctest
    xdoctest.doctest_module(__file__)