Files
homelab-infra/apps/dashboard/backend/app/clients/uptime_kuma_client.py
T

191 lines
7.4 KiB
Python

from __future__ import annotations
import logging
import re
import httpx
from app.clients.base import BaseHTTPClient
from app.config import Settings
from app.models.sources import UptimeKumaMonitor, UptimeKumaSnapshot
METRIC_LINE_RE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*){(?P<labels>[^}]*)}s+(?P<value>.+)$')
LABEL_RE = re.compile(r'(w+)="((?:[^"\\]|\\.)*)"')
logger = logging.getLogger(__name__)
class UptimeKumaClient(BaseHTTPClient):
"""
Reads Uptime Kuma monitor status from the /metrics endpoint.
Auth: once an API key exists in Uptime Kuma, username/password Basic Auth
is permanently disabled. The correct format is HTTP Basic Auth with an
empty username and the API key as the password: auth=("", api_key).
"""
def __init__(self, settings: Settings) -> None:
super().__init__(settings, "uptime-kuma", settings.uptime_kuma_base_url)
async def fetch_monitors(self) -> UptimeKumaSnapshot:
snapshot = UptimeKumaSnapshot()
if not self.base_url:
logger.info("uptime kuma skipped: no base URL configured")
return snapshot
raw_metrics: str | None = None
# Primary: API key as Basic Auth password (empty username)
if self.settings.uptime_kuma_api_key:
raw_metrics = await self._request_metrics_with_mode(
"api-key",
auth=("", self.settings.uptime_kuma_api_key),
)
# Fallback: regular username/password (only works if no API keys exist)
if raw_metrics is None and self.settings.uptime_kuma_username and self.settings.uptime_kuma_password:
raw_metrics = await self._request_metrics_with_mode(
"basic-user",
auth=(self.settings.uptime_kuma_username, self.settings.uptime_kuma_password),
)
if raw_metrics is None and not (
self.settings.uptime_kuma_api_key
or (self.settings.uptime_kuma_username and self.settings.uptime_kuma_password)
):
logger.info("uptime kuma skipped: no usable metrics auth configured")
return snapshot
if not raw_metrics:
logger.warning("uptime kuma returned empty metrics payload or metrics auth failed")
return snapshot
logger.info("uptime kuma raw metrics first 40 lines: %s", raw_metrics.splitlines()[:40])
monitors = self._parse_metrics(raw_metrics)
up = sum(1 for monitor in monitors if monitor.status == "online")
down = sum(1 for monitor in monitors if monitor.status == "offline")
paused = sum(1 for monitor in monitors if monitor.status == "degraded")
normalized = UptimeKumaSnapshot(
source_status="online",
monitors_up=up,
monitors_down=down,
monitors_paused=paused,
total=len(monitors),
monitors=monitors,
)
logger.info("uptime kuma normalized snapshot: %s", normalized.model_dump())
return normalized
async def _request_metrics_with_mode(
self,
mode: str,
*,
auth: tuple[str, str] | None = None,
) -> str | None:
if not self.base_url:
return None
url = f"{self.base_url}/metrics"
try:
async with httpx.AsyncClient(
timeout=self.settings.request_timeout_seconds,
trust_env=False,
) as client:
response = await client.request("GET", url, auth=auth)
if response.status_code == 200 and response.text:
logger.info("uptime kuma metrics auth succeeded via %s", mode)
return response.text
if response.status_code in (401, 403):
logger.warning(
"uptime kuma metrics auth failed via %s with status %s",
mode,
response.status_code,
)
return None
except httpx.TimeoutException:
logger.warning("uptime kuma metrics request timed out via %s", mode)
except httpx.HTTPError as exc:
logger.warning("uptime kuma metrics request error via %s: %s", mode, exc)
return None
def _parse_metrics(self, payload: str) -> list[UptimeKumaMonitor]:
status_by_id: dict[str, UptimeKumaMonitor] = {}
for line in payload.splitlines():
parsed = self._parse_metric_line(line)
if parsed is None:
continue
metric_name, labels, raw_value = parsed
monitor_id = labels.get("monitor_id") or labels.get("id") or labels.get("monitor") or labels.get("monitor_name")
monitor_name = labels.get("monitor_name") or labels.get("name")
if not monitor_id or not monitor_name:
continue
if monitor_id not in status_by_id:
status_by_id[monitor_id] = UptimeKumaMonitor(
id=self._as_int(monitor_id),
name=monitor_name,
)
monitor = status_by_id[monitor_id]
if metric_name == "monitor_status":
status_code = self._as_int_from_float(raw_value)
if status_code == 1:
monitor.status = "online"
elif status_code == 3:
monitor.status = "degraded"
else:
monitor.status = "offline"
elif metric_name == "monitor_response_time":
latency = self._as_float(raw_value)
monitor.latency_ms = int(latency) if latency >= 0 else None
elif metric_name == "monitor_uptime":
duration = labels.get("duration", "")
if duration == "24":
uptime = self._as_float(raw_value)
if 0.0 <= uptime <= 1.0:
monitor.uptime_24h = round(uptime * 100, 1)
# Build synthetic heartbeat bar (20 segments) from uptime_24h
for monitor in status_by_id.values():
if not monitor.heartbeats:
if monitor.uptime_24h >= 99.9:
monitor.heartbeats = [1] * 20
elif monitor.uptime_24h <= 0.1:
monitor.heartbeats = [0] * 20
else:
green_count = round(monitor.uptime_24h / 100 * 20)
monitor.heartbeats = [0] * (20 - green_count) + [1] * green_count
return list(status_by_id.values())
@staticmethod
def _parse_metric_line(line: str) -> tuple[str, dict[str, str], str] | None:
if not line or line.startswith("#"):
return None
match = METRIC_LINE_RE.match(line.strip())
if not match:
return None
labels = {
key: value.encode("utf-8").decode("unicode_escape")
for key, value in LABEL_RE.findall(match.group("labels"))
}
return match.group("name"), labels, match.group("value")
@staticmethod
def _as_float(value: str) -> float:
try:
return float(value)
except (ValueError, TypeError):
return -1.0
@staticmethod
def _as_int(value: str) -> int:
try:
return int(value)
except (ValueError, TypeError):
return 0
@staticmethod
def _as_int_from_float(value: str) -> int:
try:
return int(float(value))
except (ValueError, TypeError):
return 0