Source code for pywikibot.data.api._requests

"""Objects representing API requests."""
#
# (C) Pywikibot team, 2007-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations

import datetime
import hashlib
import inspect
import os
import pickle
import pprint
import re
import sys
import traceback
from collections.abc import MutableMapping
from contextlib import suppress
from email.mime.nonmultipart import MIMENonMultipart
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlencode, urlparse
from warnings import warn

import pywikibot
from pywikibot import config
from pywikibot.backports import Callable, Match, removeprefix
from pywikibot.comms import http
from pywikibot.data import WaitingMixin
from pywikibot.exceptions import (
    Client414Error,
    Error,
    FatalServerError,
    MaxlagTimeoutError,
    NoUsernameError,
    Server504Error,
    SiteDefinitionError,
)
from pywikibot.login import LoginStatus
from pywikibot.textlib import removeDisabledParts, removeHTMLParts
from pywikibot.tools import deprecated


__all__ = ('CachedRequest', 'Request', 'encode_url')

# Actions that imply database updates on the server, used for various
# things like throttling or skipping actions when we're in simulation
# mode
WRITE_ACTIONS = {
    # main actions, see https://www.mediawiki.org/wiki/API:Main_page
    'block', 'clearhasmsg', 'createaccount', 'createlocalaccount', 'delete',
    'deleteglobalaccount', 'edit', 'editmassmessagelist', 'emailuser',
    'filerevert', 'flowthank', 'globalblock', 'globalpreferenceoverrides',
    'globalpreferences', 'globaluserrights', 'imagerotate', 'import',
    'linkaccount', 'managetags', 'massmessage', 'mergehistory', 'move',
    'newslettersubscribe', 'options', 'patrol', 'protect', 'purge',
    'removeauthenticationdata', 'resetpassword', 'revisiondelete', 'rollback',
    'setglobalaccountstatus', 'setnotificationtimestamp', 'setpagelanguage',
    'strikevote', 'tag', 'thank', 'threadaction', 'transcodereset',
    'translationreview', 'unblock', 'undelete', 'unlinkaccount', 'upload',
    'userrights', 'watch', 'wikilove',
    # wikibase actions, see https://www.mediawiki.org/wiki/Wikibase/API
    'wbcreateclaim', 'wbcreateredirect', 'wbeditentity', 'wblinktitles',
    'wbmergeitems', 'wbremoveclaims', 'wbremovequalifiers',
    'wbremovereferences', 'wbsetaliases', 'wbsetclaim', 'wbsetclaimvalue',
    'wbsetdescription', 'wbsetlabel', 'wbsetqualifier', 'wbsetreference',
    'wbsetsitelink',
    # lexeme (internal) actions
    'wbladdform', 'wbladdsense', 'wbleditformelements', 'wbleditsenseelements',
    'wblmergelexemes', 'wblremoveform', 'wblremovesense',
}

lagpattern = re.compile(
    r'Waiting for [\w.: ]+: (?P<lag>\d+(?:\.\d+)?) seconds? lagged')


[docs] class Request(MutableMapping, WaitingMixin): """A request to a Site's api.php interface. Attributes of this object (except for the special parameters listed below) get passed as commands to api.php, and can be get or set using the dict interface. All attributes must be strings. Use an empty string for parameters that don't require a value. For example, ``Request(action="query", titles="Foo bar", prop="info", redirects="")`` corresponds to the API request ``api.php?action=query&titles=Foo%20bar&prop=info&redirects`` This is the lowest-level interface to the API, and can be used for any request that a particular site's API supports. See the API documentation (https://www.mediawiki.org/wiki/API) and site-specific settings for details on what parameters are accepted for each request type. Uploading files is a special case: to upload, the parameter `mime` must contain a dict, and the parameter `file` must be set equal to a valid filename on the local computer, *not* to the content of the file. Returns a dict containing the JSON data returned by the wiki. Normally, one of the dict keys will be equal to the value of the 'action' parameter. Errors are caught and raise an APIError exception. Example: >>> r = Request(parameters={'action': 'query', 'meta': 'userinfo'}) >>> # This is equivalent to >>> # https://{path}/api.php?action=query&meta=userinfo&format=json >>> # change a parameter >>> r['meta'] = "userinfo|siteinfo" >>> # add a new parameter >>> r['siprop'] = "namespaces" >>> # note that "uiprop" param gets added automatically >>> r.action 'query' >>> sorted(r._params) ['action', 'meta', 'siprop'] >>> r._params['action'] ['query'] >>> r._params['meta'] ['userinfo', 'siteinfo'] >>> r._params['siprop'] ['namespaces'] >>> data = r.submit() >>> isinstance(data, dict) True >>> set(['query', 'batchcomplete', 'warnings']).issuperset(data.keys()) True >>> 'query' in data True >>> sorted(data['query']) ['namespaces', 'userinfo'] .. versionchanged:: 8.4 inherited from :class:`WaitingMixin`. .. versionchanged:: 9.0 *keys* and *items* methods return a view object instead a list """ # To make sure the default value of 'parameters' can be identified. _PARAM_DEFAULT = object() def __init__(self, site=None, mime: dict | None = None, throttle: bool = True, max_retries: int | None = None, retry_wait: int | None = None, use_get: bool | None = None, parameters=_PARAM_DEFAULT, **kwargs) -> None: """Create a new Request instance with the given parameters. The parameters for the request can be defined via either the 'parameters' parameter or the keyword arguments. The keyword arguments were the previous implementation but could cause problems when there are arguments to the API named the same as normal arguments to this class. So the second parameter 'parameters' was added which just contains all parameters. When a Request instance is created it must use either one of them and not both at the same time. To have backwards compatibility it adds a parameter named 'parameters' to kwargs when both parameters are set as that indicates an old call and 'parameters' was originally supplied as a keyword parameter. If undefined keyword arguments were given AND the 'parameters' parameter was supplied as a positional parameter it still assumes 'parameters' were part of the keyword arguments. If a class is using Request and is directly forwarding the parameters, :py:obj:`Request.clean_kwargs` can be used to automatically convert the old kwargs mode into the new parameter mode. This normalizes the arguments so that when the API parameters are modified the changes can always be applied to the 'parameters' parameter. :param site: The Site to which the request will be submitted. If not supplied, uses the user's configured default Site. :param mime: If not None, send in "multipart/form-data" format (default None). Parameters which should only be transferred via mime mode are defined via this parameter (even an empty dict means mime shall be used). :param max_retries: Maximum number of times to retry after errors, defaults to config.max_retries. :param retry_wait: Minimum time in seconds to wait after an error, defaults to config.retry_wait seconds (doubles each retry until config.retry_max seconds is reached). :param use_get: Use HTTP GET request if possible. If False it uses a POST request. If None, it'll try to determine via action=paraminfo if the action requires a POST. :param parameters: The parameters used for the request to the API. :type parameters: dict :param kwargs: The parameters used for the request to the API. """ if site is None: self.site = pywikibot.Site() warn(f'Request() invoked without a site; setting to {self.site}', RuntimeWarning, 2) else: self.site = site self.mime = mime if isinstance(mime, bool): raise TypeError('mime param in api.Request() must not be boolean') self.throttle = throttle self.use_get = use_get if max_retries is not None: self.max_retries = max_retries if retry_wait is not None: self.retry_wait = retry_wait self.json_warning = False # The only problem with that system is that it won't detect when # 'parameters' is actually the only parameter for the request as it # then assumes it's using the new mode (and the parameters are actually # in the parameter 'parameters' not that the parameter 'parameters' is # actually a parameter for the request). But that is invalid anyway as # it MUST have at least an action parameter for the request which would # be in kwargs if it's using the old mode. if kwargs: if parameters is not self._PARAM_DEFAULT: # 'parameters' AND kwargs is set. In that case think of # 'parameters' being an old kwarg which is now filled in an # actual parameter self._warn_both() kwargs['parameters'] = parameters # When parameters wasn't set it's likely that kwargs-mode was used self._warn_kwargs() parameters = kwargs elif parameters is self._PARAM_DEFAULT: parameters = {} self._params: dict[str, Any] = {} if 'action' not in parameters: raise ValueError("'action' specification missing from Request.") self.action = parameters['action'] self.update(parameters) # also convert all parameter values to lists self._warning_handler: Callable[[str, str], Match[str] | bool | None] | None = None # noqa: E501 self.write = self.action in WRITE_ACTIONS # Client side verification that the request is being performed # by a logged in user, and warn if it isn't a config username. if self.write: try: username = self.site.userinfo['name'] except KeyError: raise Error('API write action attempted without user name') if 'anon' in self.site.userinfo: raise Error(f'API write action attempted as IP {username!r}') if not self.site.user() or self.site.username() != username: pywikibot.warning( f'API write action by unexpected username {username} ' f'commenced.\nuserinfo: {self.site.userinfo!r}') # Make sure user is logged in if self.write: pywikibot.debug('Adding user assertion') self['assert'] = 'user'
[docs] @classmethod def create_simple(cls, req_site, **kwargs): """Create a new instance using all args except site for the API.""" # This ONLY support site so that any caller can be sure there will be # no conflict with PWB parameters # req_site is needed to avoid conflicts with possible site keyword in # kwarg until positional-only parameters are supported, see T262926 # TODO: Use ParamInfo request to determine valid parameters if isinstance(kwargs.get('parameters'), dict): warn('The request contains already a "parameters" entry which is ' 'a dict.') return cls(site=req_site, parameters=kwargs)
@classmethod def _warn_both(cls) -> None: """Warn that kwargs mode was used but parameters was set too.""" warn('Both kwargs and parameters are set in Request.__init__. It ' 'assumes that "parameters" is actually a parameter of the ' 'Request and is added to kwargs.', DeprecationWarning, 3) @classmethod def _warn_kwargs(cls) -> None: """Warn that kwargs was used instead of parameters.""" warn('Instead of using kwargs from Request.__init__, parameters ' 'for the request to the API should be added via the ' '"parameters" parameter.', DeprecationWarning, 3)
[docs] @classmethod def clean_kwargs(cls, kwargs: dict) -> dict: """Convert keyword arguments into new parameters mode. If there are no other arguments in kwargs apart from the used arguments by the class' initializer it'll just return kwargs and otherwise remove those which aren't in the initializer and put them in a dict which is added as a 'parameters' keyword. It will always create a shallow copy. :param kwargs: The original keyword arguments which is not modified. :return: The normalized keyword arguments. """ if 'expiry' in kwargs and kwargs['expiry'] is None: del kwargs['expiry'] args = set() for super_cls in inspect.getmro(cls): if not super_cls.__name__.endswith('Request'): break args |= set(inspect.getfullargspec(super_cls.__init__).args) else: raise ValueError(f'Request was not a super class of {cls!r}') args -= {'self'} old_kwargs = set(kwargs) # all kwargs defined above but not in args indicate 'kwargs' mode if old_kwargs - args: # Move all kwargs into parameters parameters = {name: value for name, value in kwargs.items() if name not in args or name == 'parameters'} if 'parameters' in parameters: cls._warn_both() # Copy only arguments and not the parameters kwargs = {name: value for name, value in kwargs.items() if name in args or name == 'self'} kwargs['parameters'] = parameters # Make sure that all arguments have remained assert (old_kwargs | {'parameters'} == set(kwargs) | set(kwargs['parameters'])) assert (('parameters' in old_kwargs) is ('parameters' in kwargs['parameters'])) cls._warn_kwargs() else: kwargs = dict(kwargs) kwargs.setdefault('parameters', {}) return kwargs
def _format_value(self, value): """Format the MediaWiki API request parameter. Converts from Python datatypes to MediaWiki API parameter values. Supports: * datetime.datetime (using strftime and ISO8601 format) * pywikibot.page.BasePage (using title (+namespace; -section)) All other datatypes are converted to string. """ if isinstance(value, datetime.datetime): return value.strftime(pywikibot.Timestamp.ISO8601Format) if isinstance(value, pywikibot.page.BasePage): if value.site != self.site: raise RuntimeError(f'value.site {value.site!r} is different ' f'from Request.site {self.site!r}') return value.title(with_section=False) return str(value) def __getitem__(self, key): """Implement dict interface.""" return self._params[key] def __setitem__(self, key: str, value) -> None: """Set MediaWiki API request parameter. :param value: param value(s) :type value: str in site encoding (string types may be a `|`-separated list) iterable, where items are converted to string with special handling for datetime.datetime to convert it to a string using the ISO 8601 format accepted by the MediaWiki API. """ if isinstance(value, bytes): value = value.decode(self.site.encoding()) if isinstance(value, str): value = value.split('|') if hasattr(value, 'api_iter'): self._params[key] = value else: try: iter(value) except TypeError: # convert any non-iterable value into a single-element list self._params[key] = [value] else: self._params[key] = list(value) def __delitem__(self, key) -> None: """Implement dict interface.""" del self._params[key] def __iter__(self): """Implement dict interface.""" return iter(self._params) def __len__(self) -> int: """Implement dict interface.""" return len(self._params)
[docs] @deprecated('items()', since='9.0.0') def iteritems(self): """Implement dict interface. .. deprecated:: 9.0 Use ``items()`` instead. """ return iter(self.items())
def _add_defaults(self): """Add default parameters to the API request. This method will only add them once. """ if hasattr(self, '__defaulted'): return if self.mime is not None and set(self._params) & set(self.mime): raise ValueError('The mime and params shall not share the ' 'same keys.') if self.action == 'query': meta = self._params.get('meta', []) # Special logic for private wikis (T153903). # If the wiki requires login privileges to read articles, pywikibot # will be blocked from accessing the userinfo. # Work around this by requiring userinfo only if 'tokens' and # 'login' are not both set. typep = self._params.get('type', []) if not ('tokens' in meta and 'login' in typep): if 'userinfo' not in meta: meta = {*meta, 'userinfo'} self['meta'] = sorted(meta) uiprop = self._params.get('uiprop', []) uiprop = {*uiprop, 'blockinfo', 'hasmsg'} self['uiprop'] = sorted(uiprop) if 'prop' in self._params \ and self.site.has_extension('ProofreadPage'): prop = set(self['prop'] + ['proofread']) self['prop'] = sorted(prop) elif self.action == 'help': self['wrap'] = '' if config.maxlag: self._params.setdefault('maxlag', [str(config.maxlag)]) self._params.setdefault('format', ['json']) if self['format'] != ['json']: raise TypeError( f'Query format {self["format"]!r} cannot be parsed.') self.__defaulted = True # skipcq: PTC-W0037 def _encoded_items(self) -> dict[str, str | bytes]: """Build a dict of params with minimal encoding needed for the site. This helper method only prepares params for serialisation or transmission, so it only encodes values which are not ASCII, requiring callers to consider how to handle ASCII vs other values, however the output is designed to enable __str__ and __repr__ to do the right thing in most circumstances. Servers which use an encoding that is not a superset of ASCII are not supported. :return: Parameters either in the site encoding, or ASCII strings """ params = {} for key, values in self._params.items(): try: iterator = values.api_iter() except AttributeError: if len(values) == 1: value = values[0] if value is True: values = [''] elif value is False or value is None: # False and None are not included in the http URI continue iterator = iter(values) value = '|'.join(self._format_value(value) for value in iterator) # If the value is encodable as ascii, do not encode it. # This means that any value which can be encoded as ascii # is presumed to be ascii, and servers using a site encoding # which is not a superset of ascii may be problematic. try: value.encode('ascii') except UnicodeError: try: value = value.encode(self.site.encoding()) except Exception: pywikibot.error( f'_encoded_items: {key!r} could not be encoded as ' f'{self.site.encoding()!r}: {value!r}') assert key.encode('ascii') assert isinstance(key, str) params[key] = value return params def _http_param_string(self): """Return the parameters as a HTTP URL query fragment. URL encodes the parameters provided by _encoded_items() .. note:: Not all parameters are sorted, therefore for two given CachedRequest objects with equal _params, the result of _http_param_string() is not necessarily equal. """ return encode_url(self._encoded_items()) def __str__(self) -> str: """Return a string representation.""" return unquote(self.site.scriptpath() + '/api.php?' + self._http_param_string()) def __repr__(self) -> str: """Return internal representation.""" cls = type(self) return f"{cls.__module__}.{cls.__name__}<{self.site}->'{self}'>" def _simulate(self, action): """Simulate action.""" if action and config.simulate and ( self.write or action in config.actions_to_block): pywikibot.info( f'<<black;yellow>>SIMULATION: {action} action blocked.') # for more realistic simulation if config.simulate is not True: pywikibot.sleep(float(config.simulate)) return { action: {'result': 'Success', 'nochange': ''}, # wikibase results 'entity': {'lastrevid': -1, 'id': '-1'}, 'pageinfo': {'lastrevid': -1}, 'reference': {'hash': -1}, } return None def _is_wikibase_error_retryable(self, error): # dict of error message and current action. # Value is True if action type is to be ignored err_msg = { 'edit-already-exists': 'wbeditentity', 'actionthrottledtext': True, # T192912, T268645 } messages = error.get('messages') message = None # bug T68619; after Wikibase breaking change 1ca9cee we have a # list of messages if isinstance(messages, list): for item in messages: message = item['name'] action = err_msg.get(message) if action is True or action == self.action: return True return False if isinstance(messages, dict): try: # behaviour before gerrit 124323 breaking change message = messages['0']['name'] except KeyError: # unsure the new output is always a list message = messages['name'] action = err_msg.get(message) return action is True or action == self.action @staticmethod def _generate_mime_part(key, content, keytype=None, headers=None): if not keytype: try: content.encode('ascii') keytype = ('text', 'plain') except (UnicodeError, AttributeError): keytype = ('application', 'octet-stream') submsg = MIMENonMultipart(*keytype) content_headers = {'name': key} if headers: content_headers.update(headers) submsg.add_header('Content-disposition', 'form-data', **content_headers) if keytype != ('text', 'plain'): submsg['Content-Transfer-Encoding'] = 'binary' submsg.set_payload(content) return submsg def _use_get(self): """Verify whether 'get' is to be used.""" if (not config.enable_GET_without_SSL and self.site.protocol() != 'https' or self.site.is_oauth_token_available()): # T108182 workaround use_get = False elif self.use_get is None: if self.action == 'query': # for queries check the query module modules = set() for mod_type_name in ('list', 'prop', 'generator'): modules.update(self._params.get(mod_type_name, [])) else: modules = {self.action} if modules: self.site._paraminfo.fetch(modules) use_get = all('mustbeposted' not in self.site._paraminfo[mod] for mod in modules) else: # If modules is empty, just 'meta' was given, which doesn't # require POSTs, and is required for ParamInfo use_get = True else: use_get = self.use_get return use_get @classmethod def _build_mime_request(cls, params: dict, mime_params: dict) -> tuple[dict, bytes]: """Construct a MIME multipart form post. :param params: HTTP request params :param mime_params: HTTP request parts which must be sent in the body :type mime_params: dict of (content, keytype, headers) :return: HTTP request headers and body """ # construct a MIME message containing all API key/values container = pywikibot.data.api.MIMEMultipart(_subtype='form-data') for key, value in params.items(): submsg = cls._generate_mime_part(key, value) container.attach(submsg) for key, value in mime_params.items(): submsg = cls._generate_mime_part(key, *value) container.attach(submsg) # strip the headers to get the HTTP message body body = container.as_bytes() marker = b'\n\n' # separates headers from body eoh = body.find(marker) body = body[eoh + len(marker):] # retrieve the headers from the MIME object headers = dict(container.items()) return headers, body def _get_request_params(self, use_get, paramstring): """Get request parameters.""" uri = self.site.apipath() if self.mime is not None: (headers, body) = Request._build_mime_request( self._encoded_items(), self.mime) use_get = False # MIME requests require HTTP POST else: headers = {'Content-Type': 'application/x-www-form-urlencoded'} if (not config.maximum_GET_length or config.maximum_GET_length < len(paramstring)): use_get = False if use_get: uri = f'{uri}?{paramstring}' body = None else: body = paramstring pywikibot.debug(f'API request to {self.site} (uses get: {use_get}):\n' f'Headers: {headers!r}\nURI: {uri!r}\nBody: {body!r}') return use_get, uri, body, headers
[docs] def _http_request(self, use_get: bool, uri: str, data, headers, paramstring) -> tuple: """Get or post a http request with exception handling. .. versionchanged:: 8.2 change the scheme if the previous request didn't have json content. .. versionchanged:: 9.2 no wait cycles for :exc:`ImportError` and :exc:`NameError`. :return: a tuple containing requests.Response object from :func:`comms.http.request` and *use_get* value :meta public: """ kwargs = {} schemes = ('http', 'https') if self.json_warning and self.site.protocol() in schemes: # retry with other scheme kwargs['protocol'] = schemes[self.site.protocol() == 'http'] try: response = http.request(self.site, uri=uri, method='GET' if use_get else 'POST', data=data, headers=headers, **kwargs) except Server504Error: pywikibot.log('Caught HTTP 504 error; retrying') except Client414Error: if use_get: pywikibot.log('Caught HTTP 414 error; retrying') use_get = False else: pywikibot.warning( 'Caught HTTP 414 error, although not using GET.') raise except (ConnectionError, FatalServerError, NameError): # These errors are not going to be fixed by just waiting raise except ImportError as e: # Leave the script gracefully pywikibot.error(e) sys.exit(1) # TODO: what other exceptions can occur here? except Exception: # for any other error on the http request, wait and retry pywikibot.error(traceback.format_exc()) pywikibot.log(f'{uri}, {paramstring}') else: return response, use_get self.wait() return None, use_get
[docs] def _json_loads(self, response) -> dict | None: """Return a dict from requests.Response. .. versionchanged:: 8.2 show a warning to add a ``protocol()`` method to the family file if suitable. :param response: a requests.Response object :type response: requests.Response :return: a data dict :raises pywikibot.exceptions.APIError: unknown action found :raises pywikibot.exceptions.APIError: unknown query result type :meta public: """ try: result = response.json() except ValueError: # if the result isn't valid JSON, there may be a server problem. # Wait a few seconds and try again. # Show 20 lines of bare text without script parts text = removeDisabledParts(response.text, ['script']) text = re.sub('\n{2,}', '\n', '\n'.join(removeHTMLParts(text).splitlines()[:20])) msg = f"""\ Non-JSON response received from server {self.site} for url {response.url} The server may be down. Status code: {response.status_code} The text message is: {text} """ # Do not retry for AutoFamily but raise a SiteDefinitionError # Note: family.AutoFamily is a function to create that class if self.site.family.__class__.__name__ == 'AutoFamily': pywikibot.debug(msg) raise SiteDefinitionError( f'Invalid AutoFamily({self.site.family.domain!r})') if not self.json_warning: # warn only once pywikibot.warning(msg) self.json_warning = True # there might also be an overflow, so try a smaller limit for param in self._params: if param.endswith('limit'): # param values are stored a list of str value = self[param][0] if value.isdigit(): self[param] = [str(int(value) // 2)] pywikibot.info(f'Set {param} = {self[param]}') else: scheme = urlparse(response.url).scheme if self.json_warning and scheme != self.site.protocol(): warn(f""" Your {self.site.family} family uses a wrong scheme {self.site.protocol()!r} but {scheme!r} is required. Please add the following code to your family file: def protocol(self, code: str) -> str: return '{scheme}' """, stacklevel=2) return result or {} self.wait() return None
def _relogin(self, message: str = '') -> None: """Force re-login and inform user.""" message += ' Forcing re-login.' pywikibot.error(f'{message.strip()}') self.site._relogin() def _userinfo_query(self, result) -> bool: """Handle userinfo query.""" if self.action == 'query' and 'userinfo' in result.get('query', ()): # if we get passed userinfo in the query result, we can confirm # that we are logged in as the correct user. If this is not the # case, force a re-login. username = result['query']['userinfo']['name'] if (self.site.user() is not None and self.site.user() != username and self.site._loginstatus != LoginStatus.IN_PROGRESS): self._relogin(f'Logged in as {username!r} instead of ' f'{self.site.user()!r}.') return True return False
[docs] def _handle_warnings(self, result: dict[str, Any]) -> bool: """Handle warnings; return True to retry request, False to resume. .. versionchanged:: 7.2 Return True to retry the current request and False to resume. :meta public: """ retry = False if 'warnings' not in result: return retry for mod, warning in result['warnings'].items(): if mod == 'info': continue if '*' in warning: text = warning['*'] elif 'html' in warning: # bug T51978 text = warning['html']['*'] else: pywikibot.warning( f'API warning ({mod}) of unknown format: {warning}') continue # multiple warnings are in text separated by a newline for single_warning in text.splitlines(): if (not callable(self._warning_handler) or not self._warning_handler(mod, single_warning)): handled = self._default_warning_handler(mod, single_warning) if handled is None: pywikibot.warning( f'API warning ({mod}): {single_warning}') else: retry = retry or handled return retry
[docs] def _default_warning_handler(self, mode: str, msg: str) -> bool | None: """A default warning handler to handle specific warnings. Return True to retry the request, False to resume and None if the warning is not handled. .. versionadded:: 7.2 :meta public: """ warnings = { 'purge': ("You've exceeded your rate limit. " 'Please wait some time and try again.', '_ratelimited', True), } warning, handler, retry = warnings.get(mode, (None, None, None)) if handler and msg == warning: # Only show the first warning part pywikibot.warning(msg.split('.')[0] + '.') # call the handler getattr(self, handler)() return retry return None
def _logged_in(self, code) -> bool: """Check whether user is logged in. Older wikis returned an error instead of a warning when the request asked for too many values. If we get this error, assume we are not logged in (we can't check this because the userinfo data is not present) and force a re-login """ if code.endswith('limit'): message = 'Received API limit error.' # If the user assertion failed, we're probably logged out as well. elif code == 'assertuserfailed': message = 'User assertion failed.' # Lastly, the purge module requires a POST if used as anonymous user, # but we normally send a GET request. If the API tells us the request # has to be POSTed, we're probably logged out. elif code == 'mustbeposted' and self.action == 'purge': message = "Received unexpected 'mustbeposted' error." else: return True self._relogin(message) return False def _internal_api_error(self, code, error, result) -> bool: """Check for ``internal_api_error_`` or readonly and retry. :raises pywikibot.exceptions.APIMWError: internal_api_error or readonly :meta public: """ iae = 'internal_api_error_' if not (code.startswith(iae) or code == 'readonly'): return False # T154011 class_name = code if code == 'readonly' else removeprefix(code, iae) del error['code'] # is added via class_name e = pywikibot.exceptions.APIMWError(class_name, **error) # If the error key is in this table, it is probably a temporary # problem, so we will retry the edit. # TODO: T154011: 'ReadOnlyError' seems replaced by 'readonly' retry = class_name in [ 'DBConnectionError', # T64974 'DBQueryError', # T60158 'DBQueryTimeoutError', # T297708 'DBUnexpectedError', # T360930 'ReadOnlyError', # T61227 'readonly', # T154011 ] pywikibot.error('Detected MediaWiki API exception {}{}' .format(e, '; retrying' if retry else '; raising')) param_repr = str(self._params) pywikibot.log(f'MediaWiki exception {class_name} details:\n' f' query=\n{pprint.pformat(param_repr)}\n' f' response=\n{result}') if not retry: raise e self.wait() return True def _ratelimited(self) -> None: """Handle ratelimited warning. This is also called from :meth:`_default_warning_handler`. """ self.wait(self.site.ratelimit(self.action).delay) def _bad_token(self, code) -> bool: """Check for bad token. Check for bad tokens, call :meth:`TokenWallet.update_tokens() <pywikibot.site._tokenwallet.TokenWallet.update_tokens>` method to update the bunch of tokens and continue loop in :meth:`submit`. """ if code != 'badtoken': # Other code not handled here return False if self.site._loginstatus == LoginStatus.IN_PROGRESS: pywikibot.log(f'Login status: {self.site._loginstatus.name}') return False # invalidate superior wiki cookies (T224712) pywikibot.data.api._invalidate_superior_cookies(self.site.family) # update tokens tokens = self.site.tokens.update_tokens(self._params['token']) self._params['token'] = tokens return True
[docs] def wait(self, delay: int | None = None) -> None: """Determine how long to wait after a failed request. Also reset last API error with wait cycles. .. versionadded:: 9.0 :param delay: Minimum time in seconds to wait. Overwrites ``retry_wait`` variable if given. The delay doubles each retry until ``retry_max`` seconds is reached. """ self.last_error = dict.fromkeys(['code', 'info']) super().wait(delay)
[docs] def submit(self) -> dict: """Submit a query and parse the response. .. versionchanged:: 8.0.4 in addition to *readapidenied* also try to login when API response is *notloggedin*. .. versionchanged:: 9.0 Raise :exc:`exceptions.APIError` if the same error comes twice in a row within the loop. :return: a dict containing data retrieved from api.php """ test_running = os.environ.get('PYWIKIBOT_TEST_RUNNING', '0') == '1' self._add_defaults() use_get = self._use_get() retries = 0 self.last_error = dict.fromkeys(['code', 'info']) while True: paramstring = self._http_param_string() simulate = self._simulate(self.action) if simulate: return simulate if self.throttle: self.site.throttle(write=self.write) else: pywikibot.log( f"Submitting unthrottled action '{self.action}'.") use_get, uri, body, headers = self._get_request_params(use_get, paramstring) response, use_get = self._http_request(use_get, uri, body, headers, paramstring) if response is None: continue result = self._json_loads(response) if result is None: continue if self._userinfo_query(result): continue if self._handle_warnings(result): continue if 'error' not in result: return result error = result['error'] for key in result: if key in ('error', 'warnings'): continue assert key not in error error[key] = result[key] if '*' in error: # help text returned error['help'] = error.pop('*') code = error.setdefault('code', 'Unknown') info = error.setdefault('info', None) if (code == self.last_error['code'] and info == self.last_error['info']): raise pywikibot.exceptions.APIError(**self.last_error) self.last_error = error if not self._logged_in(code): continue if code == 'maxlag': retries += 1 if retries > max(5, pywikibot.config.max_retries): break pywikibot.log('Pausing due to database lag: ' + info) try: lag = error['lag'] except KeyError: lag = lagpattern.search(info) lag = float(lag['lag']) if lag else 0.0 self.site.throttle.lag(lag * retries) # reset last error self.last_error = dict.fromkeys(['code', 'info']) continue if code == 'help' and self.action == 'help': # The help module returns an error result with the complete # API information. As this data was requested, return the # data instead of raising an exception. return {'help': {'mime': 'text/plain', 'help': error['help']}} pywikibot.warning(f'API error {code}: {info}') pywikibot.log(f' headers=\n{response.headers}') if self._internal_api_error(code, error.copy(), result): continue # Phab. tickets T48535, T64126, T68494, T68619 if code == 'failed-save' \ and self._is_wikibase_error_retryable(error): self.wait() continue if code == 'ratelimited': self._ratelimited() continue # If notloggedin or readapidenied is returned try to login if code in ('notloggedin', 'readapidenied') \ and self.site._loginstatus in (LoginStatus.NOT_ATTEMPTED, LoginStatus.NOT_LOGGED_IN): self.site.login() continue if self._bad_token(code): continue if 'mwoauth-invalid-authorization' in code: msg = f'OAuth authentication for {self.site}: {info}' if 'Nonce already used' in info: pywikibot.error(f'Retrying failed {msg}') continue raise NoUsernameError(f'Failed {msg}') if code == 'cirrussearch-too-busy-error': # T170647 self.wait() continue if code in ('search-title-disabled', 'search-text-disabled'): prefix = 'gsr' if 'gsrsearch' in self._params else 'sr' del self._params[prefix + 'what'] # use intitle: search instead if code == 'search-title-disabled' \ and self.site.has_extension('CirrusSearch'): key = prefix + 'search' self._params[key] = ['intitle:' + search for search in self._params[key]] continue if code == 'urlshortener-blocked': # T244062 # add additional information to error dict error['current site'] = self.site if self.site.user(): error['current user'] = self.site.user() else: # not logged in; show the IP uinfo = self.site.userinfo error['current user'] = uinfo['name'] # raise error try: param_repr = str(self._params) msg = (f'API Error: query=\n{pprint.pformat(param_repr)}\n' f' response=\n{result}') if test_running: from tests import unittest_print unittest_print(msg) else: pywikibot.log(msg) raise pywikibot.exceptions.APIError(**error) except TypeError: raise RuntimeError(result) msg = 'Maximum retries attempted due to maxlag without success.' if test_running: import unittest raise unittest.SkipTest(msg) raise MaxlagTimeoutError(msg)
[docs] class CachedRequest(Request): """Cached request. .. versionchanged:: 9.0 timestamp with timezone is used to determine expiry. """ def __init__(self, expiry, *args, **kwargs) -> None: """Initialize a CachedRequest object. :param expiry: either a number of days or a datetime.timedelta object """ assert expiry is not None super().__init__(*args, **kwargs) if not isinstance(expiry, datetime.timedelta): expiry = datetime.timedelta(expiry) self.expiry = min(expiry, datetime.timedelta(config.API_config_expiry)) self._data = None self._cachetime = None
[docs] @classmethod def create_simple(cls, req_site, **kwargs): """Unsupported as it requires at least two parameters.""" raise NotImplementedError('CachedRequest cannot be created simply.')
@classmethod def _get_cache_dir(cls) -> Path: """Return the base directory path for cache entries. The directory will be created if it does not already exist. .. versionchanged:: 8.0 return a `pathlib.Path` object. .. versionchanged:: 9.0 remove Python main version from directory name :return: base directory path for cache entries :meta public: """ path = Path(config.base_dir, 'apicache') cls._make_dir(path) cls._get_cache_dir = classmethod(lambda c: path) # cache the result return path
[docs] @staticmethod def _make_dir(dir_name: str | Path) -> Path: """Create directory if it does not exist already. .. versionchanged:: 7.0 Only `FileExistsError` is ignored but other OS exceptions can be still raised .. versionchanged:: 8.0 use *dir_name* as str or `pathlib.Path` object but always return a Path object. :param dir_name: directory path :return: directory path as `pathlib.Path` object for test purpose :meta public: """ if isinstance(dir_name, str): dir_name = Path(dir_name) dir_name.mkdir(exist_ok=True) return dir_name
def _uniquedescriptionstr(self) -> str: """Return unique description for the cache entry. If this is modified, please also update scripts/maintenance/cache.py to support the new key and all previous keys. """ login_status = self.site._loginstatus if login_status >= LoginStatus.AS_USER: # This uses the format of Page.__repr__, without performing # config.console_encoding as done by Page.__repr__. # The returned value can't be encoded to anything other than # ascii otherwise it creates an exception when _create_file_name() # tries to encode it as utf-8. user_key = f'User(User:{self.site.userinfo["name"]})' else: user_key = repr(LoginStatus(LoginStatus.NOT_LOGGED_IN)) request_key = repr(sorted(self._encoded_items().items())) return f'{self.site!r}{user_key}{request_key}' def _create_file_name(self) -> str: """Return a unique ascii identifier for the cache entry.""" return hashlib.sha256( self._uniquedescriptionstr().encode('utf-8') ).hexdigest()
[docs] def _cachefile_path(self) -> Path: """Create the cachefile path. .. versionchanged:: 8.0 return a `pathlib.Path` object. :meta public: """ return CachedRequest._get_cache_dir() / self._create_file_name()
def _expired(self, dt): """Check whether the timestamp is expired.""" return dt + self.expiry < pywikibot.Timestamp.nowutc() def _load_cache(self) -> bool: """Load cache entry for request, if available. :return: Whether the request was loaded from the cache """ self._add_defaults() try: filename = self._cachefile_path() with filename.open('rb') as f: uniquedescr, self._data, self._cachetime = pickle.load(f) if uniquedescr != self._uniquedescriptionstr(): raise RuntimeError('Expected unique description for the cache ' 'entry is different from file entry.') if self._expired(self._cachetime): self._data = None return False pywikibot.debug( f'{type(self).__name__}: cache ({filename.parent}) hit\n' f'{filename.name}, API request:\n{uniquedescr}') except OSError: pass # file not found except Exception as e: pywikibot.info(f'Could not load cache: {e!r}') else: return True return False def _write_cache(self, data) -> None: """Write data to self._cachefile_path().""" data = self._uniquedescriptionstr(), data, pywikibot.Timestamp.nowutc() path = self._cachefile_path() with suppress(OSError), path.open('wb') as f: pickle.dump(data, f, protocol=config.pickle_protocol) return # delete invalid cache entry path.unlink()
[docs] def submit(self): """Submit cached request.""" cached_available = self._load_cache() if not cached_available: self._data = super().submit() self._write_cache(self._data) else: self._handle_warnings(self._data) return self._data
[docs] def encode_url(query) -> str: """Encode parameters to pass with a url. Reorder parameters so that token parameters go last and call wraps :py:obj:`urlencode`. Return an HTTP URL query fragment which complies with :api:`Edit#Parameters` (See the 'token' bullet.) :param query: keys and values to be uncoded for passing with a url :type query: mapping object or a sequence of two-element tuples :return: encoded parameters with token parameters at the end """ if hasattr(query, 'items'): query = list(query.items()) # parameters ending on 'token' should go last # wpEditToken should go very last query.sort(key=lambda x: x[0].lower().endswith('token') + (x[0] == 'wpEditToken')) return urlencode(query)