"""Objects representing API requests."""
#
# (C) Pywikibot team, 2007-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations
import datetime
import hashlib
import inspect
import os
import pickle
import pprint
import re
import sys
import traceback
from collections.abc import MutableMapping
from contextlib import suppress
from email.mime.nonmultipart import MIMENonMultipart
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlencode, urlparse
from warnings import warn
import pywikibot
from pywikibot import config
from pywikibot.backports import Callable, Match, removeprefix
from pywikibot.comms import http
from pywikibot.data import WaitingMixin
from pywikibot.exceptions import (
Client414Error,
Error,
FatalServerError,
MaxlagTimeoutError,
NoUsernameError,
Server504Error,
SiteDefinitionError,
)
from pywikibot.login import LoginStatus
from pywikibot.textlib import removeDisabledParts, removeHTMLParts
from pywikibot.tools import deprecated
__all__ = ('CachedRequest', 'Request', 'encode_url')
# Actions that imply database updates on the server, used for various
# things like throttling or skipping actions when we're in simulation
# mode
WRITE_ACTIONS = {
# main actions, see https://www.mediawiki.org/wiki/API:Main_page
'block', 'clearhasmsg', 'createaccount', 'createlocalaccount', 'delete',
'deleteglobalaccount', 'edit', 'editmassmessagelist', 'emailuser',
'filerevert', 'flowthank', 'globalblock', 'globalpreferenceoverrides',
'globalpreferences', 'globaluserrights', 'imagerotate', 'import',
'linkaccount', 'managetags', 'massmessage', 'mergehistory', 'move',
'newslettersubscribe', 'options', 'patrol', 'protect', 'purge',
'removeauthenticationdata', 'resetpassword', 'revisiondelete', 'rollback',
'setglobalaccountstatus', 'setnotificationtimestamp', 'setpagelanguage',
'strikevote', 'tag', 'thank', 'threadaction', 'transcodereset',
'translationreview', 'unblock', 'undelete', 'unlinkaccount', 'upload',
'userrights', 'watch', 'wikilove',
# wikibase actions, see https://www.mediawiki.org/wiki/Wikibase/API
'wbcreateclaim', 'wbcreateredirect', 'wbeditentity', 'wblinktitles',
'wbmergeitems', 'wbremoveclaims', 'wbremovequalifiers',
'wbremovereferences', 'wbsetaliases', 'wbsetclaim', 'wbsetclaimvalue',
'wbsetdescription', 'wbsetlabel', 'wbsetqualifier', 'wbsetreference',
'wbsetsitelink',
# lexeme (internal) actions
'wbladdform', 'wbladdsense', 'wbleditformelements', 'wbleditsenseelements',
'wblmergelexemes', 'wblremoveform', 'wblremovesense',
}
lagpattern = re.compile(
r'Waiting for [\w.: ]+: (?P<lag>\d+(?:\.\d+)?) seconds? lagged')
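# Illustrative sketch of what the pattern above matches (the host value is
# arbitrary); the 'lag' group is converted to float in Request.submit():
#
#   m = lagpattern.search('Waiting for 10.64.48.35: 2.5 seconds lagged')
#   m['lag']  # -> '2.5'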
class Request(MutableMapping, WaitingMixin):
"""A request to a Site's api.php interface.
Attributes of this object (except for the special parameters listed
below) get passed as commands to api.php, and can be read or set
using the dict interface. All attributes must be strings. Use an
empty string for parameters that don't require a value. For example,
``Request(action="query", titles="Foo bar", prop="info", redirects="")``
corresponds to the API request
``api.php?action=query&titles=Foo%20bar&prop=info&redirects``
This is the lowest-level interface to the API, and can be used for any
request that a particular site's API supports. See the API documentation
(https://www.mediawiki.org/wiki/API) and site-specific settings for
details on what parameters are accepted for each request type.
Uploading files is a special case: to upload, the parameter `mime` must
contain a dict, and the parameter `file` must be set equal to a valid
filename on the local computer, *not* to the content of the file.
Returns a dict containing the JSON data returned by the wiki. Normally,
one of the dict keys will be equal to the value of the 'action'
parameter. Errors are caught and raise an APIError exception.
Example:
>>> r = Request(parameters={'action': 'query', 'meta': 'userinfo'})
>>> # This is equivalent to
>>> # https://{path}/api.php?action=query&meta=userinfo&format=json
>>> # change a parameter
>>> r['meta'] = "userinfo|siteinfo"
>>> # add a new parameter
>>> r['siprop'] = "namespaces"
>>> # note that "uiprop" param gets added automatically
>>> r.action
'query'
>>> sorted(r._params)
['action', 'meta', 'siprop']
>>> r._params['action']
['query']
>>> r._params['meta']
['userinfo', 'siteinfo']
>>> r._params['siprop']
['namespaces']
>>> data = r.submit()
>>> isinstance(data, dict)
True
>>> set(['query', 'batchcomplete', 'warnings']).issuperset(data.keys())
True
>>> 'query' in data
True
>>> sorted(data['query'])
['namespaces', 'userinfo']
.. versionchanged:: 8.4
inherited from :class:`WaitingMixin`.
.. versionchanged:: 9.0
*keys* and *items* methods return a view object instead of a list
"""
# To make sure the default value of 'parameters' can be identified.
_PARAM_DEFAULT = object()
def __init__(self, site=None,
mime: dict | None = None,
throttle: bool = True,
max_retries: int | None = None,
retry_wait: int | None = None,
use_get: bool | None = None,
parameters=_PARAM_DEFAULT, **kwargs) -> None:
"""Create a new Request instance with the given parameters.
The parameters for the request can be defined via either the
'parameters' argument or the keyword arguments. The keyword
arguments were the previous implementation, but they can clash
with API parameters that share a name with regular arguments of
this class; therefore the 'parameters' argument was added, which
simply contains all API parameters. When a Request instance is
created, only one of the two styles must be used, not both at the
same time. For backwards compatibility, if both are set, a
parameter named 'parameters' is added to kwargs, as that indicates
an old-style call in which 'parameters' was originally supplied as
a keyword parameter.
If unknown keyword arguments are given AND the 'parameters'
argument is supplied as a positional parameter, it is still assumed
that 'parameters' belongs to the keyword arguments.
If a class is using Request and is directly forwarding the parameters,
:py:obj:`Request.clean_kwargs` can be used to automatically
convert the old kwargs mode into the new parameter mode. This
normalizes the arguments so that when the API parameters are
modified the changes can always be applied to the 'parameters'
parameter.
:param site: The Site to which the request will be submitted. If not
supplied, uses the user's configured default Site.
:param mime: If not None, send in "multipart/form-data" format (default
None). Parameters which should only be transferred via mime
mode are defined via this parameter (even an empty dict means
mime shall be used).
:param max_retries: Maximum number of times to retry after
errors, defaults to config.max_retries.
:param retry_wait: Minimum time in seconds to wait after an
error, defaults to config.retry_wait seconds (doubles each retry
until config.retry_max seconds is reached).
:param use_get: Use HTTP GET request if possible. If False it
uses a POST request. If None, it'll try to determine via
action=paraminfo if the action requires a POST.
:param parameters: The parameters used for the request to the API.
:type parameters: dict
:param kwargs: The parameters used for the request to the API.
"""
if site is None:
self.site = pywikibot.Site()
warn(f'Request() invoked without a site; setting to {self.site}',
RuntimeWarning, 2)
else:
self.site = site
self.mime = mime
if isinstance(mime, bool):
raise TypeError('mime param in api.Request() must not be boolean')
self.throttle = throttle
self.use_get = use_get
if max_retries is not None:
self.max_retries = max_retries
if retry_wait is not None:
self.retry_wait = retry_wait
self.json_warning = False
# The one limitation of this scheme is that it cannot detect the case
# where 'parameters' is the only API parameter of the request: it then
# assumes the new mode is used (i.e. the API parameters are inside the
# 'parameters' dict rather than 'parameters' itself being an API
# parameter). Such a request is invalid anyway, because it MUST contain
# at least an 'action' parameter, which would appear in kwargs if the
# old mode were used.
if kwargs:
if parameters is not self._PARAM_DEFAULT:
# 'parameters' AND kwargs are both set. In that case treat
# 'parameters' as an old-style kwarg which has now been supplied
# as an actual parameter, and move it back into kwargs.
self._warn_both()
kwargs['parameters'] = parameters
# When parameters wasn't set it's likely that kwargs-mode was used
self._warn_kwargs()
parameters = kwargs
elif parameters is self._PARAM_DEFAULT:
parameters = {}
self._params: dict[str, Any] = {}
if 'action' not in parameters:
raise ValueError("'action' specification missing from Request.")
self.action = parameters['action']
self.update(parameters) # also convert all parameter values to lists
self._warning_handler: Callable[[str, str], Match[str] | bool | None] | None = None # noqa: E501
self.write = self.action in WRITE_ACTIONS
# Client side verification that the request is being performed
# by a logged in user, and warn if it isn't a config username.
if self.write:
try:
username = self.site.userinfo['name']
except KeyError:
raise Error('API write action attempted without user name')
if 'anon' in self.site.userinfo:
raise Error(f'API write action attempted as IP {username!r}')
if not self.site.user() or self.site.username() != username:
pywikibot.warning(
f'API write action by unexpected username {username} '
f'commenced.\nuserinfo: {self.site.userinfo!r}')
# Make sure user is logged in
if self.write:
pywikibot.debug('Adding user assertion')
self['assert'] = 'user'
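# Illustrative sketch (not executed; 'site' stands for any APISite and the
# API parameters are arbitrary): the preferred style passes all API
# parameters through the 'parameters' dict, while passing them as plain
# keyword arguments is the deprecated mode handled above and emits a
# DeprecationWarning.
#
#   req = Request(site=site, parameters={'action': 'query',
#                                        'meta': 'siteinfo'})
#   old = Request(site=site, action='query', meta='siteinfo')  # deprecated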
@classmethod
def create_simple(cls, req_site, **kwargs):
"""Create a new instance using all args except site for the API."""
# This ONLY supports site so that any caller can be sure there will
# be no conflict with PWB parameters.
# req_site is needed to avoid conflicts with a possible 'site' keyword
# in kwargs until positional-only parameters are supported, see T262926
# TODO: Use ParamInfo request to determine valid parameters
if isinstance(kwargs.get('parameters'), dict):
warn('The request already contains a "parameters" entry which is '
'a dict.')
return cls(site=req_site, parameters=kwargs)
@classmethod
def _warn_both(cls) -> None:
"""Warn that kwargs mode was used but parameters was set too."""
warn('Both kwargs and parameters are set in Request.__init__. It '
'assumes that "parameters" is actually a parameter of the '
'Request and is added to kwargs.', DeprecationWarning, 3)
@classmethod
def _warn_kwargs(cls) -> None:
"""Warn that kwargs was used instead of parameters."""
warn('Instead of using kwargs from Request.__init__, parameters '
'for the request to the API should be added via the '
'"parameters" parameter.', DeprecationWarning, 3)
@classmethod
def clean_kwargs(cls, kwargs: dict) -> dict:
"""Convert keyword arguments into new parameters mode.
If kwargs contains no arguments other than those accepted by the
class initializer, a shallow copy of kwargs is returned; otherwise
all entries which are not initializer arguments are removed and
collected in a dict which is added under the 'parameters' keyword.
A shallow copy is always created.
:param kwargs: The original keyword arguments, which are not modified.
:return: The normalized keyword arguments.
"""
if 'expiry' in kwargs and kwargs['expiry'] is None:
del kwargs['expiry']
args = set()
for super_cls in inspect.getmro(cls):
if not super_cls.__name__.endswith('Request'):
break
args |= set(inspect.getfullargspec(super_cls.__init__).args)
else:
raise ValueError(f'Request was not a super class of {cls!r}')
args -= {'self'}
old_kwargs = set(kwargs)
# all kwargs defined above but not in args indicate 'kwargs' mode
if old_kwargs - args:
# Move all kwargs into parameters
parameters = {name: value for name, value in kwargs.items()
if name not in args or name == 'parameters'}
if 'parameters' in parameters:
cls._warn_both()
# Copy only arguments and not the parameters
kwargs = {name: value for name, value in kwargs.items()
if name in args or name == 'self'}
kwargs['parameters'] = parameters
# Make sure that all arguments have remained
assert (old_kwargs | {'parameters'}
== set(kwargs) | set(kwargs['parameters']))
assert (('parameters' in old_kwargs)
is ('parameters' in kwargs['parameters']))
cls._warn_kwargs()
else:
kwargs = dict(kwargs)
kwargs.setdefault('parameters', {})
return kwargs
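# Illustrative sketch of the normalisation above (not executed; 'site'
# stands for any APISite): keyword arguments which are not initializer
# arguments are moved into a 'parameters' dict, and a DeprecationWarning
# is emitted for the old kwargs mode.
#
#   Request.clean_kwargs({'site': site, 'action': 'query'})
#   # -> {'site': site, 'parameters': {'action': 'query'}}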
def _format_value(self, value):
"""Format the MediaWiki API request parameter.
Converts from Python datatypes to MediaWiki API parameter values.
Supports:
* datetime.datetime (using strftime and ISO8601 format)
* pywikibot.page.BasePage (using title (+namespace; -section))
All other datatypes are converted to string.
"""
if isinstance(value, datetime.datetime):
return value.strftime(pywikibot.Timestamp.ISO8601Format)
if isinstance(value, pywikibot.page.BasePage):
if value.site != self.site:
raise RuntimeError(f'value.site {value.site!r} is different '
f'from Request.site {self.site!r}')
return value.title(with_section=False)
return str(value)
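# Illustrative sketch (not executed), assuming the default
# pywikibot.Timestamp.ISO8601Format of '%Y-%m-%dT%H:%M:%SZ':
#
#   self._format_value(datetime.datetime(2024, 1, 2, 3, 4, 5))
#   # -> '2024-01-02T03:04:05Z'
#   self._format_value(pywikibot.Page(self.site, 'Foo#Bar'))
#   # -> 'Foo' (title without the section)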
def __getitem__(self, key):
"""Implement dict interface."""
return self._params[key]
def __setitem__(self, key: str, value) -> None:
"""Set MediaWiki API request parameter.
:param value: param value(s)
:type value: str in the site encoding (string values may be a
`|`-separated list) or an iterable, whose items are converted to
strings, with special handling for datetime.datetime values which
are converted to the ISO 8601 format accepted by the MediaWiki
API.
"""
if isinstance(value, bytes):
value = value.decode(self.site.encoding())
if isinstance(value, str):
value = value.split('|')
if hasattr(value, 'api_iter'):
self._params[key] = value
else:
try:
iter(value)
except TypeError:
# convert any non-iterable value into a single-element list
self._params[key] = [value]
else:
self._params[key] = list(value)
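# Illustrative sketch (not executed): all of the following end up as
# lists in self._params:
#
#   req['titles'] = 'Foo|Bar'        # str          -> ['Foo', 'Bar']
#   req['redirects'] = True          # non-iterable -> [True]
#   req['prop'] = ('info', 'links')  # iterable     -> ['info', 'links']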
def __delitem__(self, key) -> None:
"""Implement dict interface."""
del self._params[key]
def __iter__(self):
"""Implement dict interface."""
return iter(self._params)
def __len__(self) -> int:
"""Implement dict interface."""
return len(self._params)
@deprecated('items()', since='9.0.0')
def iteritems(self):
"""Implement dict interface.
.. deprecated:: 9.0
Use ``items()`` instead.
"""
return iter(self.items())
def _add_defaults(self):
"""Add default parameters to the API request.
This method will only add them once.
"""
if hasattr(self, '_defaulted'):  # single underscore: avoid name mangling
return
if self.mime is not None and set(self._params) & set(self.mime):
raise ValueError('The mime and params shall not share the '
'same keys.')
if self.action == 'query':
meta = self._params.get('meta', [])
# Special logic for private wikis (T153903).
# If the wiki requires login privileges to read articles, pywikibot
# will be blocked from accessing the userinfo.
# Work around this by requiring userinfo only if 'tokens' and
# 'login' are not both set.
typep = self._params.get('type', [])
if not ('tokens' in meta and 'login' in typep):
if 'userinfo' not in meta:
meta = {*meta, 'userinfo'}
self['meta'] = sorted(meta)
uiprop = self._params.get('uiprop', [])
uiprop = {*uiprop, 'blockinfo', 'hasmsg'}
self['uiprop'] = sorted(uiprop)
if 'prop' in self._params \
and self.site.has_extension('ProofreadPage'):
prop = set(self['prop'] + ['proofread'])
self['prop'] = sorted(prop)
elif self.action == 'help':
self['wrap'] = ''
if config.maxlag:
self._params.setdefault('maxlag', [str(config.maxlag)])
self._params.setdefault('format', ['json'])
if self['format'] != ['json']:
raise TypeError(
f'Query format {self["format"]!r} cannot be parsed.')
self._defaulted = True  # skipcq: PTC-W0037
def _encoded_items(self) -> dict[str, str | bytes]:
"""Build a dict of params with minimal encoding needed for the site.
This helper method only prepares params for serialisation or
transmission, so it only encodes values which are not ASCII.
Callers therefore need to consider how to handle ASCII vs other
values; however, the output is designed to enable __str__ and
__repr__ to do the right thing in most circumstances.
Servers which use an encoding that is not a superset of ASCII
are not supported.
:return: Parameters either in the site encoding, or ASCII strings
"""
params = {}
for key, values in self._params.items():
try:
iterator = values.api_iter()
except AttributeError:
if len(values) == 1:
value = values[0]
if value is True:
values = ['']
elif value is False or value is None:
# False and None are not included in the http URI
continue
iterator = iter(values)
value = '|'.join(self._format_value(value) for value in iterator)
# If the value is encodable as ascii, do not encode it.
# This means that any value which can be encoded as ascii
# is presumed to be ascii, and servers using a site encoding
# which is not a superset of ascii may be problematic.
try:
value.encode('ascii')
except UnicodeError:
try:
value = value.encode(self.site.encoding())
except Exception:
pywikibot.error(
f'_encoded_items: {key!r} could not be encoded as '
f'{self.site.encoding()!r}: {value!r}')
assert key.encode('ascii')
assert isinstance(key, str)
params[key] = value
return params
def _http_param_string(self):
"""Return the parameters as a HTTP URL query fragment.
URL encodes the parameters provided by _encoded_items()
.. note:: Not all parameters are sorted, therefore for two given
CachedRequest objects with equal _params, the result of
_http_param_string() is not necessarily equal.
"""
return encode_url(self._encoded_items())
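# Illustrative sketch (not executed; parameter values are arbitrary):
# True becomes an empty value, False/None entries are dropped and lists
# are joined with '|':
#
#   self._params = {'action': ['query'], 'redirects': [True],
#                   'titles': ['Foo', 'Bar'], 'converttitles': [False]}
#   self._http_param_string()
#   # -> 'action=query&redirects=&titles=Foo%7CBar'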
def __str__(self) -> str:
"""Return a string representation."""
return unquote(self.site.scriptpath()
+ '/api.php?'
+ self._http_param_string())
def __repr__(self) -> str:
"""Return internal representation."""
cls = type(self)
return f"{cls.__module__}.{cls.__name__}<{self.site}->'{self}'>"
def _simulate(self, action):
"""Simulate action."""
if action and config.simulate and (
self.write or action in config.actions_to_block):
pywikibot.info(
f'<<black;yellow>>SIMULATION: {action} action blocked.')
# for more realistic simulation
if config.simulate is not True:
pywikibot.sleep(float(config.simulate))
return {
action: {'result': 'Success', 'nochange': ''},
# wikibase results
'entity': {'lastrevid': -1, 'id': '-1'},
'pageinfo': {'lastrevid': -1},
'reference': {'hash': -1},
}
return None
def _is_wikibase_error_retryable(self, error):
# dict mapping the error message to the action it applies to.
# The value is True if the action type is to be ignored.
err_msg = {
'edit-already-exists': 'wbeditentity',
'actionthrottledtext': True, # T192912, T268645
}
messages = error.get('messages')
message = None
# bug T68619; after Wikibase breaking change 1ca9cee we have a
# list of messages
if isinstance(messages, list):
for item in messages:
message = item['name']
action = err_msg.get(message)
if action is True or action == self.action:
return True
return False
if isinstance(messages, dict):
try: # behaviour before gerrit 124323 breaking change
message = messages['0']['name']
except KeyError:  # unsure whether the new output is always a list
message = messages['name']
action = err_msg.get(message)
return action is True or action == self.action
@staticmethod
def _generate_mime_part(key, content, keytype=None, headers=None):
if not keytype:
try:
content.encode('ascii')
keytype = ('text', 'plain')
except (UnicodeError, AttributeError):
keytype = ('application', 'octet-stream')
submsg = MIMENonMultipart(*keytype)
content_headers = {'name': key}
if headers:
content_headers.update(headers)
submsg.add_header('Content-disposition', 'form-data',
**content_headers)
if keytype != ('text', 'plain'):
submsg['Content-Transfer-Encoding'] = 'binary'
submsg.set_payload(content)
return submsg
def _use_get(self):
"""Verify whether 'get' is to be used."""
if (not config.enable_GET_without_SSL
and self.site.protocol() != 'https'
or self.site.is_oauth_token_available()): # T108182 workaround
use_get = False
elif self.use_get is None:
if self.action == 'query':
# for queries check the query module
modules = set()
for mod_type_name in ('list', 'prop', 'generator'):
modules.update(self._params.get(mod_type_name, []))
else:
modules = {self.action}
if modules:
self.site._paraminfo.fetch(modules)
use_get = all('mustbeposted' not in self.site._paraminfo[mod]
for mod in modules)
else:
# If modules is empty, just 'meta' was given, which doesn't
# require POSTs, and is required for ParamInfo
use_get = True
else:
use_get = self.use_get
return use_get
@classmethod
def _build_mime_request(cls, params: dict,
mime_params: dict) -> tuple[dict, bytes]:
"""Construct a MIME multipart form post.
:param params: HTTP request params
:param mime_params: HTTP request parts which must be sent in the body
:type mime_params: dict of (content, keytype, headers)
:return: HTTP request headers and body
"""
# construct a MIME message containing all API key/values
container = pywikibot.data.api.MIMEMultipart(_subtype='form-data')
for key, value in params.items():
submsg = cls._generate_mime_part(key, value)
container.attach(submsg)
for key, value in mime_params.items():
submsg = cls._generate_mime_part(key, *value)
container.attach(submsg)
# strip the headers to get the HTTP message body
body = container.as_bytes()
marker = b'\n\n' # separates headers from body
eoh = body.find(marker)
body = body[eoh + len(marker):]
# retrieve the headers from the MIME object
headers = dict(container.items())
return headers, body
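# Illustrative sketch (not executed; file name and content are
# hypothetical): mime parts are passed as (content, keytype, headers)
# tuples, e.g. for a chunked upload:
#
#   mime_params = {'chunk': (b'<raw bytes>',
#                            ('application', 'octet-stream'),
#                            {'filename': 'Foo.jpg'})}
#   headers, body = Request._build_mime_request(
#       {'action': 'upload', 'format': 'json'}, mime_params)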
def _get_request_params(self, use_get, paramstring):
"""Get request parameters."""
uri = self.site.apipath()
if self.mime is not None:
(headers, body) = Request._build_mime_request(
self._encoded_items(), self.mime)
use_get = False # MIME requests require HTTP POST
else:
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
if (not config.maximum_GET_length
or config.maximum_GET_length < len(paramstring)):
use_get = False
if use_get:
uri = f'{uri}?{paramstring}'
body = None
else:
body = paramstring
pywikibot.debug(f'API request to {self.site} (uses get: {use_get}):\n'
f'Headers: {headers!r}\nURI: {uri!r}\nBody: {body!r}')
return use_get, uri, body, headers
def _http_request(self, use_get: bool, uri: str, data, headers,
paramstring) -> tuple:
"""Get or post a http request with exception handling.
.. versionchanged:: 8.2
change the scheme if the previous request didn't have json
content.
.. versionchanged:: 9.2
no wait cycles for :exc:`ImportError` and :exc:`NameError`.
:return: a tuple containing requests.Response object from
:func:`comms.http.request` and *use_get* value
:meta public:
"""
kwargs = {}
schemes = ('http', 'https')
if self.json_warning and self.site.protocol() in schemes:
# retry with other scheme
kwargs['protocol'] = schemes[self.site.protocol() == 'http']
try:
response = http.request(self.site, uri=uri,
method='GET' if use_get else 'POST',
data=data, headers=headers, **kwargs)
except Server504Error:
pywikibot.log('Caught HTTP 504 error; retrying')
except Client414Error:
if use_get:
pywikibot.log('Caught HTTP 414 error; retrying')
use_get = False
else:
pywikibot.warning(
'Caught HTTP 414 error, although not using GET.')
raise
except (ConnectionError, FatalServerError, NameError):
# These errors are not going to be fixed by just waiting
raise
except ImportError as e:
# Leave the script gracefully
pywikibot.error(e)
sys.exit(1)
# TODO: what other exceptions can occur here?
except Exception:
# for any other error on the http request, wait and retry
pywikibot.error(traceback.format_exc())
pywikibot.log(f'{uri}, {paramstring}')
else:
return response, use_get
self.wait()
return None, use_get
def _json_loads(self, response) -> dict | None:
"""Return a dict from requests.Response.
.. versionchanged:: 8.2
show a warning to add a ``protocol()`` method to the family
file if suitable.
:param response: a requests.Response object
:type response: requests.Response
:return: a data dict
:raises pywikibot.exceptions.APIError: unknown action found
:raises pywikibot.exceptions.APIError: unknown query result type
:meta public:
"""
try:
result = response.json()
except ValueError:
# if the result isn't valid JSON, there may be a server problem.
# Wait a few seconds and try again.
# Show 20 lines of bare text without script parts
text = removeDisabledParts(response.text, ['script'])
text = re.sub('\n{2,}', '\n',
'\n'.join(removeHTMLParts(text).splitlines()[:20]))
msg = f"""\
Non-JSON response received from server {self.site} for url
{response.url}
The server may be down.
Status code: {response.status_code}
The text message is:
{text}
"""
# Do not retry for AutoFamily but raise a SiteDefinitionError
# Note: family.AutoFamily is a function to create that class
if self.site.family.__class__.__name__ == 'AutoFamily':
pywikibot.debug(msg)
raise SiteDefinitionError(
f'Invalid AutoFamily({self.site.family.domain!r})')
if not self.json_warning: # warn only once
pywikibot.warning(msg)
self.json_warning = True
# there might also be an overflow, so try a smaller limit
for param in self._params:
if param.endswith('limit'):
# param values are stored as a list of str
value = self[param][0]
if value.isdigit():
self[param] = [str(int(value) // 2)]
pywikibot.info(f'Set {param} = {self[param]}')
else:
scheme = urlparse(response.url).scheme
if self.json_warning and scheme != self.site.protocol():
warn(f"""
Your {self.site.family} family uses a wrong scheme {self.site.protocol()!r}
but {scheme!r} is required. Please add the following code to your family file:
def protocol(self, code: str) -> str:
return '{scheme}'
""", stacklevel=2)
return result or {}
self.wait()
return None
def _relogin(self, message: str = '') -> None:
"""Force re-login and inform user."""
message += ' Forcing re-login.'
pywikibot.error(f'{message.strip()}')
self.site._relogin()
def _userinfo_query(self, result) -> bool:
"""Handle userinfo query."""
if self.action == 'query' and 'userinfo' in result.get('query', ()):
# if we get passed userinfo in the query result, we can confirm
# that we are logged in as the correct user. If this is not the
# case, force a re-login.
username = result['query']['userinfo']['name']
if (self.site.user() is not None and self.site.user() != username
and self.site._loginstatus != LoginStatus.IN_PROGRESS):
self._relogin(f'Logged in as {username!r} instead of '
f'{self.site.user()!r}.')
return True
return False
def _handle_warnings(self, result: dict[str, Any]) -> bool:
"""Handle warnings; return True to retry request, False to resume.
.. versionchanged:: 7.2
Return True to retry the current request and False to resume.
:meta public:
"""
retry = False
if 'warnings' not in result:
return retry
for mod, warning in result['warnings'].items():
if mod == 'info':
continue
if '*' in warning:
text = warning['*']
elif 'html' in warning:
# bug T51978
text = warning['html']['*']
else:
pywikibot.warning(
f'API warning ({mod}) of unknown format: {warning}')
continue
# multiple warnings are in text separated by a newline
for single_warning in text.splitlines():
if (not callable(self._warning_handler)
or not self._warning_handler(mod, single_warning)):
handled = self._default_warning_handler(mod,
single_warning)
if handled is None:
pywikibot.warning(
f'API warning ({mod}): {single_warning}')
else:
retry = retry or handled
return retry
def _default_warning_handler(self, mode: str, msg: str) -> bool | None:
"""A default warning handler to handle specific warnings.
Return True to retry the request, False to resume and None if
the warning is not handled.
.. versionadded:: 7.2
:meta public:
"""
warnings = {
'purge': ("You've exceeded your rate limit. "
'Please wait some time and try again.',
'_ratelimited', True),
}
warning, handler, retry = warnings.get(mode, (None, None, None))
if handler and msg == warning:
# Only show the first warning part
pywikibot.warning(msg.split('.')[0] + '.')
# call the handler
getattr(self, handler)()
return retry
return None
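# Illustrative sketch (not executed): a caller may install its own
# handler; a truthy return value marks the warning as handled, so
# neither the default handler above nor the generic warning log message
# is used for it:
#
#   req._warning_handler = lambda mod, msg: msg.startswith('Unrecognized')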
def _logged_in(self, code) -> bool:
"""Check whether user is logged in.
Older wikis returned an error instead of a warning when the request
asked for too many values. If we get this error, assume we are not
logged in (we can't check this because the userinfo data is not
present) and force a re-login
"""
if code.endswith('limit'):
message = 'Received API limit error.'
# If the user assertion failed, we're probably logged out as well.
elif code == 'assertuserfailed':
message = 'User assertion failed.'
# Lastly, the purge module requires a POST if used as anonymous user,
# but we normally send a GET request. If the API tells us the request
# has to be POSTed, we're probably logged out.
elif code == 'mustbeposted' and self.action == 'purge':
message = "Received unexpected 'mustbeposted' error."
else:
return True
self._relogin(message)
return False
def _internal_api_error(self, code, error, result) -> bool:
"""Check for ``internal_api_error_`` or readonly and retry.
:raises pywikibot.exceptions.APIMWError: internal_api_error or
readonly
:meta public:
"""
iae = 'internal_api_error_'
if not (code.startswith(iae) or code == 'readonly'):
return False
# T154011
class_name = code if code == 'readonly' else removeprefix(code, iae)
del error['code'] # is added via class_name
e = pywikibot.exceptions.APIMWError(class_name, **error)
# If the error key is in this table, it is probably a temporary
# problem, so we will retry the edit.
# TODO: T154011: 'ReadOnlyError' seems replaced by 'readonly'
retry = class_name in [
'DBConnectionError', # T64974
'DBQueryError', # T60158
'DBQueryTimeoutError', # T297708
'DBUnexpectedError', # T360930
'ReadOnlyError', # T61227
'readonly', # T154011
]
pywikibot.error('Detected MediaWiki API exception {}{}'
.format(e, '; retrying' if retry else '; raising'))
param_repr = str(self._params)
pywikibot.log(f'MediaWiki exception {class_name} details:\n'
f' query=\n{pprint.pformat(param_repr)}\n'
f' response=\n{result}')
if not retry:
raise e
self.wait()
return True
def _ratelimited(self) -> None:
"""Handle ratelimited warning.
This is also called from :meth:`_default_warning_handler`.
"""
self.wait(self.site.ratelimit(self.action).delay)
def _bad_token(self, code) -> bool:
"""Check for bad token.
Check for bad tokens, call :meth:`TokenWallet.update_tokens()
<pywikibot.site._tokenwallet.TokenWallet.update_tokens>` method
to update the set of tokens and continue the loop in :meth:`submit`.
"""
if code != 'badtoken': # Other code not handled here
return False
if self.site._loginstatus == LoginStatus.IN_PROGRESS:
pywikibot.log(f'Login status: {self.site._loginstatus.name}')
return False
# invalidate superior wiki cookies (T224712)
pywikibot.data.api._invalidate_superior_cookies(self.site.family)
# update tokens
tokens = self.site.tokens.update_tokens(self._params['token'])
self._params['token'] = tokens
return True
def wait(self, delay: int | None = None) -> None:
"""Determine how long to wait after a failed request.
Also reset last API error with wait cycles.
.. versionadded:: 9.0
:param delay: Minimum time in seconds to wait. Overwrites
``retry_wait`` variable if given. The delay doubles each
retry until ``retry_max`` seconds is reached.
"""
self.last_error = dict.fromkeys(['code', 'info'])
super().wait(delay)
def submit(self) -> dict:
"""Submit a query and parse the response.
.. versionchanged:: 8.0.4
in addition to *readapidenied* also try to login when API
response is *notloggedin*.
.. versionchanged:: 9.0
Raise :exc:`exceptions.APIError` if the same error comes
twice in a row within the loop.
:return: a dict containing data retrieved from api.php
"""
test_running = os.environ.get('PYWIKIBOT_TEST_RUNNING', '0') == '1'
self._add_defaults()
use_get = self._use_get()
retries = 0
self.last_error = dict.fromkeys(['code', 'info'])
while True:
paramstring = self._http_param_string()
simulate = self._simulate(self.action)
if simulate:
return simulate
if self.throttle:
self.site.throttle(write=self.write)
else:
pywikibot.log(
f"Submitting unthrottled action '{self.action}'.")
use_get, uri, body, headers = self._get_request_params(use_get,
paramstring)
response, use_get = self._http_request(use_get, uri, body, headers,
paramstring)
if response is None:
continue
result = self._json_loads(response)
if result is None:
continue
if self._userinfo_query(result):
continue
if self._handle_warnings(result):
continue
if 'error' not in result:
return result
error = result['error']
for key in result:
if key in ('error', 'warnings'):
continue
assert key not in error
error[key] = result[key]
if '*' in error:
# help text returned
error['help'] = error.pop('*')
code = error.setdefault('code', 'Unknown')
info = error.setdefault('info', None)
if (code == self.last_error['code']
and info == self.last_error['info']):
raise pywikibot.exceptions.APIError(**self.last_error)
self.last_error = error
if not self._logged_in(code):
continue
if code == 'maxlag':
retries += 1
if retries > max(5, pywikibot.config.max_retries):
break
pywikibot.log('Pausing due to database lag: ' + info)
try:
lag = error['lag']
except KeyError:
lag = lagpattern.search(info)
lag = float(lag['lag']) if lag else 0.0
self.site.throttle.lag(lag * retries)
# reset last error
self.last_error = dict.fromkeys(['code', 'info'])
continue
if code == 'help' and self.action == 'help':
# The help module returns an error result with the complete
# API information. As this data was requested, return the
# data instead of raising an exception.
return {'help': {'mime': 'text/plain',
'help': error['help']}}
pywikibot.warning(f'API error {code}: {info}')
pywikibot.log(f' headers=\n{response.headers}')
if self._internal_api_error(code, error.copy(), result):
continue
# Phab. tickets T48535, T64126, T68494, T68619
if code == 'failed-save' \
and self._is_wikibase_error_retryable(error):
self.wait()
continue
if code == 'ratelimited':
self._ratelimited()
continue
# If notloggedin or readapidenied is returned try to login
if code in ('notloggedin', 'readapidenied') \
and self.site._loginstatus in (LoginStatus.NOT_ATTEMPTED,
LoginStatus.NOT_LOGGED_IN):
self.site.login()
continue
if self._bad_token(code):
continue
if 'mwoauth-invalid-authorization' in code:
msg = f'OAuth authentication for {self.site}: {info}'
if 'Nonce already used' in info:
pywikibot.error(f'Retrying failed {msg}')
continue
raise NoUsernameError(f'Failed {msg}')
if code == 'cirrussearch-too-busy-error': # T170647
self.wait()
continue
if code in ('search-title-disabled', 'search-text-disabled'):
prefix = 'gsr' if 'gsrsearch' in self._params else 'sr'
del self._params[prefix + 'what']
# use intitle: search instead
if code == 'search-title-disabled' \
and self.site.has_extension('CirrusSearch'):
key = prefix + 'search'
self._params[key] = ['intitle:' + search
for search in self._params[key]]
continue
if code == 'urlshortener-blocked': # T244062
# add additional information to error dict
error['current site'] = self.site
if self.site.user():
error['current user'] = self.site.user()
else: # not logged in; show the IP
uinfo = self.site.userinfo
error['current user'] = uinfo['name']
# raise error
try:
param_repr = str(self._params)
msg = (f'API Error: query=\n{pprint.pformat(param_repr)}\n'
f' response=\n{result}')
if test_running:
from tests import unittest_print
unittest_print(msg)
else:
pywikibot.log(msg)
raise pywikibot.exceptions.APIError(**error)
except TypeError:
raise RuntimeError(result)
msg = 'Maximum retries attempted due to maxlag without success.'
if test_running:
import unittest
raise unittest.SkipTest(msg)
raise MaxlagTimeoutError(msg)
class CachedRequest(Request):
"""Cached request.
.. versionchanged:: 9.0
timestamp with timezone is used to determine expiry.
"""
def __init__(self, expiry, *args, **kwargs) -> None:
"""Initialize a CachedRequest object.
:param expiry: either a number of days or a datetime.timedelta object
"""
assert expiry is not None
super().__init__(*args, **kwargs)
if not isinstance(expiry, datetime.timedelta):
expiry = datetime.timedelta(expiry)
self.expiry = min(expiry, datetime.timedelta(config.API_config_expiry))
self._data = None
self._cachetime = None
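# Illustrative sketch (not executed; 'site' stands for any APISite):
# *expiry* may be a plain number of days or a datetime.timedelta, and is
# capped at config.API_config_expiry days:
#
#   req = CachedRequest(datetime.timedelta(hours=6), site=site,
#                       parameters={'action': 'query', 'meta': 'siteinfo'})
#   req = CachedRequest(7, site=site,
#                       parameters={'action': 'query', 'meta': 'siteinfo'})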
@classmethod
def create_simple(cls, req_site, **kwargs):
"""Unsupported as it requires at least two parameters."""
raise NotImplementedError('CachedRequest cannot be created simply.')
@classmethod
def _get_cache_dir(cls) -> Path:
"""Return the base directory path for cache entries.
The directory will be created if it does not already exist.
.. versionchanged:: 8.0
return a `pathlib.Path` object.
.. versionchanged:: 9.0
remove Python main version from directory name
:return: base directory path for cache entries
:meta public:
"""
path = Path(config.base_dir, 'apicache')
cls._make_dir(path)
cls._get_cache_dir = classmethod(lambda c: path) # cache the result
return path
@staticmethod
def _make_dir(dir_name: str | Path) -> Path:
"""Create directory if it does not exist already.
.. versionchanged:: 7.0
Only `FileExistsError` is ignored but other OS exceptions can
still be raised.
.. versionchanged:: 8.0
use *dir_name* as str or `pathlib.Path` object but always
return a Path object.
:param dir_name: directory path
:return: directory path as `pathlib.Path` object for test purpose
:meta public:
"""
if isinstance(dir_name, str):
dir_name = Path(dir_name)
dir_name.mkdir(exist_ok=True)
return dir_name
def _uniquedescriptionstr(self) -> str:
"""Return unique description for the cache entry.
If this is modified, please also update
scripts/maintenance/cache.py to support
the new key and all previous keys.
"""
login_status = self.site._loginstatus
if login_status >= LoginStatus.AS_USER:
# This uses the format of Page.__repr__, without performing
# config.console_encoding as done by Page.__repr__.
# The returned value can't be encoded to anything other than
# ascii otherwise it creates an exception when _create_file_name()
# tries to encode it as utf-8.
user_key = f'User(User:{self.site.userinfo["name"]})'
else:
user_key = repr(LoginStatus(LoginStatus.NOT_LOGGED_IN))
request_key = repr(sorted(self._encoded_items().items()))
return f'{self.site!r}{user_key}{request_key}'
def _create_file_name(self) -> str:
"""Return a unique ascii identifier for the cache entry."""
return hashlib.sha256(
self._uniquedescriptionstr().encode('utf-8')
).hexdigest()
def _cachefile_path(self) -> Path:
"""Create the cachefile path.
.. versionchanged:: 8.0
return a `pathlib.Path` object.
:meta public:
"""
return CachedRequest._get_cache_dir() / self._create_file_name()
def _expired(self, dt):
"""Check whether the timestamp is expired."""
return dt + self.expiry < pywikibot.Timestamp.nowutc()
def _load_cache(self) -> bool:
"""Load cache entry for request, if available.
:return: Whether the request was loaded from the cache
"""
self._add_defaults()
try:
filename = self._cachefile_path()
with filename.open('rb') as f:
uniquedescr, self._data, self._cachetime = pickle.load(f)
if uniquedescr != self._uniquedescriptionstr():
raise RuntimeError('Expected unique description for the cache '
'entry differs from the entry found in the file.')
if self._expired(self._cachetime):
self._data = None
return False
pywikibot.debug(
f'{type(self).__name__}: cache ({filename.parent}) hit\n'
f'{filename.name}, API request:\n{uniquedescr}')
except OSError:
pass # file not found
except Exception as e:
pywikibot.info(f'Could not load cache: {e!r}')
else:
return True
return False
def _write_cache(self, data) -> None:
"""Write data to self._cachefile_path()."""
data = self._uniquedescriptionstr(), data, pywikibot.Timestamp.nowutc()
path = self._cachefile_path()
with suppress(OSError), path.open('wb') as f:
pickle.dump(data, f, protocol=config.pickle_protocol)
return
# delete invalid cache entry
path.unlink()
def submit(self):
"""Submit cached request."""
cached_available = self._load_cache()
if not cached_available:
self._data = super().submit()
self._write_cache(self._data)
else:
self._handle_warnings(self._data)
return self._data
def encode_url(query) -> str:
"""Encode parameters to pass with a url.
Reorder parameters so that token parameters go last, then wrap
:py:obj:`urlencode`. Return an HTTP URL query fragment which
complies with :api:`Edit#Parameters` (see the 'token' bullet).
:param query: keys and values to be encoded for passing with a url
:type query: mapping object or a sequence of two-element tuples
:return: encoded parameters with token parameters at the end
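Example (the parameter names and values are arbitrary):
>>> encode_url({'action': 'edit', 'title': 'Foo', 'token': 'abc'})
'action=edit&title=Foo&token=abc'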
"""
if hasattr(query, 'items'):
query = list(query.items())
# parameters ending on 'token' should go last
# wpEditToken should go very last
query.sort(key=lambda x: x[0].lower().endswith('token')
+ (x[0] == 'wpEditToken'))
return urlencode(query)