Source code for wmflib.fileio

"""File I/O module."""

import fcntl
import logging
from contextlib import contextmanager
from datetime import timedelta
from os import PathLike
from typing import IO, Generator

from wmflib.decorators import retry
from wmflib.exceptions import WmflibError

logger = logging.getLogger(__name__)


class FileIOError(WmflibError):
    """Generic exception class for file I/O errors, parent of the more specific :py:class:`LockError`."""
class LockError(FileIOError):
    """Exception class raised when the exclusive lock on a file cannot be acquired."""
@contextmanager
def locked_open(file_path: PathLike, file_mode: str = "r", *, timeout: int = 10) -> Generator[IO, None, None]:
    """Context manager to open a file with an exclusive lock on it and a retry logic.

    Text modes are opened with UTF-8 encoding; binary modes (any ``file_mode`` containing ``b``) are opened without
    an encoding, as :py:func:`open` rejects an explicit encoding for them.

    Note:
        The file is opened before the lock is acquired, hence a ``file_mode`` that truncates on open (e.g. ``w``)
        does so before the exclusive lock is held.

    Examples:
        ::

            from wmflib.fileio import locked_open
            with locked_open("existing.file") as f:
                text = f.read()

            with locked_open("new.out", "w") as f:
                f.write("Some text")

    Arguments:
        file_path (os.PathLike): the file path to open.
        file_mode (str, optional): the mode in which the file is opened, see :py:func:`open` for details.
        timeout (int, optional): the total timeout in seconds to wait to acquire the exclusive lock before giving up.
            Ten tries will be attempted to acquire the lock within the timeout.

    Raises:
        wmflib.fileio.LockError: on failure to acquire the exclusive lock on the file.

    Yields:
        file object: the open file with an exclusive lock on it.

    """
    tries = 10
    # open() raises ValueError when given an encoding with a binary mode: force UTF-8 only for text modes.
    encoding = None if "b" in file_mode else "utf-8"
    with open(file_path, file_mode, encoding=encoding) as fd:
        try:
            # Decorate the call to the locking function to retry acquiring the lock
            # with: decorator(decorator_args)(function)(function_args)
            # LOCK_NB makes each attempt fail immediately instead of blocking, so the retry logic controls the wait.
            # no-value-for-parameter is needed because pylint is confused by @ensure_wraps
            retry(  # pylint: disable=no-value-for-parameter
                tries=tries,
                delay=timedelta(seconds=timeout / tries),
                backoff_mode="constant",
                exceptions=(OSError, BlockingIOError),
            )(fcntl.flock)(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            logger.debug("Acquired exclusive lock on %s", file_path)
        except OSError as e:
            raise LockError(f"Unable to acquire exclusive lock on {file_path}") from e

        try:
            yield fd
        finally:
            # Always release the lock, also when the body of the with statement raises.
            fcntl.flock(fd, fcntl.LOCK_UN)
            logger.debug("Released exclusive lock on %s", file_path)