# Source code for wmflib.prometheus

"""Prometheus module."""

import logging

from typing import Dict, List

import requests

from wmflib.constants import ALL_DATACENTERS
from wmflib.exceptions import WmflibError
from wmflib.requests import http_session, TimeoutType

logger = logging.getLogger(__name__)


class PrometheusError(WmflibError):
    """Exception raised by this module when a Prometheus or Thanos query cannot be completed."""


class PrometheusBase:
    """Base class to interact with Prometheus-like APIs.

    It creates the HTTP session used by the concrete clients and implements the request and response handling
    that is common to all of them.

    """

    def __init__(self) -> None:
        """Initialize the instance and create the HTTP session used for all the queries."""
        # http_session() receives the dotted "<module>.<ClassName>" of the concrete class;
        # see wmflib.requests for how that value is used.
        self._http_session = http_session('.'.join((self.__module__, self.__class__.__name__)))

    def _query(self, url: str, params: Dict[str, str], timeout: TimeoutType) -> List[Dict]:
        """Perform a generic query.

        Arguments:
            url (str): the URL to query.
            params (dict): a dictionary of the GET parameters to pass to the URL.
            timeout (:py:data:`wmflib.requests.TimeoutType`): How many seconds to wait for prometheus to reply
                before giving up. This is passed directly to the requests library.

        Returns:
            list: returns an empty list if there are no results otherwise return a list of results of the form:
            ``{'metric': {}, 'value': [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: if the HTTP status code is not OK, if the response body is not
                valid JSON, if the payload reports an error status or if it does not contain the
                ``data.result`` field.

        Note:
            Exceptions raised by the HTTP session while performing the request itself are not caught here
            and propagate to the caller unchanged.

        """
        response = self._http_session.get(url, params=params, timeout=timeout)
        if response.status_code != requests.codes['ok']:
            raise PrometheusError(f'Unable to get metric: HTTP {response.status_code}: {response.text}')

        # A 200 reply is not guaranteed to carry JSON (e.g. an intermediate proxy error page).
        # All the JSON decoding errors raised by requests derive from ValueError.
        try:
            result = response.json()
        except ValueError as e:
            raise PrometheusError(f'Unable to get metric: invalid JSON in the response: {e}') from e

        # Valid JSON that is not an object (list, string, null, ...) has no 'status' to inspect.
        if not isinstance(result, dict):
            raise PrometheusError(f'Unable to get metric: unexpected response payload: {result!r}')

        # A missing 'status' key is treated as an error.
        if result.get('status', 'error') == 'error':
            raise PrometheusError(f'Unable to get metric: {result.get("error", "unknown")}')

        # KeyError: 'data' or 'result' is missing; TypeError: 'data' is not subscriptable by key (e.g. null).
        try:
            return result['data']['result']
        except (KeyError, TypeError) as e:
            raise PrometheusError('Unable to get metric: the response has no data.result field') from e


class Prometheus(PrometheusBase):
    """Client for the query API of a Prometheus instance.

    Examples:
        ::

            >>> from wmflib.prometheus import Prometheus
            >>> prometheus = Prometheus()

    """

    # URL template of the query endpoint; {site} and {instance} are filled in by query().
    _prometheus_api: str = 'http://prometheus.svc.{site}.wmnet/{instance}/api/v1/query'

    def query(self, query: str, site: str, *, instance: str = 'ops', timeout: TimeoutType = 10.0) -> List[Dict]:
        """Run a query against the Prometheus instance of the given site.

        Examples:
            ::

                >>> results = prometheus.query('node_uname_info{instance=~"host1001:.*"}', 'eqiad', instance='global')
                >>> results = prometheus.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}', 'eqiad')

            The content of the last results will be something like::

                [
                    {
                        'metric': {
                            '__name__': 'node_memory_MemTotal_bytes',
                            'cluster': 'management',
                            'instance': 'host1001:9100',
                            'job': 'node',
                            'site': 'eqiad'
                        },
                        'value': [1636569623.988, '67225329664']
                    }
                ]

        Arguments:
            query (str): a prometheus query string.
            site (str): the site to query. Must be one of :py:const:`wmflib.constants.ALL_DATACENTERS`.
            instance (str, optional): the prometheus instance to query on the given site, see
                https://wikitech.wikimedia.org/wiki/Prometheus#Instances for the full list of available
                instances.
            timeout (:py:data:`wmflib.requests.TimeoutType`, optional): how many seconds to wait for prometheus
                to reply before giving up. This is passed directly to the requests library.

        Returns:
            list: an empty list if there are no results, otherwise a list of results of the form:
            ``{'metric': {}, 'value': [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: if the site is not a known datacenter or if the query fails.

        """
        if site in ALL_DATACENTERS:
            endpoint = self._prometheus_api.format(site=site, instance=instance)
            return self._query(endpoint, {'query': query}, timeout)

        # Unknown site: refuse to build a URL for it.
        raise PrometheusError(f'site ({site}) must be one of wmflib.constants.ALL_DATACENTERS {ALL_DATACENTERS}')


class Thanos(PrometheusBase):
    """Client for the query API of a Thanos endpoint.

    Examples:
        ::

            >>> from wmflib.prometheus import Thanos
            >>> thanos = Thanos()

    """

    # URL of the Thanos query endpoint used by query().
    _thanos_api: str = 'https://thanos-query.discovery.wmnet/api/v1/query'

    def query(self, query: str, *, timeout: TimeoutType = 10.0) -> List[Dict]:
        """Run a query against the Thanos endpoint.

        Examples:
            ::

                >>> results = thanos.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}')
                >>> results = thanos.query('node_uname_info{instance=~"host1001:.*"}')

            The content of the last results will be something like::

                [
                    {
                        'metric': {
                            '__name__': 'node_uname_info',
                            'cluster': 'management',
                            'domainname': '(none)',
                            'instance': 'host1001:9100',
                            'job': 'node',
                            'machine': 'x86_64',
                            'nodename': 'host1001',
                            'prometheus': 'ops',
                            'release': '5.10.0-11-amd64',
                            'site': 'eqiad',
                            'sysname': 'Linux',
                            'version': '#1 SMP Debian 5.10.92-2 (2022-02-28)'
                        },
                        'value': [1648898872.82, '1']
                    }
                ]

        Arguments:
            query (str): a prometheus query string.
            timeout (:py:data:`wmflib.requests.TimeoutType`, optional): how many seconds to wait for prometheus
                to reply before giving up. This is passed directly to the requests library.

        Returns:
            list: an empty list if there are no results, otherwise a list of results of the form:
            ``{'metric': {}, 'value': [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: on error.

        """
        # The 'dedup' and 'partial_response' values are fixed for every query sent through this client.
        return self._query(
            self._thanos_api,
            {'dedup': 'true', 'partial_response': 'false', 'query': query},
            timeout,
        )