Source code for wmflib.prometheus

"""Prometheus module."""

import logging

import requests

from wmflib.constants import ALL_DATACENTERS
from wmflib.exceptions import WmflibError
from wmflib.requests import TimeoutType, http_session

logger = logging.getLogger(__name__)


[docs] class PrometheusError(WmflibError): """Custom exception class for errors of this module."""
[docs] class PrometheusBase: """Base class to interact with Prometheus-like APIs."""
[docs] def __init__(self) -> None: """Initialize the instance.""" self._http_session = http_session(".".join((self.__module__, self.__class__.__name__)))
[docs] def _query(self, url: str, params: dict[str, str], timeout: TimeoutType) -> list[dict]: """Perform a generic query. Arguments: url (str): the URL to query. params (dict): a dictionary of the GET parameters to pass to the URL. timeout (:py:data:`wmflib.requests.TimeoutType`): How many seconds to wait for prometheus to reply before giving up. This is passed directly to the requests library. Returns: list: returns an empty list if there are no results otherwise return a list of results of the form: ``{"metric": {}, "value": [$timestamp, $value]}``. Raises: wmflib.prometheus.PrometheusError: on error """ response = self._http_session.get(url, params=params, timeout=timeout) if response.status_code != requests.codes["ok"]: raise PrometheusError(f"Unable to get metric: HTTP {response.status_code}: {response.text}") result = response.json() if result.get("status", "error") == "error": raise PrometheusError(f"Unable to get metric: {result.get('error', 'unknown')}") return result["data"]["result"]
[docs] class Prometheus(PrometheusBase): """Class to interact with a Prometheus API instance. Examples: :: >>> from wmflib.prometheus import Prometheus >>> prometheus = Prometheus() """ _prometheus_api: str = "http://prometheus.svc.{site}.wmnet/{instance}/api/v1/query"
[docs] def query(self, query: str, site: str, *, instance: str = "ops", timeout: TimeoutType = 10.0) -> list[dict]: """Perform a generic query. Examples: :: >>> results = prometheus.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}', "eqiad") >>> results = prometheus.query( ... 'kube_deployment_created{deployment="mw-web.eqiad.main"}', "eqiad", instance="k8s") The content of the first results will be something like:: [ { "metric": { "__name__": "node_memory_MemTotal_bytes", "cluster": "management", "instance": "host1001:9100", "job": "node", "site": "eqiad", }, "value": [1636569623.988, "67225329664"], } ] Arguments: query (str): a prometheus query string. site (str): The site to use for queries. Must be one of :py:const:`wmflib.constants.ALL_DATACENTERS` instance (str, optional): The prometheus instance to query on the given site, see https://wikitech.wikimedia.org/wiki/Prometheus#Instances for the full list of available instances. timeout (:py:data:`wmflib.requests.TimeoutType`, optional): How many seconds to wait for prometheus to reply before giving up. This is passed directly to the requests library. Returns: list: returns an empty list if there are no results otherwise return a list of results of the form: ``{"metric": {}, "value": [$timestamp, $value]}``. Raises: wmflib.prometheus.PrometheusError: on error """ if site not in ALL_DATACENTERS: msg = f"site ({site}) must be one of wmflib.constants.ALL_DATACENTERS {ALL_DATACENTERS}" raise PrometheusError(msg) url = self._prometheus_api.format(site=site, instance=instance) params = {"query": query} return self._query(url, params, timeout)
[docs] class Thanos(PrometheusBase): """Class to interact with a Thanos API endpoint. Examples: :: >>> from wmflib.prometheus import Thanos >>> thanos = Thanos() """ _thanos_api: str = "https://thanos-query.discovery.wmnet/api/v1/query"
[docs] def query(self, query: str, *, timeout: TimeoutType = 10.0) -> list[dict]: """Perform a generic query. Examples: :: >>> results = thanos.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}') >>> results = thanos.query('node_uname_info{instance=~"host1001:.*"}') The content of the last results will be something like:: [ { "metric": { "__name__": "node_uname_info", "cluster": "management", "domainname": "(none)", "instance": "host1001:9100", "job": "node", "machine": "x86_64", "nodename": "host1001", "prometheus": "ops", "release": "5.10.0-11-amd64", "site": "eqiad", "sysname": "Linux", "version": "#1 SMP Debian 5.10.92-2 (2022-02-28)", }, "value": [1648898872.82, "1"], } ] Arguments: query (str): a prometheus query string. timeout (:py:data:`wmflib.requests.TimeoutType`, optional): How many seconds to wait for prometheus to reply before giving up. This is passed directly to the requests library. Returns: list: returns an empty list if there are no results otherwise return a list of results of the form: ``{"metric": {}, "value": [$timestamp, $value]}``. Raises: wmflib.prometheus.PrometheusError: on error. """ params = {"dedup": "true", "partial_response": "false", "query": query} return self._query(self._thanos_api, params, timeout)