"""Special bot library containing UploadRobot.
Do not import classes directly from here but from specialbots.
"""
#
# (C) Pywikibot team, 2003-2024
#
# Distributed under the terms of the MIT license.
#
from __future__ import annotations
import os
import tempfile
from contextlib import suppress
from http import HTTPStatus
from pathlib import Path
from urllib.parse import urlparse
import requests
import pywikibot
from pywikibot import config
from pywikibot.backports import Callable
from pywikibot.bot import BaseBot, QuitKeyboardInterrupt
from pywikibot.comms import http
from pywikibot.exceptions import APIError, FatalServerError, NoPageError
[docs]
class UploadRobot(BaseBot):
"""Upload bot."""
post_processor: Callable[[str, str | None], None] | None = None
"""If this attribute is set to a callable, the :meth:`run` method
calls it after upload. The parameters passed to the callable is the
origin *file_url* passed to the :meth:`upload` method and the
*filename* returned from that method. It can be used like this:
.. code-block:: python
def summarize(old: str, new: str | None) -> None:
if new is None:
print(f'{old} was ignored')
else:
print(f'{old} was uploaded as {new}')
bot = UploadRobot('Myfile.bmp')
bot.post_processor = summarize
bot.run()
.. versionadded:: 9.1
"""
def __init__(self, url: list[str] | str, *,
url_encoding=None,
description: str = '',
use_filename=None,
keep_filename: bool = False,
verify_description: bool = True,
ignore_warning: bool | list = False,
target_site=None,
aborts: bool | list | None = None,
chunk_size: int = 0,
asynchronous: bool = False,
summary: str | None = None,
filename_prefix: str | None = None,
force_if_shared: bool = False,
**kwargs) -> None:
"""Initializer.
.. versionchanged:: 6.2
asynchronous upload is used if *asynchronous* parameter is set
.. versionchanged:: 6.4
*force_if_shared* parameter was added
:param url: path to url or local file, or list of urls or paths
to local files.
:param description: Description of file for its page. If multiple files
are uploading the same description is used for every file.
:param use_filename: Specify title of the file's page. If multiple
files are uploading it asks to change the name for second, third,
etc. files, otherwise the last file will overwrite the other.
:param keep_filename: Set to True to keep original names of urls and
files, otherwise it will ask to enter a name for each file.
:param summary: Summary of the upload
:param verify_description: Set to True to proofread the description.
:param ignore_warning: Set this to True to upload even if another file
would be overwritten or another mistake would be risked. Set it to
an array of warning codes to selectively ignore specific warnings.
:param target_site: Set the site to upload to. If target site is not
given it's taken from user config file (user-config.py).
:type target_site: object
:param aborts: List of the warning types to abort upload on. Set to
True to abort on any warning.
:param chunk_size: Upload the file in chunks (more overhead, but
restartable) specified in bytes. If no value is specified the file
will be uploaded as whole.
:param asynchronous: Make potentially large file operations
asynchronous on the server side when possible.
:param filename_prefix: Specify prefix for the title of every
file's page.
:param force_if_shared: Upload the file even if it's currently
shared to the target site (e.g. when moving from Commons to another
wiki)
:keyword bool always: Disables any input, requires that either
ignore_warning or aborts are set to True and that the
description is also set. It overwrites verify_description to
False and keep_filename to True.
"""
super().__init__(**kwargs)
if self.opt.always:
if ignore_warning is not True and aborts is not True:
raise ValueError(
'When always is set to True, '
'ignore_warning or aborts must be set to True.')
if not description:
raise ValueError(
'When always is set to True, the description must be set.')
self.url = [url] if isinstance(url, str) else url
self.url_encoding = url_encoding
self.description = description
self.use_filename = use_filename
self.keep_filename = keep_filename or self.opt.always
self.verify_description = verify_description and not self.opt.always
self.ignore_warning = ignore_warning
self.aborts = aborts or []
self.chunk_size = chunk_size
self.asynchronous = asynchronous
self.summary = summary
self.filename_prefix = filename_prefix
self.force_if_shared = force_if_shared
if config.upload_to_commons:
default_site = pywikibot.Site('commons')
else:
default_site = pywikibot.Site()
self.target_site = target_site or default_site
[docs]
def read_file_content(self, file_url: str):
"""Return name of temp file in which remote file is saved."""
pywikibot.info('Reading file ' + file_url)
handle, tempname = tempfile.mkstemp()
path = Path(tempname)
size = 0
dt_gen = (el for el in (15, 30, 45, 60, 120, 180, 240, 300))
while True:
file_len = path.stat().st_size
if file_len:
pywikibot.info('Download resumed.')
headers = {'Range': f'bytes={file_len}-'}
else:
headers = {}
with open(path, 'ab') as fd:
os.lseek(handle, file_len, 0)
try:
response = http.fetch(file_url, stream=True,
headers=headers)
response.raise_for_status()
# get download info, if available
# Note: this is not enough to exclude pages
# e.g. 'application/json' is also not a media
if 'text/' in response.headers['Content-Type']:
raise FatalServerError('The requested URL was not '
'found on server.')
size = max(size,
int(response.headers.get('Content-Length', 0)))
# stream content to temp file (in chunks of 1Mb)
for chunk in response.iter_content(chunk_size=1024 * 1024):
fd.write(chunk)
# raised from connection lost during response.iter_content()
except requests.ConnectionError:
fd.flush()
pywikibot.info(
f'Connection closed at byte {path.stat().st_size}')
# raised from response.raise_for_status()
except requests.HTTPError as e:
# exit criteria if size is not available
# error on last iteration is OK, we're requesting
# {'Range': 'bytes=file_len-'}
err = HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE
if response.status_code == err and path.stat().st_size:
break
raise FatalServerError(str(e)) from e
if size and size == path.stat().st_size:
break
try:
dt = next(dt_gen)
pywikibot.info(f'Sleeping for {dt} seconds ...')
pywikibot.sleep(dt)
except StopIteration:
raise FatalServerError('Download failed, too many retries!')
pywikibot.info(f'Downloaded {path.stat().st_size} bytes')
return tempname
def _handle_warning(self, warning: str) -> bool | None:
"""Return whether the warning cause an abort or be ignored.
:param warning: The warning name
:return: False if this warning should cause an abort, True if it should
be ignored or None if this warning has no default handler.
"""
if self.aborts is not True and warning in self.aborts:
return False
if self.ignore_warning is True or (self.ignore_warning is not False
and warning in self.ignore_warning):
return True
return None if self.aborts is not True else False
def _handle_warnings(self, warnings):
messages = '\n'.join(f'{warning.code}: {warning.info}'
for warning in sorted(warnings,
key=lambda w: w.code))
if len(warnings) > 1:
messages = '\n' + messages
pywikibot.info('We got the following warning(s): ' + messages)
answer = True
for warning in warnings:
this_answer = self._handle_warning(warning.code)
if this_answer is False:
answer = False
break
if this_answer is None:
answer = None
if answer is None:
answer = pywikibot.input_yn('Do you want to ignore?',
default=False, automatic_quit=False)
return answer
[docs]
def process_filename(self, file_url: str) -> str | None:
"""Return base filename portion of *file_url*.
:param file_url: either a URL or a local file path
"""
# Isolate the pure name
filename = file_url
if '://' in filename:
# extract the path portion of the URL
filename = urlparse(filename).path
filename = os.path.basename(filename)
if self.use_filename:
filename = self.use_filename
if self.filename_prefix:
filename = self.filename_prefix + filename
if not self.keep_filename:
pywikibot.info(f'\nThe filename on the target wiki will default '
f'to: {filename}\n')
assert not self.opt.always
newfn = pywikibot.input(
'Enter a better name, or press enter to accept:')
if newfn != '':
filename = newfn
# FIXME: these 2 belong somewhere else, presumably in family
# forbidden characters are handled by pywikibot/page.py
forbidden = ':*/\\' # to be extended
allowed_formats = self.target_site.file_extensions
# ask until it's valid
first_check = True
while True:
if not first_check:
if self.opt.always:
filename = None
else:
filename = pywikibot.input('Enter a better name, or press '
'enter to skip the file:')
if not filename:
return None
first_check = False
ext = os.path.splitext(filename)[1].lower().strip('.')
# are any chars in forbidden also in filename?
invalid = set(forbidden) & set(filename)
if invalid:
c = ''.join(invalid)
pywikibot.info(
f'Invalid character(s): {c}. Please try again')
continue
if allowed_formats and ext not in allowed_formats:
if self.opt.always:
pywikibot.info('File format is not one of [{}]'
.format(' '.join(allowed_formats)))
elif pywikibot.input_yn(
'File format is not one of [{}], but {!r}. Skip?'
.format(' '.join(allowed_formats), ext)):
return None
continue
potential_file_page = pywikibot.FilePage(self.target_site,
filename)
if potential_file_page.exists():
overwrite = self._handle_warning('exists')
if overwrite is False:
pywikibot.info(
'File exists and you asked to abort. Skipping.')
return None
if potential_file_page.has_permission():
if overwrite is None:
overwrite = not pywikibot.input_yn(
f'File with name {filename} already exists.'
' Would you like to change the name?'
' (Otherwise file will be overwritten.)',
default=True,
automatic_quit=False)
if not overwrite:
continue
break
pywikibot.info(f'File with name {filename} already exists and '
f'cannot be overwritten.')
continue
with suppress(NoPageError):
if (not self.force_if_shared
and potential_file_page.file_is_shared()):
pywikibot.info(
f'File with name {filename} already exists in shared '
f'repository and cannot be overwritten.')
continue
break
# A proper description for the submission.
# Empty descriptions are not accepted.
if self.description:
pywikibot.info(
f'The suggested description is:\n{self.description}')
while not self.description or self.verify_description:
if not self.description:
pywikibot.info('<<lightred>>It is not possible to upload a '
'file without a description.')
assert not self.opt.always
# if no description, ask if user want to add one or quit,
# and loop until one is filled.
# if self.verify_description, ask if user want to change it
# or continue.
if self.description:
question = 'Do you want to change this description?'
else:
question = 'No description was given. Add one?'
if pywikibot.input_yn(question, default=not self.description,
automatic_quit=self.description):
from pywikibot import editor as editarticle
editor = editarticle.TextEditor()
try:
new_description = editor.edit(self.description)
except ImportError:
raise
except Exception as e:
pywikibot.error(e)
continue
# if user saved / didn't press Cancel
if new_description:
self.description = new_description
elif not self.description:
raise QuitKeyboardInterrupt
self.verify_description = False
return filename
[docs]
def abort_on_warn(self, warn_code):
"""Determine if the warning message should cause an abort."""
return self.aborts is True or warn_code in self.aborts
[docs]
def ignore_on_warn(self, warn_code: str):
"""Determine if the warning message should be ignored.
:param warn_code: The warning message
"""
return self.ignore_warning is True or warn_code in self.ignore_warning
[docs]
def upload_file(self, file_url: str) -> str | None:
"""Upload the image at file_url to the target wiki.
.. seealso:: :api:`Upload`
Return the filename that was used to upload the image.
If the upload fails, ask the user whether to try again or not.
If the user chooses not to retry, return None.
.. versionchanged:: 7.0
If 'copyuploadbaddomain' API error occurred in first step,
download the file and upload it afterwards
:param file_url: either a URL or a local file path
"""
filename = self.process_filename(file_url)
if not filename:
return None
site = self.target_site
imagepage = pywikibot.FilePage(site, filename) # normalizes filename
imagepage.text = self.description
pywikibot.info(f'Uploading file to {site}...')
ignore_warnings = self.ignore_warning is True or self._handle_warnings
download = False
while True:
if '://' in file_url \
and (not site.has_right('upload_by_url') or download):
try:
file_url = self.read_file_content(file_url)
except FatalServerError as e:
pywikibot.error(e)
return None
try:
success = imagepage.upload(file_url,
ignore_warnings=ignore_warnings,
chunk_size=self.chunk_size,
asynchronous=self.asynchronous,
comment=self.summary)
except APIError as error:
if error.code == 'uploaddisabled':
pywikibot.error(f'Upload error: Local file uploads are '
f'disabled on {site}.')
elif error.code == 'copyuploadbaddomain' and not download \
and '://' in file_url:
pywikibot.error(error)
pywikibot.info('Downloading the file and retry...')
download = True
continue
else:
pywikibot.exception('Upload error: ')
except Exception:
pywikibot.exception('Upload error: ')
else:
if success:
# No warning, upload complete.
pywikibot.info(f'Upload of {filename} successful.')
self.counter['upload'] += 1
return filename # data['filename']
pywikibot.info('Upload aborted.')
break
return None
[docs]
def skip_run(self) -> bool:
"""Check whether processing is to be skipped."""
# early check that upload is enabled
if self.target_site.is_uploaddisabled():
pywikibot.error(f'Upload error: Local file uploads are disabled '
f'on {self.target_site}.')
return True
# early check that user has proper rights to upload
self.target_site.login()
if not self.target_site.has_right('upload'):
pywikibot.error(f"User '{self.target_site.user()}' does not have "
f'upload rights on site {self.target_site}.')
return True
return False
[docs]
def run(self):
"""Run bot.
.. versionchanged:: 9.1
count uploads.
"""
if self.skip_run():
return
try:
for file_url in self.url:
filename = self.upload_file(file_url)
self.counter['read'] += 1
if filename:
self.counter['upload'] += 1
if callable(self.post_processor):
self.post_processor(file_url, filename)
except QuitKeyboardInterrupt:
pywikibot.info(f'\nUser quit {type(self).__name__} bot run...')
except KeyboardInterrupt:
if config.verbose_output:
raise
pywikibot.info(
f'\nKeyboardInterrupt during {type(self).__name__} bot run...')
finally:
self.exit()