Source code for wmflib.decorators

"""Decorators module."""
import logging
import time

from dataclasses import dataclass
from datetime import timedelta
from functools import wraps
from typing import Any, Callable, cast, Dict, Optional, Tuple, Type, Union

from wmflib.exceptions import WmflibError


logger = logging.getLogger(__name__)


# TODO: use TypedDict once Python 3.7 support is removed
@dataclass
class RetryParams:
    """Parameters accepted by the retry decorator, grouped in a single object.

    Holding the values that :py:func:`wmflib.decorators.retry` accepts in one place allows to validate them
    together. An instance of this class is also what the `dynamic_params_callbacks` of that decorator receive,
    so that a callback can adjust the decorator's parameters at runtime.

    """

    # How many times the decorated callable is attempted overall.
    tries: int
    # Base delay fed to the backoff computation.
    delay: timedelta
    # One of: constant, linear, power, exponential.
    backoff_mode: str
    # Exception classes that are considered retryable.
    exceptions: Tuple[Type[Exception], ...]
    # Text logged on each retryable failure.
    failure_message: str

    def validate(self) -> None:
        """Check that the current field values are consistent with each other.

        The checks run in a fixed order and only the first problem found is reported. Note that with the
        exponential backoff a delay of exactly one second is accepted, only shorter delays are rejected.

        Raises:
            wmflib.exceptions.WmflibError: if any field has an invalid value.

        """
        problem = None
        if self.backoff_mode not in ('constant', 'linear', 'power', 'exponential'):
            problem = f'Invalid backoff_mode: {self.backoff_mode}'
        elif self.backoff_mode == 'exponential' and self.delay.total_seconds() < 1:
            problem = f'Delay must be greater than 1 if backoff_mode is exponential, got {self.delay}'
        elif self.tries < 1:
            problem = f'Tries must be a positive integer, got {self.tries}'
        elif not self.failure_message:
            problem = 'A failure_message must be set.'

        if problem is not None:
            raise WmflibError(problem)
def ensure_wrap(func: Callable) -> Callable:
    """Make a decorator usable both bare (``@deco``) and with arguments (``@deco(...)``).

    Arguments:
        func: the decorator to wrap. Its first positional argument is the function being decorated. A decorator
            whose only positional argument is itself a callable is not supported, because that call cannot be
            told apart from the bare usage.

    Returns:
        the wrapped decorator.

    """
    @wraps(func)
    def dispatcher(*args: Any, **kwargs: Any) -> Callable:
        """Decorator wrapper."""
        bare_usage = len(args) == 1 and not kwargs and callable(args[0])
        if not bare_usage:
            # Called with arguments: hand back a decorator that remembers them until the real function arrives.
            def bind(real_func: Callable) -> Callable:
                return func(real_func, *args, **kwargs)

            return bind

        # Called without arguments: the only positional argument is the function to decorate.
        return func(args[0])

    return dispatcher
@ensure_wrap
def retry(  # pylint: disable=too-many-arguments,useless-suppression
    func: Callable,
    *,
    tries: int = 3,
    delay: timedelta = timedelta(seconds=3),
    backoff_mode: str = 'exponential',
    exceptions: Tuple[Type[Exception], ...] = (WmflibError,),
    failure_message: Optional[str] = None,
    dynamic_params_callbacks: Tuple[Callable[[RetryParams, Callable, Tuple, Dict[str, Any]], None], ...] = (),
) -> Callable:
    """Decorator to retry a function or method if it raises certain exceptions with customizable backoff.

    Note:
        The decorated function or method must be idempotent to avoid unwanted side effects.
        It can be called with or without arguments, in the latter case all the default values will be used.

    Examples:
        Define a function that polls for the existence of a file, retrying with the default parameters::

            >>> from pathlib import Path
            >>> from wmflib.decorators import retry
            >>> from wmflib.exceptions import WmflibError
            >>> @retry
            ... def poll_file(path: Path):
            ...     if not path.exists():
            ...         raise WmflibError(f'File {path} not found')
            ...
            >>> poll_file(Path('/tmp'))
            >>> poll_file(Path('/tmp/nonexistent'))
            [1/3, retrying in 3.00s] Attempt to run '__main__.poll_file' raised: File /tmp/nonexistent not found
            [2/3, retrying in 9.00s] Attempt to run '__main__.poll_file' raised: File /tmp/nonexistent not found
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "/usr/lib/python3/dist-packages/wmflib/decorators.py", line 160, in wrapper
                return func(*args, **kwargs)
              File "<stdin>", line 4, in poll_file
            wmflib.exceptions.WmflibError: File /tmp/nonexistent not found
            >>>

        Same example, but customizing the decorator parameters::

            >>> from datetime import timedelta
            >>> from pathlib import Path
            >>> from wmflib.decorators import retry
            >>> @retry(tries=5, delay=timedelta(seconds=30), backoff_mode='constant', failure_message='File not found',
            ...        exceptions=(RuntimeError,))
            ... def poll_file(path: Path):
            ...     if not path.exists():
            ...         raise RuntimeError(path)
            ...
            >>> poll_file(Path('/tmp/nonexistent'))
            [1/5, retrying in 30.00s] File not found: /tmp/nonexistent
            [2/5, retrying in 30.00s] File not found: /tmp/nonexistent
            [3/5, retrying in 30.00s] File not found: /tmp/nonexistent
            [4/5, retrying in 30.00s] File not found: /tmp/nonexistent
            Traceback (most recent call last):
              File "<stdin>", line 1, in <module>
              File "/usr/lib/python3/dist-packages/wmflib/decorators.py", line 160, in wrapper
                return func(*args, **kwargs)
              File "<stdin>", line 5, in poll_file
            RuntimeError: /tmp/nonexistent

    Arguments:
        func (function, method): the decorated function.
        tries (int, optional): the number of times to try calling the decorated function or method before giving up.
            Must be a positive integer.
        delay (datetime.timedelta, optional): the initial delay for the first retry, used also as the base for the
            backoff algorithm.
        backoff_mode (str, optional): the backoff mode to use for the delay, available values are shown below.
            All the examples are made with ``tries=5`` and ``delay=3`` (seconds). No sleep is performed after the
            last attempt, hence at most ``tries - 1`` of the listed delays are actually waited:

            * **constant**

              .. code-block:: text

                Nth delay   = delay
                Total delay = delay * tries
                Example: 3s, 3s, 3s, 3s, 3s => 15s max possible delay

            * **linear**

              .. code-block:: text

                Nth delay   = (delay * N) with N in [1, tries]
                Total delay = 0.5 * tries * (delay + (delay * tries))
                Example: 3s, 6s, 9s, 12s, 15s => 45s max possible delay

            * **power**

              .. code-block:: text

                Nth delay   = (delay * 2^N) with N in [0, tries - 1]
                Total delay = delay * (2**tries - 1)
                Example: 3s, 6s, 12s, 24s, 48s => 93s max possible delay

            * **exponential**

              .. code-block:: text

                Nth delay   = (delay^N) with N in [1, tries], delay must be > 1
                Total delay = (delay * (delay**tries - 1)) / (delay - 1)
                Example: 3s, 9s, 27s, 81s, 243s => 363s max possible delay

        exceptions (type, tuple, optional): the decorated function call will be retried if it fails until it succeeds
            or `tries` attempts are reached. A retryable failure is defined as raising any of the exceptions listed.
            Any other exception is propagated immediately.
        failure_message (str, optional): the message to log each time there's a retryable failure. Retry information
            and exception message are also included. Default: "Attempt to run '<fully qualified function>' raised"
        dynamic_params_callbacks (tuple): a tuple of callbacks that will be called at runtime to allow to modify the
            decorator's parameters. All the fields of the :py:class:`wmflib.decorators.RetryParams` instance can be
            modified, including `exceptions`. Each callable must adhere to the following interface::

                def adjust_some_parameter(retry_params: RetryParams, func: Callable, args: Tuple, kwargs: Dict) -> None
                    # Modify the retry_params parameter possibly using the decorated function object or its parameters
                    # that are passed as tuple for the positional arguments and a dictionary for the keyword arguments

            This is a practical example that defines a callback that doubles the delay parameter of the ``@retry``
            decorator if the decorated function/method has a 'slow' keyword argument that is set to True::

                def double_delay(retry_params, func, args, kwargs):
                    if kwargs.get('slow', False):
                        retry_params.delay = retry_params.delay * 2

                @retry(delay=timedelta(seconds=10), dynamic_params_callbacks=(double_delay,))
                def do_something(slow=False):
                    # This method will be retried using 10 seconds as delay parameter in the @retry decorator, but
                    # if the 'slow' parameter is set to True it will use a delay of 20 seconds instead.
                    # Do something here.

            **While the callbacks will have access to the parameters passed to the decorated function, they should be
            treated as read-only variables.**

    Returns:
        function: the decorated function.

    Raises:
        wmflib.exceptions.WmflibError: when the decorated function is called, if the parameters (after the dynamic
            callbacks have been applied) are not valid.

    """
    if not failure_message:
        failure_message = f"Attempt to run '{func.__module__}.{func.__qualname__}' raised"

    static_params: Dict[str, Any] = {
        'tries': tries,
        'delay': delay,
        'backoff_mode': backoff_mode,
        'exceptions': exceptions,
        'failure_message': failure_message,
    }

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        """Decorated function."""
        # A fresh instance is built on every call, so what a callback changes does not leak into later calls.
        params = RetryParams(**static_params)
        for dynamic_params_callback in dynamic_params_callbacks:
            dynamic_params_callback(params, func, args, kwargs)

        # Validate at call time because the callbacks might have set invalid values.
        params.validate()

        attempt = 0
        while attempt < params.tries - 1:
            attempt += 1
            try:
                # Call the decorated function or method
                return func(*args, **kwargs)
            except params.exceptions as e:  # Use the runtime value, a callback might have changed it
                sleep = get_backoff_sleep(params.backoff_mode, params.delay.total_seconds(), attempt)
                logger.warning("[%d/%d, retrying in %.2fs] %s: %s",
                               attempt, params.tries, sleep, params.failure_message, _exception_message(e))
                time.sleep(sleep)

        # Last attempt: performed outside of the try/except so that any exception reaches the caller.
        return func(*args, **kwargs)

    return wrapper
def _exception_message(exception: BaseException) -> str: """Joins the message of the given exception with those of any chained exceptions. Arguments: exception (BaseException): The most-recently raised exception. Returns: str: The joined message, formatted suitably for logging. """ message_parts = [str(exception)] while exception.__cause__ is not None or exception.__context__ is not None: # __cause__ and __context__ shouldn't both be set, but we use the same logic here as the built-in # exception handler, giving __cause__ priority, as described in PEP 3134. We list messages in # reverse order from the built-in handler (i.e. newest exception first) since we aren't following a # traceback. if exception.__cause__ is not None: message_parts.append(f'Caused by: {exception.__cause__}') exception = exception.__cause__ else: # e.__context__ is not None, due to the while condition. message_parts.append(f'Raised while handling: {exception.__context__}') exception = cast(BaseException, exception.__context__) # Casting away the Optional. return '\n'.join(message_parts) def get_backoff_sleep(backoff_mode: str, base: Union[int, float], index: int) -> Union[int, float]: """Calculate the amount of sleep for this attempt. Arguments: backoff_mode (str): the backoff mode to use for the delay, see the documentation for retry(). base (int, float): the base for the backoff algorithm. index (int): the index to calculate the Nth sleep time for the backoff. Return: int, float: the amount of sleep to perform for the backoff. """ if backoff_mode == 'constant': sleep = base elif backoff_mode == 'linear': sleep = base * index elif backoff_mode == 'power': sleep = base * 2 ** (index - 1) elif backoff_mode == 'exponential': sleep = base ** index else: raise ValueError(f'Invalid backoff_mode: {backoff_mode}') return sleep