"""Objects representing API upload to MediaWiki site."""
#
# (C) Pywikibot team, 2009-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations
import mimetypes
import os
from collections.abc import Iterable
from warnings import warn
import pywikibot
from pywikibot.exceptions import APIError, Error, UploadError
from pywikibot.tools import compute_file_hash
__all__ = ('Uploader', )
[docs]
class Uploader:

    """Uploader class to upload a file to the wiki.

    .. versionadded:: 7.1

    :param site: The current site to work on
    :param filepage: a FilePage object from which the wiki-name of the
        file will be obtained.
    :param source_filename: path to the file to be uploaded
    :param source_url: URL of the file to be uploaded
    :param comment: Edit summary; if this is not provided, then
        filepage.text will be used. An empty summary is not permitted.
        This may also serve as the initial page text (see below).
    :param text: Initial page text; if this is not set, then
        filepage.text will be used, or comment.
    :param watch: If true, add filepage to the bot user's watchlist
    :param chunk_size: The chunk size in bytes for chunked uploading
        (see :api:`Upload#Chunked_uploading`). It will only upload in
        chunks, if the chunk size is positive but lower than the file
        size.
    :param asynchronous: Make potentially large file operations
        asynchronous on the server side when possible.
    :param ignore_warnings: It may be a static boolean, a callable
        returning a boolean or an iterable. The callable gets a list of
        UploadError instances and the iterable should contain the warning
        codes for which an equivalent callable would return True if all
        UploadError codes are in that list. If the result is False it'll
        not continue uploading the file and otherwise disable any warning
        and reattempt to upload the file.

        .. note:: If report_success is True or None it'll raise an
           UploadError exception if the static boolean is False.
    :type ignore_warnings: bool or callable or iterable of str
    :param report_success: If the upload was successful it'll print a
        success message and if ignore_warnings is set to False it'll
        raise an UploadError if a warning occurred. If it's None
        (default) it'll be True if ignore_warnings is a bool and False
        otherwise. If it's True or None ignore_warnings must be a bool.
    :raises ValueError: both source_filename and source_url are given
    """

    upload_warnings = {
        # map API warning codes to user error messages
        # {msg} will be replaced by message string from API response
        'duplicate-archive':
            'The file is a duplicate of a deleted file {msg}.',
        'was-deleted': 'The file {msg} was previously deleted.',
        'empty-file': 'File {msg} is empty.',
        'exists': 'File {msg} already exists.',
        'duplicate': 'Uploaded file is a duplicate of {msg}.',
        'badfilename': 'Target filename is invalid.',
        'filetype-unwanted-type': 'File {msg} type is unwanted type.',
        'exists-normalized':
            'File exists with different extension as {msg!r}.',
        'bad-prefix': 'Target filename has a bad prefix {msg}.',
        'page-exists':
            'Target filename exists but with a different file {msg}.',

        # API-returned message string will be timestamps, not much use here
        'no-change': 'The upload is an exact duplicate of the current version '
                     'of this file.',
        'duplicate-version': 'The upload is an exact duplicate of older '
                             'version(s) of this file.',
    }

    def __init__(self,
                 site: pywikibot.site.APISite,
                 filepage: pywikibot.FilePage,
                 *,
                 source_filename: str | None = None,
                 source_url: str | None = None,
                 comment: str | None = None,
                 text: str | None = None,
                 watch: bool = False,
                 chunk_size: int = 0,
                 asynchronous: bool = False,
                 ignore_warnings=False,
                 report_success: bool | None = None) -> None:
        """Store the upload options; no request is made here.

        See the class documentation for the meaning of the parameters.

        :raises ValueError: both source_filename and source_url are
            given
        """
        self.site = site
        self.filepage = filepage
        self.comment = comment
        self.text = text
        self.watch = watch
        self.ignore_warnings = ignore_warnings
        self.chunk_size = chunk_size
        self.asynchronous = asynchronous
        self.report_success = report_success

        if source_filename and source_url:
            raise ValueError('APISite.upload: must provide either '
                             'source_filename or source_url, not both.')
        self.filename = source_filename
        self.url = source_url

    def upload(self) -> bool:
        """Check for required parameters to upload and run the job.

        A missing comment falls back to ``filepage.text``; a missing
        text falls back to ``filepage.text`` and then to the comment.

        :return: Whether the upload was successful.
        :raises ValueError: no non-empty summary is available
        """
        if self.comment is None:
            self.comment = self.filepage.text

        if not self.comment:
            raise ValueError('APISite.upload: cannot upload file without '
                             'a summary/description.')

        if self.text is None:
            self.text = self.filepage.text

        if not self.text:
            self.text = self.comment

        return self._upload(self.ignore_warnings, self.report_success)

    @classmethod
    def create_warnings_list(cls, response, file_key):
        """Create a list of upload errors.

        :param response: upload response mapping; its ``'warnings'``
            mapping and its ``'offset'`` entry are read
        :param file_key: file key passed on to every UploadError
        :return: one UploadError per entry of ``response['warnings']``;
            codes missing from :attr:`upload_warnings` use the raw API
            message as error message
        """
        return [UploadError(warning,
                            cls.upload_warnings.get(warning, '{msg}')
                            .format(msg=data),
                            file_key,
                            response['offset'])
                for warning, data in response['warnings'].items()]

    def _upload(self, ignore_warnings, report_success,
                file_key=None, offset=0) -> bool:
        """Recursive Upload method.

        :param ignore_warnings: static boolean, callable or iterable of
            warning codes; an iterable is wrapped into a callable which
            returns True if all warning codes are contained in it
        :param report_success: if None it is set to True when
            ignore_warnings is a bool and to False otherwise
        :param file_key: Reuses an already uploaded file using the
            filekey. If None (default) it will upload the file.
        :param offset: When file_key is not None this can be an integer
            to continue a previously canceled chunked upload. If False
            it treats that as a finished upload. If True it requests the
            stash info from the server to determine the offset. By
            default starts at 0.
        :return: Whether the upload was successful.
        :raises ValueError: report_success is True but ignore_warnings
            is not a bool, the local file does not exist, or the stash
            information does not match the local file
        :raises Error: unrecognized upload result or missing
            upload_by_url right
        """
        # An offset != 0 doesn't make sense without a file key
        assert offset == 0 or file_key is not None

        if report_success is None:
            report_success = isinstance(ignore_warnings, bool)

        if report_success is True and not isinstance(ignore_warnings, bool):
            raise ValueError('report_success may only be set to True when '
                             'ignore_warnings is a boolean')

        if isinstance(ignore_warnings, Iterable):
            ignored_warnings = ignore_warnings

            def ignore_warnings(warnings):
                return all(w.code in ignored_warnings for w in warnings)

        # Only a static True disables warnings right from the start
        ignore_all_warnings = not callable(ignore_warnings) and ignore_warnings

        token = self.site.tokens['csrf']
        result = None
        file_page_title = self.filepage.title(
            with_ns=False,
            with_section=False,
        )

        file_size = None
        # make sure file actually exists
        if self.filename:
            if os.path.isfile(self.filename):
                file_size = os.path.getsize(self.filename)
            elif offset is not False:
                raise ValueError(f"File '{self.filename}' does not exist.")

        # Verify the stash when a file key and offset is given:
        # requests the SHA1 and file size uploaded and compares it to
        # the local file. Also verify that offset is matching the
        # file size if the offset is an int. If offset is False it
        # verifies that the file size match with the local file.
        verify_stash = False
        if self.filename and file_key:
            assert offset is False or file_size is not None
            verify_stash = True
            if (offset is not False and offset is not True
                    and offset > file_size):
                raise ValueError(
                    f'For the file key "{file_key}" the offset was set to '
                    f'{offset} while the file is only {file_size} bytes large.'
                )

        if verify_stash or offset is True:
            if not file_key:
                raise ValueError('Without a file key it cannot request the '
                                 'stash information')
            if not self.filename:
                raise ValueError('Can request stash information only when '
                                 'using a file name.')

            props = ['size']
            if verify_stash:
                props.append('sha1')

            stash_info = self.site.stash_info(file_key, props)
            if offset is True:
                offset = stash_info['size']
            elif offset is False:
                if file_size != stash_info['size']:
                    raise ValueError(
                        'For the file key "{}" the server reported a size '
                        '{} while the file size is {}'
                        .format(file_key, stash_info['size'], file_size))
            elif offset is not False and offset != stash_info['size']:
                raise ValueError(
                    'For the file key "{}" the server reported a size {} '
                    'while the offset was {}'
                    .format(file_key, stash_info['size'], offset))

            if verify_stash:
                # The SHA1 was also requested so calculate and compare it
                assert 'sha1' in stash_info, \
                    f'sha1 not in stash info: {stash_info}'
                sha1 = compute_file_hash(self.filename, bytes_to_read=offset)
                if sha1 != stash_info['sha1']:
                    raise ValueError(
                        'The SHA1 of {} bytes of the stashed "{}" is {} '
                        'while the local file is {}'
                        .format(offset, file_key, stash_info['sha1'], sha1))

        assert offset is not True
        if file_key and file_size is None:
            assert offset is False

        data = {}
        # A stashed file can only be reused if there is a file key. The
        # parentheses are required: without them an empty local file
        # (offset == 0 == file_size) without any file key would be
        # "reused" with filekey=None instead of being uploaded.
        if file_key and (offset is False or offset == file_size):
            pywikibot.log(
                f'Reused already upload file using filekey "{file_key}"')
            # TODO: Use sessionkey instead of filekey if necessary
            final_request = self.site._request(
                parameters={
                    'action': 'upload',
                    'token': token,
                    'filename': file_page_title,
                    'comment': self.comment,
                    'text': self.text,
                    'async': self.asynchronous,
                    'filekey': file_key
                })

        elif self.filename:
            # TODO: Dummy value to allow also Unicode names, see bug T75661
            mime_filename = 'FAKE-NAME'
            # upload local file
            throttle = True
            filesize = os.path.getsize(self.filename)
            chunked_upload = 0 < self.chunk_size < filesize
            with open(self.filename, 'rb') as f:
                final_request = self.site._request(
                    throttle=throttle, parameters={
                        'action': 'upload', 'token': token, 'text': self.text,
                        'filename': file_page_title, 'comment': self.comment})
                if chunked_upload:
                    if offset > 0:
                        pywikibot.log(f'Continuing upload from byte {offset}')
                    poll = False
                    while True:
                        if poll:
                            # run a poll; not possible in first iteration
                            assert file_key
                            req = self.site.simple_request(
                                action='upload',
                                token=token,
                                filekey=file_key,
                                checkstatus=True)
                        else:
                            f.seek(offset)
                            chunk = f.read(self.chunk_size)
                            # workaround (hack) for T132676
                            # append another '\r' so that one is the payload
                            # and the second is used for newline when mangled
                            # by email package.
                            # NOTE(review): ``and`` binds tighter than
                            # ``or``, so a chunk shorter than chunk_size
                            # always gets the extra byte regardless of its
                            # last byte -- confirm that this is intended.
                            if (len(chunk) < self.chunk_size
                                    or (offset + len(chunk)) == filesize
                                    and chunk[-1] == b'\r'[0]):
                                chunk += b'\r'

                            mime_params = {
                                'chunk': (chunk,
                                          ('application', 'octet-stream'),
                                          {'filename': mime_filename})
                            }
                            req = self.site._request(
                                throttle=throttle,
                                mime=mime_params,
                                parameters={
                                    'action': 'upload',
                                    'token': token,
                                    'stash': True,
                                    'filesize': filesize,
                                    'offset': offset,
                                    'filename': file_page_title,
                                    'async': self.asynchronous,
                                    'ignorewarnings': ignore_all_warnings})

                            if file_key:
                                req['filekey'] = file_key

                        try:
                            data = req.submit()['upload']
                        except APIError as error:
                            # TODO: catch and process foreseeable errors
                            if error.code == 'stashfailed' \
                               and 'offset' in error.other:
                                # TODO: Ask MediaWiki to change this
                                # ambiguous error code.

                                new_offset = int(error.other['offset'])
                                # If the offset returned from the server
                                # (the offset it expects now) is equal to
                                # the offset we sent it, there must be
                                # something else that prevented the upload,
                                # instead of simple offset mismatch. This
                                # also prevents infinite loops when we
                                # upload the same chunk again and again,
                                # every time ApiError.
                                if offset != new_offset:
                                    pywikibot.log(
                                        f'Old offset: {offset}; Returned '
                                        f'offset: {new_offset}; Chunk size: '
                                        f'{len(chunk)}'
                                    )
                                    pywikibot.warning('Attempting to correct '
                                                      'automatically from '
                                                      'offset mismatch error.')
                                    offset = new_offset
                                    continue
                            raise

                        if 'nochange' in data:  # in simulation mode
                            break

                        # Polls may not contain file key in response
                        file_key = data.get('filekey', file_key)
                        if data['result'] == 'Warning':
                            assert ('warnings' in data
                                    and not ignore_all_warnings)

                            if callable(ignore_warnings):
                                restart = False
                                if 'offset' not in data:
                                    # This is a result of a warning in the
                                    # first chunk. The chunk is not actually
                                    # stashed so upload must be restarted if
                                    # the warning is allowed.
                                    # T112416 and T112405#1637544
                                    restart = True
                                    data['offset'] = True

                                if ignore_warnings(self.create_warnings_list(
                                        data, file_key)):
                                    # Future warnings of this run
                                    # can be ignored
                                    if restart:
                                        return self._upload(
                                            ignore_warnings=True,
                                            report_success=False
                                        )

                                    ignore_warnings = True
                                    ignore_all_warnings = True
                                    offset = data['offset']
                                    continue
                                return False

                            # static boolean: let submit() handle the
                            # warning response
                            result = data
                            result.setdefault('offset', 0)
                            break

                        if data['result'] == 'Continue':
                            throttle = False
                            if 'offset' in data:
                                new_offset = int(data['offset'])
                                if offset + len(chunk) != new_offset:
                                    pywikibot.log(
                                        f'Old offset: {offset}; Returned '
                                        f'offset: {new_offset}; Chunk size: '
                                        f'{len(chunk)}'
                                    )
                                    pywikibot.warning('Unexpected offset.')
                                offset = new_offset
                            else:
                                pywikibot.warning('Offset was not supplied.')
                                offset += len(chunk)
                        elif data['result'] == 'Poll':
                            poll = True
                            pywikibot.log('Waiting for server to '
                                          'assemble chunks.')
                        elif data['result'] == 'Success':  # finished
                            pywikibot.log('Finished uploading last chunk.')
                            final_request['filekey'] = file_key
                            final_request['async'] = self.asynchronous
                            break
                        else:
                            raise Error('Unrecognized result: {result}'
                                        .format_map(data))

                elif file_key:
                    final_request['filekey'] = file_key
                else:
                    file_contents = f.read()
                    filetype = (mimetypes.guess_type(self.filename)[0]
                                or 'application/octet-stream')
                    final_request.mime = {
                        'file': (file_contents, filetype.split('/'),
                                 {'filename': mime_filename})
                    }
        else:
            # upload by URL
            if not self.site.has_right('upload_by_url'):
                # report the site, not this Uploader instance
                raise Error(f"User '{self.site.user()}' is not authorized to "
                            f'upload by URL on site {self.site}.')

            final_request = self.site.simple_request(
                action='upload', filename=file_page_title, url=self.url,
                comment=self.comment, text=self.text, token=token)

        return self.submit(final_request, result, data.get('result'),
                           ignore_warnings, ignore_all_warnings,
                           report_success, file_key)

    def submit(self, request, result, data_result: str | None,
               ignore_warnings, ignore_all_warnings, report_success,
               file_key) -> bool:
        """Submit request and return whether upload was successful.

        :param request: the upload request; its ``'token'`` is reused
            for polling requests
        :param result: an upload response which was already received
            or None to submit *request* first
        :param data_result: result string of the last chunk response;
            only used for the message of the final Error
        :param ignore_warnings: static boolean or a callable getting a
            list of UploadError instances and returning a boolean
        :param ignore_all_warnings: value of the ``ignorewarnings``
            request parameter
        :param report_success: if true print a success message and
            raise an UploadError on warnings; otherwise warnings are
            passed to *ignore_warnings*
        :param file_key: file key known so far
        :return: Whether the upload was successful.
        :raises UploadError: a warning occurred and report_success is
            true
        :raises Error: unrecognized response or result
        """
        # some warning keys have been changed
        warning_keys = {
            'nochange': 'no-change',
            'duplicateversions': 'duplicate-version',
            'emptyfile': 'empty-file',
        }
        token = request['token']
        while True:
            if not result:
                request['watch'] = self.watch
                request['ignorewarnings'] = ignore_all_warnings
                result = request.submit()['upload']
                pywikibot.debug(result)

            if 'result' not in result:
                raise Error(f'Upload: unrecognized response: {result}')

            if result['result'] == 'Warning':
                assert 'warnings' in result and not ignore_all_warnings

                if self.filename:
                    if 'filekey' in result:
                        file_key = result['filekey']
                    elif 'sessionkey' in result:
                        # TODO: Probably needs to be reflected in the API call
                        # above
                        file_key = result['sessionkey']
                        pywikibot.warning(
                            'Using sessionkey instead of filekey.')
                    else:
                        file_key = None
                        pywikibot.warning('No filekey defined.')
                else:
                    file_key = None

                if not report_success:
                    result.setdefault('offset', bool(self.filename))
                    offset = result['offset'] if self.filename else False
                    # ignore_warnings may be the static boolean False
                    # here which must not be called; treat that as
                    # "do not ignore the warnings".
                    if callable(ignore_warnings) and ignore_warnings(
                            self.create_warnings_list(result, file_key)):
                        return self._upload(ignore_warnings=True,
                                            report_success=False,
                                            file_key=file_key,
                                            offset=offset)
                    return False

                if len(result['warnings']) > 1:
                    warn('The upload returned {} warnings: {}'
                         .format(len(result['warnings']),
                                 ', '.join(result['warnings'])),
                         UserWarning, 3)

                warning = list(result['warnings'].keys())[0]
                message = result['warnings'][warning]
                warning = warning_keys.get(warning, warning)
                # Fall back to the raw API message for unknown warning
                # codes like create_warnings_list() does instead of
                # failing with a KeyError.
                raise UploadError(warning,
                                  self.upload_warnings.get(warning, '{msg}')
                                  .format(msg=message),
                                  file_key=file_key,
                                  offset=result.get('offset', False))

            if result['result'] == 'Poll':
                # Polling is meaningless without a file key
                assert file_key
                pywikibot.log('Waiting for upload to be published.')
                result = None
                request = self.site.simple_request(
                    action='upload',
                    token=token,
                    filekey=file_key,
                    checkstatus=True)
                continue

            if result['result'] == 'Success':
                if report_success:
                    pywikibot.info('Upload successful.')
                # If we receive a nochange, that would mean we're in simulation
                # mode, don't attempt to access imageinfo
                if 'nochange' not in result:
                    self.filepage._load_file_revisions([result['imageinfo']])
                return True

            raise Error(
                f"Unrecognized result: {data_result or result['result']}")