"""File I/O module."""
import fcntl
import logging
from contextlib import contextmanager
from datetime import timedelta
from os import PathLike
from typing import Generator, IO
from wmflib.decorators import retry
from wmflib.exceptions import WmflibError
logger = logging.getLogger(__name__)
[docs]
class FileIOError(WmflibError):
"""Custom exception class for errors of the BlockInFile class."""
[docs]
class LockError(FileIOError):
"""Custom exception class raised when unable to exclusively lock a file."""
[docs]
@contextmanager
def locked_open(file_path: PathLike, file_mode: str = 'r', *, timeout: int = 10) -> Generator[IO, None, None]:
"""Context manager to open a file with an exclusive lock on it and a retry logic.
Examples:
::
from wmflib.fileio import locked_open
with locked_open('existing.file') as f:
text = f.read()
with locked_open('new.out', 'w') as f:
f.write('Some text')
Arguments:
file_path (os.PathLike): the file path to open.
file_mode (str, optional): the mode in which the file is opened, see :py:func:`open` for details.
timeout (int, optional): the total timeout in seconds to wait to acquire the exclusive lock before giving up.
Ten tries will be attempted to acquire the lock within the timeout.
Raises:
wmflib.fileio.LockError: on failure to acquire the exclusive lock on the file.
Yields:
file object: the open file with an exclusive lock on it.
"""
tries = 10
with open(file_path, file_mode, encoding='utf-8') as fd:
try:
# Decorate the call to the locking function to retry acquiring the lock:
# decorator(decorator_args)(function)(function_args)
# no-value-for-parameter is needed because pylint is confused by @ensure_wraps
retry( # pylint: disable=no-value-for-parameter
tries=tries,
delay=timedelta(seconds=timeout / tries),
backoff_mode='constant',
exceptions=(OSError, BlockingIOError)
)(fcntl.flock)(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
logger.debug('Acquired exclusive lock on %s', file_path)
except OSError as e:
raise LockError(f'Unable to acquire exclusive lock on {file_path}') from e
try:
yield fd
finally:
fcntl.flock(fd, fcntl.LOCK_UN)
logger.debug('Released exclusive lock on %s', file_path)