# Source code for pystow.utils.hashing

"""Hashing utilities."""

from __future__ import annotations

import hashlib
import logging
from collections.abc import Collection, Iterable, Mapping
from pathlib import Path
from typing import NamedTuple, TypeAlias
from urllib.request import urlopen

# Names exported by ``from ... import *``.
__all__ = [
    "Hash",
    "HexDigestError",
    "HexDigestMismatch",
    "get_hash_hexdigest",
    "get_hashes",
    "get_hexdigests_remote",
    "get_offending_hexdigests",
    "raise_on_digest_mismatch",
]

logger = logging.getLogger(__name__)

#: Type alias for the hash objects created by :func:`hashlib.new`.
#: ``hashlib._Hash`` is a stub-only name: it isn't actually part of the
#: runtime code, but MyPy injects it so we can do type checking. The alias
#: is therefore written as a string and never looked up at import time.
Hash: TypeAlias = "hashlib._Hash"


def get_hexdigests_remote(
    hexdigests_remote: Mapping[str, str] | None, hexdigests_strict: bool = False
) -> Mapping[str, str]:
    """Process hexdigests via URLs.

    Each URL is opened and its content decoded as UTF-8. Unless
    ``hexdigests_strict`` is set, content containing ``=`` is assumed to be in
    the ``algorithm(filename)=hash`` format and only the part after the last
    ``=`` is kept. Surrounding whitespace (such as the trailing newline that
    checksum files usually end with) is always removed, because it can never
    be part of a hexdigest and would make every comparison fail.

    :param hexdigests_remote: The expected hexdigests as (algorithm_name, url to file with
        expected hex digest) pairs. ``None`` is treated like an empty mapping.
    :param hexdigests_strict: Set this to ``True`` to stop automatically checking for the
        ``algorithm(filename)=hash`` format; the whitespace-stripped content is then used
        unchanged.
    :returns: A mapping of algorithms to hexdigests
    """
    rv: dict[str, str] = {}
    for key, url in (hexdigests_remote or {}).items():
        with urlopen(url) as response:  # noqa:S310
            text = response.read().decode("utf-8")
        if not hexdigests_strict and "=" in text:
            # keep only what follows the last "=", i.e. the hash itself
            text = text.rsplit("=", 1)[-1]
        # strip on every path, not only when the "=" format was detected
        rv[key] = text.strip()
    return rv
class HexDigestMismatch(NamedTuple):
    """A single disagreement between an observed and an expected hexdigest."""

    #: the name of the hash algorithm
    name: str
    #: the hexdigest that was actually observed, encoded as a string
    actual: str
    #: the hexdigest that was expected, encoded as a string
    expected: str
def get_offending_hexdigests(
    path: str | Path,
    chunk_size: int | None = None,
    hexdigests: Mapping[str, str] | None = None,
    hexdigests_remote: Mapping[str, str] | None = None,
    hexdigests_strict: bool = False,
) -> Collection[HexDigestMismatch]:
    """Check a file for hash sums.

    :param path: The file path.
    :param chunk_size: The chunk size for reading the file.
    :param hexdigests: The expected hexdigests as (algorithm_name, expected_hex_digest) pairs.
    :param hexdigests_remote: The expected hexdigests as (algorithm_name, url to file with
        expected hexdigest) pairs.
    :param hexdigests_strict: Set this to ``True`` to stop automatically checking for the
        ``algorithm(filename)=hash`` format in the remote files.
    :returns: A collection of observed / expected hexdigests where the digests do not match.
        An algorithm given both directly and remotely is compared against both expected
        values, so it can contribute up to two entries.
    """
    # Keep the direct and the remote expectations as separate pairs instead of
    # merging them into one dict: an algorithm named in both would otherwise
    # make the merge fail on the duplicated key.
    expected_pairs = [
        *(hexdigests or {}).items(),
        *get_hexdigests_remote(hexdigests_remote, hexdigests_strict=hexdigests_strict).items(),
    ]

    # If there aren't any expectations at all, then there won't be any
    # mismatches, and the file does not need to be read
    if not expected_pairs:
        return []

    logger.info("Checking hash sums for file: %s", path)

    # instantiate each algorithm once and feed the file through all of them
    algorithms = get_hashes(
        path=path,
        names={alg for alg, _ in expected_pairs},
        chunk_size=chunk_size,
    )

    # Compare digests
    mismatches = []
    for alg, expected_digest in expected_pairs:
        observed_digest = algorithms[alg].hexdigest()
        if observed_digest != expected_digest:
            logger.error("%s expected %s but got %s.", alg, expected_digest, observed_digest)
            mismatches.append(HexDigestMismatch(alg, observed_digest, expected_digest))
        else:
            logger.debug("Successfully checked with %s.", alg)

    return mismatches
def get_hashes(
    path: str | Path,
    names: Iterable[str],
    *,
    chunk_size: int | None = None,
) -> Mapping[str, Hash]:
    """Calculate several hashes of a file in a single pass over its content.

    :param path: The file path.
    :param names: Names of the hash algorithms in :mod:`hashlib`
    :param chunk_size: The chunk size in bytes for reading the file. Defaults to 64 KiB.
    :returns: A mapping from algorithm name to the hash object that was fed the file's
        content; call ``hexdigest()`` on a value to get the observed hexdigest.
    :raises ValueError: if ``chunk_size`` is not a positive number
    """
    if chunk_size is None:
        chunk_size = 64 * 2**10
    elif chunk_size <= 0:
        # A zero-sized buffer would end the read loop immediately and silently
        # yield the hashes of empty input instead of the file's hashes.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    path = Path(path).resolve()

    # instantiate hash algorithms
    algorithms: Mapping[str, Hash] = {name: hashlib.new(name) for name in names}

    # calculate hash sums of file incrementally, reusing one buffer so that no
    # new bytes object is allocated per chunk
    buffer = memoryview(bytearray(chunk_size))
    with path.open("rb", buffering=0) as file:
        # readinto() reports how many bytes it wrote; 0 means end of file
        while this_chunk_size := file.readinto(buffer):
            chunk = buffer[:this_chunk_size]
            for alg in algorithms.values():
                alg.update(chunk)

    return algorithms
def get_hash_hexdigest(
    path: str | Path,
    name: str,
    *,
    chunk_size: int | None = None,
) -> str:
    """Get the hexdigest of a file for a single hash algorithm.

    :param path: The file path.
    :param name: Name of the hash algorithm, passed on to :func:`get_hashes`
    :param chunk_size: The chunk size for reading the file, passed on to :func:`get_hashes`
    :returns: The hexdigest of the hash object computed for ``name``
    """
    # get_hashes() works on a collection of names, so wrap the single name
    # and pick its entry back out of the returned mapping
    return get_hashes(path, [name], chunk_size=chunk_size)[name].hexdigest()
def raise_on_digest_mismatch(
    *,
    path: str | Path,
    hexdigests: Mapping[str, str] | None = None,
    hexdigests_remote: Mapping[str, str] | None = None,
    hexdigests_strict: bool = False,
) -> None:
    """Raise a HexDigestError if the digests do not match.

    :param path: The file path.
    :param hexdigests: The expected hexdigests as (algorithm_name, expected_hex_digest) pairs.
    :param hexdigests_remote: The expected hexdigests as (algorithm_name, url to file with
        expected hexdigest) pairs.
    :param hexdigests_strict: Set this to ``True`` to stop automatically checking for the
        ``algorithm(filename)=hash`` format in the remote files.
    :raises HexDigestError: if there are any offending hex digests
    """
    offending_hexdigests = get_offending_hexdigests(
        path=path,
        hexdigests=hexdigests,
        hexdigests_remote=hexdigests_remote,
        hexdigests_strict=hexdigests_strict,
    )
    # an empty collection means every expected digest matched
    if offending_hexdigests:
        raise HexDigestError(offending_hexdigests)
class HexDigestError(ValueError):
    """Raised when computed hexdigests differ from the expected hexdigests."""

    def __init__(self, offending_hexdigests: Collection[HexDigestMismatch]):
        """Store the mismatches that caused the error.

        :param offending_hexdigests: The result from :func:`get_offending_hexdigests`
        """
        self.offending_hexdigests = offending_hexdigests

    def __str__(self) -> str:
        """Render a header line followed by one tab-indented line per mismatch."""
        lines = ["Hexdigest of downloaded file does not match the expected ones!"]
        lines.extend(
            f"\t{name} actual: {actual} vs. expected: {expected}"
            for name, actual, expected in self.offending_hexdigests
        )
        return "\n".join(lines)