"""Prometheus module."""
import logging
import requests
from wmflib.constants import ALL_DATACENTERS
from wmflib.exceptions import WmflibError
from wmflib.requests import TimeoutType, http_session
# Module-level logger, named after this module's import path via ``__name__``.
logger = logging.getLogger(__name__)
class PrometheusError(WmflibError):
    """Exception raised by the classes of this module when a query fails or is given invalid arguments."""
class PrometheusBase:
    """Base class with the shared logic to interact with Prometheus-like HTTP APIs.

    It owns the HTTP session and implements the request/response handling in :py:meth:`_query`; the concrete
    classes only need to provide the endpoint URL and the GET parameters.
    """

    def __init__(self) -> None:
        """Initialize the instance, creating the HTTP session used by all the queries."""
        # http_session() receives the dotted "module.ClassName" path of the concrete class.
        self._http_session = http_session(".".join((self.__module__, self.__class__.__name__)))

    def _query(self, url: str, params: dict[str, str], timeout: TimeoutType) -> list[dict]:
        """Perform a generic query.

        Exceptions raised by the HTTP session itself while performing the request are not intercepted here and
        propagate to the caller unchanged.

        Arguments:
            url (str): the URL to query.
            params (dict): a dictionary of the GET parameters to pass to the URL.
            timeout (:py:data:`wmflib.requests.TimeoutType`): How many seconds to wait for prometheus to reply
                before giving up. This is passed directly to the requests library.

        Returns:
            list: returns an empty list if there are no results otherwise return a list of results of the form:
            ``{"metric": {}, "value": [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: if the HTTP status code is not OK, the response body is not valid
                JSON or not a JSON object, the API reports an error status, or the payload lacks the expected
                ``data.result`` field.

        """
        response = self._http_session.get(url, params=params, timeout=timeout)
        if response.status_code != requests.codes["ok"]:
            raise PrometheusError(f"Unable to get metric: HTTP {response.status_code}: {response.text}")

        try:
            result = response.json()
        except ValueError as e:  # The JSON decoding errors raised by requests are ValueError subclasses.
            raise PrometheusError(f"Unable to get metric: invalid JSON in the response: {e}") from e

        # A valid JSON document that is not an object (e.g. a list or a string) has no .get() method.
        if not isinstance(result, dict):
            raise PrometheusError(f"Unable to get metric: unexpected response type {type(result).__name__}")

        # A missing status is treated like an explicit error status.
        if result.get("status", "error") == "error":
            raise PrometheusError(f"Unable to get metric: {result.get('error', 'unknown')}")

        try:
            return result["data"]["result"]
        except (KeyError, TypeError) as e:
            # KeyError: "data" or "result" is missing; TypeError: "data" is not subscriptable by key.
            raise PrometheusError("Unable to get metric: the response has no data.result field") from e
class Prometheus(PrometheusBase):
    """Client for the query API of a per-site Prometheus instance.

    Examples:
        ::

            >>> from wmflib.prometheus import Prometheus
            >>> prometheus = Prometheus()

    """

    # URL template of the query endpoint, filled in with the site and the instance name on each query.
    _prometheus_api: str = "http://prometheus.svc.{site}.wmnet/{instance}/api/v1/query"

    def query(
        self,
        query: str,
        site: str,
        *,
        instance: str = "ops",
        timeout: TimeoutType = 10.0,
    ) -> list[dict]:
        """Run a query against the given Prometheus instance of the given site.

        Examples:
            ::

                >>> results = prometheus.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}', "eqiad")
                >>> results = prometheus.query(
                ...     'kube_deployment_created{deployment="mw-web.eqiad.main"}', "eqiad", instance="k8s")

            The results of the first query will look similar to::

                [
                    {
                        "metric": {
                            "__name__": "node_memory_MemTotal_bytes",
                            "cluster": "management",
                            "instance": "host1001:9100",
                            "job": "node",
                            "site": "eqiad",
                        },
                        "value": [1636569623.988, "67225329664"],
                    }
                ]

        Arguments:
            query (str): a prometheus query string.
            site (str): The site to use for queries. Must be one of
                :py:const:`wmflib.constants.ALL_DATACENTERS`
            instance (str, optional): The prometheus instance to query on the given site, see
                https://wikitech.wikimedia.org/wiki/Prometheus#Instances for the full list of available instances.
            timeout (:py:data:`wmflib.requests.TimeoutType`, optional): How many seconds to wait for prometheus
                to reply before giving up. This is passed directly to the requests library.

        Returns:
            list: returns an empty list if there are no results otherwise return a list of results of the form:
            ``{"metric": {}, "value": [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: if the site is not a known datacenter or the query fails.

        """
        if site in ALL_DATACENTERS:
            endpoint = self._prometheus_api.format(site=site, instance=instance)
            return self._query(endpoint, {"query": query}, timeout)

        # Unknown site: refuse to build an URL for it.
        raise PrometheusError(
            f"site ({site}) must be one of wmflib.constants.ALL_DATACENTERS {ALL_DATACENTERS}"
        )
class Thanos(PrometheusBase):
    """Client for the query API of the Thanos endpoint.

    Examples:
        ::

            >>> from wmflib.prometheus import Thanos
            >>> thanos = Thanos()

    """

    # URL of the Thanos query endpoint used by all the queries.
    _thanos_api: str = "https://thanos-query.discovery.wmnet/api/v1/query"

    def query(self, query: str, *, timeout: TimeoutType = 10.0) -> list[dict]:
        """Run a query against the Thanos endpoint.

        The request always carries ``dedup=true`` and ``partial_response=false`` in addition to the query itself.

        Examples:
            ::

                >>> results = thanos.query('node_memory_MemTotal_bytes{instance=~"host1001:.*"}')
                >>> results = thanos.query('node_uname_info{instance=~"host1001:.*"}')

            The results of the last query will look similar to::

                [
                    {
                        "metric": {
                            "__name__": "node_uname_info",
                            "cluster": "management",
                            "domainname": "(none)",
                            "instance": "host1001:9100",
                            "job": "node",
                            "machine": "x86_64",
                            "nodename": "host1001",
                            "prometheus": "ops",
                            "release": "5.10.0-11-amd64",
                            "site": "eqiad",
                            "sysname": "Linux",
                            "version": "#1 SMP Debian 5.10.92-2 (2022-02-28)",
                        },
                        "value": [1648898872.82, "1"],
                    }
                ]

        Arguments:
            query (str): a prometheus query string.
            timeout (:py:data:`wmflib.requests.TimeoutType`, optional): How many seconds to wait for prometheus
                to reply before giving up. This is passed directly to the requests library.

        Returns:
            list: returns an empty list if there are no results otherwise return a list of results of the form:
            ``{"metric": {}, "value": [$timestamp, $value]}``.

        Raises:
            wmflib.prometheus.PrometheusError: on error.

        """
        return self._query(
            self._thanos_api,
            {
                "dedup": "true",
                "partial_response": "false",
                "query": query,
            },
            timeout,
        )