Source code for scripts.watchlist

#!/usr/bin/env python3
"""Allows access to the bot account's watchlist.

The watchlist can be updated manually by running this script.

Syntax:

    python pwb.py watchlist [-all | -count | -count:all | -new]

Command line options:

-all        Reloads watchlists for all wikis where a watchlist is
            already present.

-count      Count only the total number of pages on the watchlist of the
            account the bot has access to.

-count:all  Count only the total number of pages on all wikis watchlists
            that the bot is connected to.

-new        Load watchlists for all wikis where accounts is set in user
            config file

.. versionchanged:: 7.7
   watchlist is retrieved in parallel tasks.
"""
#
# (C) Pywikibot team, 2005-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations

import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import pywikibot
from pywikibot import config
from pywikibot.data.api import CachedRequest
from pywikibot.exceptions import InvalidTitleError


try:
    from scripts.maintenance.cache import CacheEntry
except ModuleNotFoundError:
    from pywikibot_scripts.maintenance.cache import CacheEntry


[docs] def get(site=None) -> list[str]: """Load the watchlist, fetching it if necessary.""" if site is None: site = pywikibot.Site() return [p.title() for p in site.watched_pages()]
[docs] def count_watchlist(site=None) -> None: """Count only the total number of page(s) in watchlist for this wiki.""" if site is None: site = pywikibot.Site() watchlist_count = len(refresh(site)) pywikibot.info(f'There are {watchlist_count} page(s) in the watchlist.')
[docs] def count_watchlist_all(quiet=False) -> None: """Count only the total number of page(s) in watchlist for all wikis.""" if not quiet: pywikibot.info('Counting pages in watchlists of all wikis...') with ThreadPoolExecutor() as executor: futures = {executor.submit(refresh, pywikibot.Site(lang, family)) for family in config.usernames for lang in config.usernames[family]} wl_count_all = sum(len(future.result()) for future in as_completed(futures)) if not quiet: pywikibot.info(f'There are a total of {wl_count_all} page(s) in the' ' watchlists for all wikis.')
[docs] def isWatched(pageName, site=None): # noqa: N802, N803 """Check whether a page is being watched.""" watchlist = get(site) return pageName in watchlist
[docs] def refresh(site): """Fetch the watchlist.""" pywikibot.info(f'Retrieving watchlist for {site}.') return list(site.watched_pages(force=True))
[docs] def refresh_all() -> None: """Reload watchlists for all wikis where a watchlist is already present.""" cache_path = CachedRequest._get_cache_dir() files = os.scandir(cache_path) seen = set() with ThreadPoolExecutor() as executor: for filename in files: entry = CacheEntry(cache_path, filename) entry._load_cache() entry.parse_key() entry._rebuild() if entry.site in seen: continue # for generator API usage we have to check the modules modules = entry._params.get('modules', []) modules_found = any(module.endswith('watchlistraw') for module in modules) # for list API usage 'watchlistraw' is directly found if modules_found or 'watchlistraw' in entry._data: executor.submit(refresh, entry.site) seen.add(entry.site)
[docs] def refresh_new() -> None: """Load watchlists of all wikis for accounts set in user config.""" pywikibot.info(f'Downloading all watchlists for your accounts in ' f'{config.user_config_file}') count_watchlist_all(quiet=True)
[docs] def main(*args: str) -> None: """Process command line arguments and invoke bot. If args is an empty list, sys.argv is used. :param args: command line arguments """ opt_all = False opt_new = False opt_count = False opt_count_all = False for arg in pywikibot.handle_args(args): if arg in ('-all', '-update'): opt_all = True elif arg == '-new': opt_new = True elif arg == '-count': opt_count = True elif arg == '-count:all': opt_count_all = True if opt_all: refresh_all() elif opt_new: refresh_new() elif opt_count: count_watchlist() elif opt_count_all: count_watchlist_all() else: site = pywikibot.Site() count_watchlist(site) watchlist = list(site.watched_pages(force=True)) for page in watchlist: try: pywikibot.stdout(page.title()) except InvalidTitleError as e: pywikibot.error(e)
if __name__ == '__main__': start = datetime.datetime.now() main() pywikibot.info('\nExecution time: ' f'{(datetime.datetime.now() - start).seconds} seconds')