"""Superset Query interface... versionadded:: 9.2"""## (C) Pywikibot team, 2024## Distributed under the terms of the MIT license.#from__future__importannotationsfromhttpimportHTTPStatusfromtextwrapimportfillfromtypingimportTYPE_CHECKING,Anyimportpywikibotfrompywikibot.commsimporthttpfrompywikibot.dataimportWaitingMixinfrompywikibot.exceptionsimportNoUsernameError,ServerErrorifTYPE_CHECKING:frompywikibot.siteimportBaseSite
class SupersetQuery(WaitingMixin):
    """Superset Query class.

    This class allows to run SQL queries against wikimedia superset
    service.
    """

    def __init__(self,
                 schema_name: str | None = None,
                 site: BaseSite | None = None,
                 database_id: int | None = None) -> None:
        """Create superset endpoint with initial defaults.

        Either site OR schema_name is required. Site and schema_name are
        mutually exclusive. Database id will be retrieved automatically
        if needed.

        :param site: The mediawiki site to be queried
        :param schema_name: superset database schema name.
            Example value "enwiki_p"
        :param database_id: superset database id.
        :raises TypeError: if site and schema_name are both defined
        :raises TypeError: if database_id is given but is not an integer
        """
        if site and schema_name:
            msg = 'Only one of schema_name and site parameters can be defined'
            raise TypeError(msg)

        # Validate database_id
        if database_id and not isinstance(database_id, int):
            msg = f'database_id should be integer, but got "{database_id}"'
            raise TypeError(msg)

        self.site = site
        self.schema_name = schema_name
        self.database_id = database_id

        # Session state: ``connected`` is flipped by login() and
        # ``last_response`` always holds the most recent HTTP response.
        self.connected = False
        self.last_response = None
        self.superset_url = 'https://superset.wmcloud.org'

    def login(self) -> bool:
        """Login to superset.

        Function logins first to meta.wikimedia.org and then OAUTH login
        to superset.wmcloud.org. Working login expects that the user has
        manually permitted the username to login to the superset.

        :raises NoUsernameError: if not logged in.
        :raises ServerError: For other errors
        :return: True if user has been logged to superset
        """
        # superset uses meta for OAUTH authentication
        loginsite = pywikibot.Site('meta')
        if not loginsite.logged_in():
            loginsite.login()

        if not loginsite.logged_in():
            msg = 'User is not logged in on meta.wikimedia.org'
            raise NoUsernameError(msg)

        # Superset oauth login
        url = f'{self.superset_url}/login/mediawiki?next='
        self.last_response = http.fetch(url)

        # Test if user has been successfully logged in
        url = f'{self.superset_url}/api/v1/me/'
        self.last_response = http.fetch(url)

        # Handle error cases
        if self.last_response.status_code == HTTPStatus.OK:
            self.connected = True
        elif self.last_response.status_code == HTTPStatus.UNAUTHORIZED:
            self.connected = False
            raise NoUsernameError(fill(
                'User not logged in. You need to log in to '
                'meta.wikimedia.org and give OAUTH permission. '
                'Open https://superset.wmcloud.org/login/ '
                'with browser to login and give permission.'))
        else:
            self.connected = False
            status_code = self.last_response.status_code
            raise ServerError(f'Unknown error: {status_code}')

        return self.connected

    def get_csrf_token(self) -> str:
        """Get superset CSRF token.

        Method retrieves a CSRF token from the Superset service. If the
        instance is not connected, it attempts to log in first.

        :raises ServerError: For any http errors
        :return: CSRF token string
        """
        if not self.connected:
            self.login()

        # Load CSRF token
        url = f'{self.superset_url}/api/v1/security/csrf_token/'
        self.last_response = http.fetch(url)

        if self.last_response.status_code == HTTPStatus.OK:
            return self.last_response.json()['result']

        status_code = self.last_response.status_code
        raise ServerError(f'CSRF token error: {status_code}')

    def get_database_id_by_schema_name(self, schema_name: str) -> int:
        """Get superset database_id using superset schema name.

        Database ids 1 to 19 are probed in ascending order; the scan
        stops at the first id the service reports as not found.

        :param schema_name: superset database schema name.
            Example value "enwiki_p"
        :raises KeyError: If the database ID could not be found.
        :raises ServerError: For any other http errors
        :return: database id
        """
        if not self.connected:
            self.login()

        for database_id in range(1, 20):
            url = self.superset_url
            url += f'/api/v1/database/{database_id}/schemas/?q=(force:!f)'
            self.last_response = http.fetch(url)

            if self.last_response.status_code == HTTPStatus.OK:
                schemas = self.last_response.json()['result']
                if schema_name in schemas:
                    return database_id
            elif self.last_response.status_code == HTTPStatus.NOT_FOUND:
                # No database with this id: give up scanning.
                break
            else:
                status_code = self.last_response.status_code
                raise ServerError(f'Unknown error: {status_code}')

        url = self.superset_url
        raise KeyError(f'Schema "{schema_name}" not found in {url}.')

    def merge_query_arguments(self,
                              database_id: int | None = None,
                              schema_name: str | None = None,
                              site: BaseSite | None = None
                              ) -> tuple[int, str]:
        """Determine and validate the database_id and schema_name.

        Explicit arguments take precedence over the instance defaults.
        The schema name is resolved in this order: *schema_name*,
        *site*, ``self.schema_name``, ``self.site``. The database id is
        resolved in this order: *database_id*, ``self.database_id``, a
        lookup by the resolved schema name.

        :param database_id: The superset database ID.
        :param schema_name: The superset schema name.
        :param site: The target site
        :raises TypeError: if site and schema_name are both defined
        :raises TypeError: If determined database_id is not an integer.
        :raises TypeError: If neither site nor schema_name is determined.
        :return: A tuple containing database_id and schema_name.
        """
        if site and schema_name:
            msg = 'Only one of schema_name and site parameters can be defined'
            raise TypeError(msg)

        # Determine schema_name
        if not schema_name:
            if site:
                schema_name = f'{site.dbName()}_p'
            elif self.schema_name:
                schema_name = self.schema_name
            elif self.site:
                schema_name = f'{self.site.dbName()}_p'

        # Ensure either site or schema_name is provided. This has to be
        # checked before the database id lookup below; otherwise a
        # missing schema would start a remote lookup for ``None`` and
        # fail with KeyError instead of the documented TypeError.
        if not schema_name:
            raise TypeError('Either site or schema_name must be provided')

        # Determine database_id
        if not database_id:
            if self.database_id:
                database_id = int(self.database_id)
            else:
                database_id = self.get_database_id_by_schema_name(schema_name)

        # Validate database_id
        if not isinstance(database_id, int):
            msg = f'database_id should be integer, but got "{database_id}"'
            raise TypeError(msg)

        return database_id, schema_name

    def query(self,
              sql: str,
              database_id: int | None = None,
              schema_name: str | None = None,
              site: BaseSite | None = None) -> list[Any]:
        """Execute SQL queries on Superset.

        :param sql: The SQL query to execute.
        :param database_id: The database ID.
        :param schema_name: The schema name.
        :param site: The target site; mutually exclusive with
            *schema_name*.
        :raises TypeError: If the query arguments cannot be resolved,
            see :meth:`merge_query_arguments`.
        :raises RuntimeError: If the query execution fails.
        :return: The data returned from the query execution.
        """
        if not self.connected:
            self.login()

        token = self.get_csrf_token()
        headers = {
            'X-CSRFToken': token,
            'Content-Type': 'application/json',
            # Built from the configured base URL so that it stays
            # consistent with the request URL below.
            'referer': f'{self.superset_url}/sqllab/',
        }

        database_id, schema_name = self.merge_query_arguments(
            database_id, schema_name, site)

        sql_query_payload = {
            'database_id': database_id,
            'schema': schema_name,
            'sql': sql,
            'json': True,
            'runAsync': False,
        }

        url = f'{self.superset_url}/api/v1/sqllab/execute/'
        try:
            self.last_response = http.fetch(uri=url,
                                            json=sql_query_payload,
                                            method='POST',
                                            headers=headers)
            self.last_response.raise_for_status()
            payload = self.last_response.json()
            return payload['data']
        except Exception as e:
            # Keep the original exception as the explicit cause.
            raise RuntimeError(f'Failed to execute query: {e}') from e