from __future__ import annotations import logging import re import httpx from app.clients.base import BaseHTTPClient from app.config import Settings from app.models.sources import UptimeKumaMonitor, UptimeKumaSnapshot METRIC_LINE_RE = re.compile(r'^(?P[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P[^}]*)\}\s+(?P.+)$') LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') logger = logging.getLogger(__name__) class UptimeKumaClient(BaseHTTPClient): """ Reads Uptime Kuma monitor status from the documented /metrics endpoint. This avoids coupling the backend to Socket.IO login flows, but still relies on Kuma's internal metrics surface, which may change across releases. """ def __init__(self, settings: Settings) -> None: super().__init__(settings, "uptime-kuma", settings.uptime_kuma_base_url) async def fetch_monitors(self) -> UptimeKumaSnapshot: snapshot = UptimeKumaSnapshot() if not self.base_url: logger.info("uptime kuma skipped: base URL missing") return snapshot raw_metrics = None if self.settings.uptime_kuma_api_key: raw_metrics = await self._request_metrics_with_mode( "basic-api-key", auth=("", self.settings.uptime_kuma_api_key), ) if ( not raw_metrics and self.settings.uptime_kuma_username and self.settings.uptime_kuma_password ): raw_metrics = await self._request_metrics_with_mode( "basic-user", auth=(self.settings.uptime_kuma_username, self.settings.uptime_kuma_password), ) if raw_metrics is None and not ( self.settings.uptime_kuma_api_key or (self.settings.uptime_kuma_username and self.settings.uptime_kuma_password) ): logger.info("uptime kuma skipped: no usable metrics auth configured") return snapshot if not raw_metrics: logger.warning("uptime kuma returned empty metrics payload or metrics auth failed") return snapshot logger.info("uptime kuma raw metrics first 40 lines: %s", raw_metrics.splitlines()[:40]) monitors = self._parse_metrics(raw_metrics) up = sum(1 for monitor in monitors if monitor.status == "online") down = sum(1 for monitor in monitors if monitor.status == "offline") paused = sum(1 for monitor in monitors if monitor.status == "degraded") normalized = UptimeKumaSnapshot( source_status="online", monitors_up=up, monitors_down=down, monitors_paused=paused, total=len(monitors), monitors=sorted(monitors, key=lambda monitor: monitor.name.lower()), ) logger.info("uptime kuma normalized snapshot: %s", normalized.model_dump()) return normalized async def _request_metrics_with_mode( self, mode: str, *, headers: dict[str, str] | None = None, auth: tuple[str, str] | None = None, ) -> str | None: if not self.base_url: return None url = f"{self.base_url}/metrics" try: async with httpx.AsyncClient( timeout=self.settings.request_timeout_seconds, trust_env=False, ) as client: response = await client.request( "GET", url, headers=headers, auth=auth, ) if response.status_code == 200 and response.text: logger.info("uptime kuma metrics auth succeeded via %s", mode) return response.text if response.status_code in {401, 403}: logger.warning("uptime kuma auth failed (401/403)") else: logger.info( "uptime kuma metrics auth failed via %s with status %s", mode, response.status_code, ) return None except httpx.TimeoutException: logger.warning("uptime kuma metrics request timed out via %s", mode) except httpx.HTTPError as exc: logger.warning("uptime kuma metrics request error via %s: %s", mode, exc) return None def _parse_metrics(self, payload: str) -> list[UptimeKumaMonitor]: status_by_id: dict[str, UptimeKumaMonitor] = {} for line in payload.splitlines(): parsed = self._parse_metric_line(line) if parsed is None: continue metric_name, labels, raw_value = parsed monitor_id = labels.get("monitor_id") or labels.get("id") or labels.get("monitor") or labels.get("monitor_name") monitor_name = labels.get("monitor_name") or labels.get("name") if not monitor_id or not monitor_name: continue monitor = status_by_id.setdefault( monitor_id, UptimeKumaMonitor( id=monitor_id, name=monitor_name, status="offline", monitor_type=labels.get("monitor_type") or labels.get("type"), ), ) if metric_name == "monitor_status": status_code = self._as_float(raw_value) if status_code == 1: monitor.status = "online" elif status_code == 3: monitor.status = "degraded" else: monitor.status = "offline" elif metric_name == "monitor_response_time": latency = self._as_float(raw_value) monitor.latency_ms = int(latency) if latency >= 0 else None return list(status_by_id.values()) @staticmethod def _parse_metric_line(line: str) -> tuple[str, dict[str, str], str] | None: if not line or line.startswith("#"): return None match = METRIC_LINE_RE.match(line.strip()) if not match: return None labels = { key: value.encode("utf-8").decode("unicode_escape") for key, value in LABEL_RE.findall(match.group("labels")) } return match.group("name"), labels, match.group("value") @staticmethod def _as_float(value: str) -> float: try: return float(value) except ValueError: return -1.0