Source code for tools.itertools

"""Iterator functions.

.. note:: ``pairwise()`` function introduced in Python 3.10 is backported
   in :mod:`backports`
"""
#
# (C) Pywikibot team, 2008-2023
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations

import collections
import itertools
from contextlib import suppress
from itertools import chain, zip_longest
from typing import Any

from pywikibot.backports import Generator, batched
from pywikibot.logging import debug
from pywikibot.tools import deprecated


# Names exported by a star-import of this module.
__all__ = (
    'filter_unique',
    'intersect_generators',
    'islice_with_ellipsis',
    'itergroup',
    'roundrobin_generators',
)


@deprecated('backports.batched()', since='8.2.0')
def itergroup(iterable,
              size: int,
              strict: bool = False) -> Generator[list[Any], None, None]:
    """Yield consecutive chunks of *iterable* as lists of up to *size* items.

    The chunking itself is delegated to ``batched()``; every chunk it
    produces is converted to a ``list`` before being yielded.

    Example:

    >>> i = itergroup(range(25), 10)
    >>> print(next(i))
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    >>> print(next(i))
    [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    >>> print(next(i))
    [20, 21, 22, 23, 24]
    >>> print(next(i))
    Traceback (most recent call last):
     ...
    StopIteration

    .. versionadded:: 7.6
       The *strict* parameter.
    .. deprecated:: 8.2
       Use :func:`backports.batched` instead.

    :param iterable: the iterable to split into chunks
    :param size: How many items of the iterable to get in one chunk
    :param strict: If True, raise a ValueError if length of iterable is
        not divisible by `size`.
    :raises ValueError: iterable is not divisible by size
    """
    chunks = batched(iterable, size, strict=strict)
    for chunk in chunks:
        yield list(chunk)
def islice_with_ellipsis(iterable, *args, marker: str = '…'):
    """Yield a slice of *iterable*, followed by a continuation marker.

    The positional arguments are forwarded unchanged to
    :func:`itertools.islice`. After the slice has been yielded, *marker*
    is yielded once more if *marker* is truthy, a stop value was given,
    and the underlying iterator still has at least one element left.

    :param iterable: the iterable to work on
    :type iterable: iterable
    :param args: same args as:

        - ``itertools.islice(iterable, stop)``
        - ``itertools.islice(iterable, start, stop[, step])``
    :param marker: element to yield if iterable still contains elements
        after showing the required number. Use a falsy value (e.g.
        ``''``) to disable it. Default value: '…'
    """
    # slice() is only used to parse the islice-style arguments so that
    # the effective stop value can be inspected afterwards.
    bounds = slice(*args)
    source = iter(iterable)
    yield from itertools.islice(source, *args)

    # Without a marker or without an upper bound nothing can be "left over".
    if not marker or bounds.stop is None:
        return

    # Probe for one more element; emit the marker only if there is one.
    with suppress(StopIteration):
        next(source)
        yield marker
def intersect_generators(*iterables, allow_duplicates: bool = False):
    """Generator of intersect iterables.

    Yield items only if they are yielded by all iterables. zip_longest
    is used to retrieve items from all iterables in parallel, so that
    items can be yielded before iterables are exhausted.

    Generator is stopped when all iterables are exhausted. Quitting
    before all iterables are finished is attempted if there is no more
    chance of finding an item in all of them.

    Sample:

    >>> iterables = 'mississippi', 'missouri'
    >>> list(intersect_generators(*iterables))
    ['m', 'i', 's']
    >>> list(intersect_generators(*iterables, allow_duplicates=True))
    ['m', 'i', 's', 's', 'i']

    .. note:: If only a single iterable is given, its items are yielded
       unchanged, i.e. without any duplicate filtering.

    .. versionadded:: 3.0

    .. versionchanged:: 5.0
       Avoid duplicates (:phab:`T263947`).

    .. versionchanged:: 6.4
       ``genlist`` was renamed to ``iterables``; consecutive iterables
       are to be used as iterables parameters or '*' to unpack a list

    .. versionchanged:: 7.0
       Reimplemented without threads which is up to 10'000 times faster

    .. versionchanged:: 9.0
       Iterable elements may consist of lists or tuples
       ``allow_duplicates`` is a keyword-only argument

    :param iterables: page generators; their items must be hashable
    :param allow_duplicates: optional keyword argument to allow
        duplicates if present in all generators
    """
    if not iterables:
        return

    if len(iterables) == 1:
        yield from iterables[0]
        return

    # If any iterable is empty, no pages are going to be returned
    for source in iterables:
        if not source:
            debug('At least one iterable ({!r}) is empty and execution was '
                  'skipped immediately.'.format(source))
            return

    # Item is cached to check that it is found n_gen times
    # before being yielded.
    cache = collections.defaultdict(collections.Counter)
    n_gen = len(iterables)

    # One occurrence per iterable; subtracted from a cache entry each
    # time the item is yielded with allow_duplicates enabled.
    ones = collections.Counter(range(n_gen))
    active_iterables = set(range(n_gen))

    # Items already yielded (only used when duplicates are not allowed).
    # The items themselves are stored rather than their hashes: distinct
    # objects may share a hash value (e.g. hash(-1) == hash(-2) in
    # CPython) and would otherwise be dropped as false duplicates.
    seen = set()

    # Get items from iterables in a round-robin way.
    sentinel = object()
    for items in zip_longest(*iterables, fillvalue=sentinel):
        for index, item in enumerate(items):

            if item is sentinel:
                # This iterable is exhausted.
                active_iterables.discard(index)
                continue

            if not allow_duplicates and item in seen:
                continue

            # Each cache entry is a Counter of iterables' index
            cache[item][index] += 1

            if len(cache[item]) == n_gen:
                yield item

                # Remove item from cache if possible or decrease Counter entry
                if not allow_duplicates:
                    del cache[item]
                    seen.add(item)
                elif cache[item] == ones:
                    del cache[item]
                else:
                    cache[item] -= ones

        # We can quit if an iterable is exceeded and cached iterables is
        # a subset of active iterables.
        if len(active_iterables) < n_gen:
            cached_iterables = set(
                chain.from_iterable(v.keys() for v in cache.values()))
            if cached_iterables <= active_iterables:
                return
def roundrobin_generators(*iterables) -> Generator[Any, None, None]:
    """Interleave the items of all given iterables.

    One item is taken from each iterable in turn; iterables that run out
    early are simply skipped for the remaining rounds.

    Sample:

    >>> tuple(roundrobin_generators('ABC', range(5)))
    ('A', 0, 'B', 1, 'C', 2, 3, 4)

    .. versionadded:: 3.0
    .. versionchanged:: 6.4
       A sentinel variable is used to determine the end of an iterable
       instead of None.

    :param iterables: any iterable to combine in roundrobin way
    :type iterables: iterable
    :return: the combined generator of iterables
    :rtype: generator
    """
    # A private placeholder pads exhausted iterables, so that legitimate
    # ``None`` items are passed through untouched.
    filler = object()
    rounds = itertools.zip_longest(*iterables, fillvalue=filler)
    flattened = itertools.chain.from_iterable(rounds)
    return (element for element in flattened if element is not filler)
def filter_unique(iterable, container=None, key=None, add=None):
    """Yield each distinct item of an iterable once, skipping repeats.

    Uniqueness is tracked in *container*. If none is given, a set local
    to this call is created and every yielded item is stored in it. For
    large collections this is not memory efficient, as a strong
    reference to every item is kept in that local set which cannot be
    cleared, and the set cannot be re-used when chaining unique
    operations on multiple generators.

    To avoid these issues, it is advisable for the caller to provide
    their own container and set the key parameter to be the function
    :py:obj:`hash`, or use a :py:obj:`weakref` as the key.

    The container can be any object that supports __contains__.
    If the container has an ``add`` method (like a set) it is used to
    record items; otherwise ``__setitem__`` is used (like a dict), with
    ``True`` as the stored value. Any other method may be provided
    explicitly using the add parameter; it is called with the item
    itself, not with the key.

    Beware that key=id is only useful for cases where id() is not unique.

    Iteration stops silently if a StopIteration is raised while an item
    is being checked, recorded or yielded.

    .. warning:: This is not thread safe.

    .. versionadded:: 3.0

    :param iterable: the source iterable
    :type iterable: collections.abc.Iterable
    :param container: storage of seen items
    :type container: type
    :param key: function to convert the item to a key
    :type key: callable
    :param add: function to add an item to the container
    :type add: callable
    """
    if container is None:
        container = set()

    def lookup_key(element):
        """Return the value under which *element* is tracked."""
        return key(element) if key else element

    if not add:
        if hasattr(container, 'add'):
            def add(element) -> None:
                container.add(lookup_key(element))
        else:
            def add(element) -> None:
                container.__setitem__(lookup_key(element), True)

    for element in iterable:
        try:
            if lookup_key(element) in container:
                continue
            add(element)
            yield element
        except StopIteration:
            return