Source code for pywikibot.specialbots._upload

"""Special bot library containing UploadRobot.

Do not import classes directly from here but from specialbots.
# (C) Pywikibot team, 2003-2024
# Distributed under the terms of the MIT license.
from __future__ import annotations

import os
import tempfile
from contextlib import suppress
from http import HTTPStatus
from pathlib import Path
from urllib.parse import urlparse

import requests

import pywikibot
import pywikibot.comms.http as http
from pywikibot import config
from pywikibot.backports import Callable
from import BaseBot, QuitKeyboardInterrupt
from pywikibot.exceptions import APIError, FatalServerError, NoPageError

[docs] class UploadRobot(BaseBot): """Upload bot.""" post_processor: Callable[[str, str | None], None] | None = None """If this attribute is set to a callable, the :meth:`run` method calls it after upload. The parameters passed to the callable is the origin *file_url* passed to the :meth:`upload` method and the *filename* returned from that method. It can be used like this: .. code-block:: python def summarize(old: str, new: str | None) -> None: if new is None: print(f'{old} was ignored') else: print(f'{old} was uploaded as {new}') bot = UploadRobot('Myfile.bmp') bot.post_processor = summarize .. versionadded:: 9.1 """ def __init__(self, url: list[str] | str, *, url_encoding=None, description: str = '', use_filename=None, keep_filename: bool = False, verify_description: bool = True, ignore_warning: bool | list = False, target_site=None, aborts: bool | list | None = None, chunk_size: int = 0, asynchronous: bool = False, summary: str | None = None, filename_prefix: str | None = None, force_if_shared: bool = False, **kwargs) -> None: """Initializer. .. versionchanged:: 6.2 asynchronous upload is used if *asynchronous* parameter is set .. versionchanged:: 6.4 *force_if_shared* parameter was added :param url: path to url or local file, or list of urls or paths to local files. :param description: Description of file for its page. If multiple files are uploading the same description is used for every file. :param use_filename: Specify title of the file's page. If multiple files are uploading it asks to change the name for second, third, etc. files, otherwise the last file will overwrite the other. :param keep_filename: Set to True to keep original names of urls and files, otherwise it will ask to enter a name for each file. :param summary: Summary of the upload :param verify_description: Set to True to proofread the description. :param ignore_warning: Set this to True to upload even if another file would be overwritten or another mistake would be risked. Set it to an array of warning codes to selectively ignore specific warnings. :param target_site: Set the site to upload to. If target site is not given it's taken from user config file ( :type target_site: object :param aborts: List of the warning types to abort upload on. Set to True to abort on any warning. :param chunk_size: Upload the file in chunks (more overhead, but restartable) specified in bytes. If no value is specified the file will be uploaded as whole. :param asynchronous: Make potentially large file operations asynchronous on the server side when possible. :param filename_prefix: Specify prefix for the title of every file's page. :param force_if_shared: Upload the file even if it's currently shared to the target site (e.g. when moving from Commons to another wiki) :keyword bool always: Disables any input, requires that either ignore_warning or aborts are set to True and that the description is also set. It overwrites verify_description to False and keep_filename to True. """ super().__init__(**kwargs) if self.opt.always: if ignore_warning is not True and aborts is not True: raise ValueError( 'When always is set to True, ' 'ignore_warning or aborts must be set to True.') if not description: raise ValueError( 'When always is set to True, the description must be set.') self.url = [url] if isinstance(url, str) else url self.url_encoding = url_encoding self.description = description self.use_filename = use_filename self.keep_filename = keep_filename or self.opt.always self.verify_description = verify_description and not self.opt.always self.ignore_warning = ignore_warning self.aborts = aborts or [] self.chunk_size = chunk_size self.asynchronous = asynchronous self.summary = summary self.filename_prefix = filename_prefix self.force_if_shared = force_if_shared if config.upload_to_commons: default_site = pywikibot.Site('commons') else: default_site = pywikibot.Site() self.target_site = target_site or default_site
[docs] def read_file_content(self, file_url: str): """Return name of temp file in which remote file is saved."""'Reading file ' + file_url) handle, tempname = tempfile.mkstemp() path = Path(tempname) size = 0 dt_gen = (el for el in (15, 30, 45, 60, 120, 180, 240, 300)) while True: file_len = path.stat().st_size if file_len:'Download resumed.') headers = {'Range': f'bytes={file_len}-'} else: headers = {} with open(path, 'ab') as fd: os.lseek(handle, file_len, 0) try: response = http.fetch(file_url, stream=True, headers=headers) response.raise_for_status() # get download info, if available # Note: this is not enough to exclude pages # e.g. 'application/json' is also not a media if 'text/' in response.headers['Content-Type']: raise FatalServerError('The requested URL was not ' 'found on server.') size = max(size, int(response.headers.get('Content-Length', 0))) # stream content to temp file (in chunks of 1Mb) for chunk in response.iter_content(chunk_size=1024 * 1024): fd.write(chunk) # raised from connection lost during response.iter_content() except requests.ConnectionError: fd.flush() f'Connection closed at byte {path.stat().st_size}') # raised from response.raise_for_status() except requests.HTTPError as e: # exit criteria if size is not available # error on last iteration is OK, we're requesting # {'Range': 'bytes=file_len-'} err = HTTPStatus.REQUESTED_RANGE_NOT_SATISFIABLE if response.status_code == err and path.stat().st_size: break raise FatalServerError(str(e)) from e if size and size == path.stat().st_size: break try: dt = next(dt_gen)'Sleeping for {dt} seconds ...') pywikibot.sleep(dt) except StopIteration: raise FatalServerError('Download failed, too many retries!')'Downloaded {path.stat().st_size} bytes') return tempname
def _handle_warning(self, warning: str) -> bool | None: """Return whether the warning cause an abort or be ignored. :param warning: The warning name :return: False if this warning should cause an abort, True if it should be ignored or None if this warning has no default handler. """ if self.aborts is not True and warning in self.aborts: return False if self.ignore_warning is True or (self.ignore_warning is not False and warning in self.ignore_warning): return True return None if self.aborts is not True else False def _handle_warnings(self, warnings): messages = '\n'.join('{0.code}: {}'.format(warning) for warning in sorted(warnings, key=lambda w: w.code)) if len(warnings) > 1: messages = '\n' + messages'We got the following warning(s): ' + messages) answer = True for warning in warnings: this_answer = self._handle_warning(warning.code) if this_answer is False: answer = False break if this_answer is None: answer = None if answer is None: answer = pywikibot.input_yn('Do you want to ignore?', default=False, automatic_quit=False) return answer
[docs] def process_filename(self, file_url: str) -> str | None: """Return base filename portion of *file_url*. :param file_url: either a URL or a local file path """ # Isolate the pure name filename = file_url if '://' in filename: # extract the path portion of the URL filename = urlparse(filename).path filename = os.path.basename(filename) if self.use_filename: filename = self.use_filename if self.filename_prefix: filename = self.filename_prefix + filename if not self.keep_filename:'\nThe filename on the target wiki will default ' f'to: {filename}\n') assert not self.opt.always newfn = pywikibot.input( 'Enter a better name, or press enter to accept:') if newfn != '': filename = newfn # FIXME: these 2 belong somewhere else, presumably in family # forbidden characters are handled by pywikibot/ forbidden = ':*/\\' # to be extended allowed_formats = self.target_site.file_extensions # ask until it's valid first_check = True while True: if not first_check: if self.opt.always: filename = None else: filename = pywikibot.input('Enter a better name, or press ' 'enter to skip the file:') if not filename: return None first_check = False ext = os.path.splitext(filename)[1].lower().strip('.') # are any chars in forbidden also in filename? invalid = set(forbidden) & set(filename) if invalid: c = ''.join(invalid) f'Invalid character(s): {c}. Please try again') continue if allowed_formats and ext not in allowed_formats: if self.opt.always:'File format is not one of [{}]' .format(' '.join(allowed_formats))) elif pywikibot.input_yn( 'File format is not one of [{}], but {!r}. Skip?' .format(' '.join(allowed_formats), ext)): return None continue potential_file_page = pywikibot.FilePage(self.target_site, filename) if potential_file_page.exists(): overwrite = self._handle_warning('exists') if overwrite is False: 'File exists and you asked to abort. Skipping.') return None if potential_file_page.has_permission(): if overwrite is None: overwrite = not pywikibot.input_yn( f'File with name {filename} already exists.' ' Would you like to change the name?' ' (Otherwise file will be overwritten.)', default=True, automatic_quit=False) if not overwrite: continue break'File with name {filename} already exists and ' f'cannot be overwritten.') continue with suppress(NoPageError): if (not self.force_if_shared and potential_file_page.file_is_shared()): f'File with name {filename} already exists in shared ' f'repository and cannot be overwritten.') continue break # A proper description for the submission. # Empty descriptions are not accepted. if self.description: f'The suggested description is:\n{self.description}') while not self.description or self.verify_description: if not self.description:'<<lightred>>It is not possible to upload a ' 'file without a description.') assert not self.opt.always # if no description, ask if user want to add one or quit, # and loop until one is filled. # if self.verify_description, ask if user want to change it # or continue. if self.description: question = 'Do you want to change this description?' else: question = 'No description was given. Add one?' if pywikibot.input_yn(question, default=not self.description, automatic_quit=self.description): from pywikibot import editor as editarticle editor = editarticle.TextEditor() try: new_description = editor.edit(self.description) except ImportError: raise except Exception as e: pywikibot.error(e) continue # if user saved / didn't press Cancel if new_description: self.description = new_description elif not self.description: raise QuitKeyboardInterrupt self.verify_description = False return filename
[docs] def abort_on_warn(self, warn_code): """Determine if the warning message should cause an abort.""" return self.aborts is True or warn_code in self.aborts
[docs] def ignore_on_warn(self, warn_code: str): """ Determine if the warning message should be ignored. :param warn_code: The warning message """ return self.ignore_warning is True or warn_code in self.ignore_warning
[docs] def upload_file(self, file_url: str) -> str | None: """ Upload the image at file_url to the target wiki. .. seealso:: :api:`Upload` Return the filename that was used to upload the image. If the upload fails, ask the user whether to try again or not. If the user chooses not to retry, return None. .. versionchanged:: 7.0 If 'copyuploadbaddomain' API error occurred in first step, download the file and upload it afterwards :param file_url: either a URL or a local file path """ filename = self.process_filename(file_url) if not filename: return None site = self.target_site imagepage = pywikibot.FilePage(site, filename) # normalizes filename imagepage.text = self.description'Uploading file to {site}...') ignore_warnings = self.ignore_warning is True or self._handle_warnings download = False while True: if '://' in file_url \ and (not site.has_right('upload_by_url') or download): try: file_url = self.read_file_content(file_url) except FatalServerError as e: pywikibot.error(e) return None try: success = imagepage.upload(file_url, ignore_warnings=ignore_warnings, chunk_size=self.chunk_size, asynchronous=self.asynchronous, comment=self.summary) except APIError as error: if error.code == 'uploaddisabled': pywikibot.error(f'Upload error: Local file uploads are ' f'disabled on {site}.') elif error.code == 'copyuploadbaddomain' and not download \ and '://' in file_url: pywikibot.error(error)'Downloading the file and retry...') download = True continue else: pywikibot.exception('Upload error: ') except Exception: pywikibot.exception('Upload error: ') else: if success: # No warning, upload complete.'Upload of {filename} successful.') self.counter['upload'] += 1 return filename # data['filename']'Upload aborted.') break return None
[docs] def skip_run(self) -> bool: """Check whether processing is to be skipped.""" # early check that upload is enabled if self.target_site.is_uploaddisabled(): pywikibot.error(f'Upload error: Local file uploads are disabled ' f'on {self.target_site}.') return True # early check that user has proper rights to upload self.target_site.login() if not self.target_site.has_right('upload'): pywikibot.error(f"User '{self.target_site.user()}' does not have " f'upload rights on site {self.target_site}.') return True return False
[docs] def run(self): """Run bot. .. versionchanged:: 9.1 count uploads. """ if self.skip_run(): return try: for file_url in self.url: filename = self.upload_file(file_url) self.counter['read'] += 1 if filename: self.counter['upload'] += 1 if callable(self.post_processor): self.post_processor(file_url, filename) except QuitKeyboardInterrupt:'\nUser quit {type(self).__name__} bot run...') except KeyboardInterrupt: if config.verbose_output: raise f'\nKeyboardInterrupt during {type(self).__name__} bot run...') finally: self.exit()