Source code for pystow.utils.iter

"""Utilities for functions that consume iterables."""

from __future__ import annotations

from typing import TYPE_CHECKING, Concatenate, ParamSpec, TypeVar

if TYPE_CHECKING:
    from collections.abc import Callable, Generator, Iterable

    X = TypeVar("X")
    P = ParamSpec("P")


__all__ = [
    "reyield",
]


class Sentinel:
    """A marker type for end-of-stream signals.

    Instances carry no data; code in this module recognizes them with ``isinstance``.
    """


# Module-wide marker instance; it is put on a queue to tell the consumer that
# no more items will follow.
_SENTINEL = Sentinel()


def reyield(
    func: Callable[Concatenate[Iterable[X], P], None],
    elements: Iterable[X],
    *args: P.args,
    **kwargs: P.kwargs,
) -> Generator[X, None, None]:
    """Pass elements through an iterable-consuming function while also yielding them.

    Every element is first handed to ``func`` (through a helper generator that
    feeds a background thread running ``func``) and is then yielded to the caller,
    so the input is traversed only once and never has to be turned into a list.

    :param func: A function that consumes an iterable, and does not return anything
    :param elements: An iterable
    :param args: Extra positional arguments given to ``func`` after the iterable
    :param kwargs: Keyword arguments given to ``func``
    :returns: A generator that injects the elements of the iterable one at a time
        before yielding them.

    >>> state = {"value": 0}
    >>> def accumulate(elements: Iterable[int]) -> None:
    ...     for element in elements:
    ...         state["value"] += element
    >>> square_sum = 0
    >>> for element in reyield(accumulate, range(5)):
    ...     square_sum += element**2

    This is meant as an iterable-native alternative to materializing the
    elements first, i.e., to doing:

    >>> elements = range(5)
    >>> elements = list(elements)
    >>> accumulate(elements)
    >>> square_sum = 0
    >>> for element in elements:
    ...     square_sum += element**2
    """
    feeder = _help_reiter(func, *args, **kwargs)
    try:
        # advance the helper to its first ``yield`` so it can accept ``.send()``
        next(feeder)
        for entry in elements:
            # hand the element to the consumer before exposing it to the caller
            feeder.send(entry)
            yield entry
    finally:
        # closing the helper signals end-of-stream and waits for the consumer
        feeder.close()


def _help_reiter(
    func: Callable[Concatenate[Iterable[X], P], None], *args: P.args, **kwargs: P.kwargs
) -> Generator[None, X, None]:
    """Run ``func`` in a background thread, fed by items sent to this generator.

    The returned generator must be primed with ``next()``. Afterwards, each value
    passed to ``.send()`` is put on a queue that ``func`` sees as an ordinary
    iterable. Closing the generator puts the end-of-stream sentinel on the queue
    and blocks until the thread has finished.

    :param func: A function that consumes an iterable as its first positional
        argument, and does not return anything
    :param args: Extra positional arguments given to ``func`` after the iterable
    :param kwargs: Keyword arguments given to ``func``
    :returns: A generator that accepts items via ``.send()`` and yields nothing useful
    :raises Exception: Any exception raised by ``func`` in the background thread is
        re-raised in the calling thread, either from the ``next()``/``.send()`` call
        that first notices it or from ``.close()``.
    """
    import threading
    from queue import Queue

    queue: Queue[X | Sentinel] = Queue()
    # holds at most one entry: the exception that terminated ``func``, if any
    failures: list[Exception] = []

    def _iterable_from_queue() -> Generator[X, None, None]:
        while True:
            # blocks indefinitely until the producer side hands over something
            item_ = queue.get()
            # the sentinel simply ends this generator, which makes the
            # consumer's loop over it stop like for any exhausted iterable
            if isinstance(item_, Sentinel):
                return
            yield item_

    def _consume() -> None:
        # An exception escaping a thread's target never reaches the thread that
        # started it, so record it here and re-raise it on the producer side.
        try:
            func(_iterable_from_queue(), *args, **kwargs)
        except Exception as exc:
            failures.append(exc)

    # Run the consumer in a background thread, fed by the generator that wraps the queue
    thread = threading.Thread(target=_consume, daemon=True)
    thread.start()

    # now, we invert the generator pattern - this function
    # will return a generator that you can .send(item) to,
    # and the yield statement waits for the value and sticks
    # it in item
    try:
        # stop feeding as soon as the consumer is known to have failed: nothing
        # reads the queue anymore, so further items would only pile up in memory
        while not failures:
            item: X = yield  # pause, wait for .send(item)
            queue.put(item)
    except GeneratorExit:
        # this happens when the outer generator ends; swallow it so that the
        # failure check below the cleanup still runs during .close()
        pass
    finally:
        queue.put(_SENTINEL)  # signal the consumer that we're done
        thread.join()

    if failures:
        raise failures[0]