from __future__ import annotations import logging import re import httpx from app.clients.base import BaseHTTPClient from app.config import Settings from app.models.sources import UptimeKumaMonitor, UptimeKumaSnapshot METRIC_LINE_RE = re.compile(r'^(?P[a-zA-Z_:][a-zA-Z0-9_:]*){(?P[^}]*)}s+(?P.+)$') LABEL_RE = re.compile(r'(w+)="((?:[^"\\]|\\.)*)"') logger = logging.getLogger(__name__) class UptimeKumaClient(BaseHTTPClient): """ Reads Uptime Kuma monitor status from the /metrics endpoint. Auth: once an API key exists in Uptime Kuma, username/password Basic Auth is permanently disabled. The correct format is HTTP Basic Auth with an empty username and the API key as the password: auth=("", api_key). """ def __init__(self, settings: Settings) -> None: super().__init__(settings, "uptime-kuma", settings.uptime_kuma_base_url) async def fetch_monitors(self) -> UptimeKumaSnapshot: snapshot = UptimeKumaSnapshot() if not self.base_url: logger.info("uptime kuma skipped: no base URL configured") return snapshot raw_metrics: str | None = None # Primary: API key as Basic Auth password (empty username) if self.settings.uptime_kuma_api_key: raw_metrics = await self._request_metrics_with_mode( "api-key", auth=("", self.settings.uptime_kuma_api_key), ) # Fallback: regular username/password (only works if no API keys exist) if raw_metrics is None and self.settings.uptime_kuma_username and self.settings.uptime_kuma_password: raw_metrics = await self._request_metrics_with_mode( "basic-user", auth=(self.settings.uptime_kuma_username, self.settings.uptime_kuma_password), ) if raw_metrics is None and not ( self.settings.uptime_kuma_api_key or (self.settings.uptime_kuma_username and self.settings.uptime_kuma_password) ): logger.info("uptime kuma skipped: no usable metrics auth configured") return snapshot if not raw_metrics: logger.warning("uptime kuma returned empty metrics payload or metrics auth failed") return snapshot logger.info("uptime kuma raw metrics first 40 lines: %s", raw_metrics.splitlines()[:40]) monitors = self._parse_metrics(raw_metrics) up = sum(1 for monitor in monitors if monitor.status == "online") down = sum(1 for monitor in monitors if monitor.status == "offline") paused = sum(1 for monitor in monitors if monitor.status == "degraded") normalized = UptimeKumaSnapshot( source_status="online", monitors_up=up, monitors_down=down, monitors_paused=paused, total=len(monitors), monitors=monitors, ) logger.info("uptime kuma normalized snapshot: %s", normalized.model_dump()) return normalized async def _request_metrics_with_mode( self, mode: str, *, auth: tuple[str, str] | None = None, ) -> str | None: if not self.base_url: return None url = f"{self.base_url}/metrics" try: async with httpx.AsyncClient( timeout=self.settings.request_timeout_seconds, trust_env=False, ) as client: response = await client.request("GET", url, auth=auth) if response.status_code == 200 and response.text: logger.info("uptime kuma metrics auth succeeded via %s", mode) return response.text if response.status_code in (401, 403): logger.warning( "uptime kuma metrics auth failed via %s with status %s", mode, response.status_code, ) return None except httpx.TimeoutException: logger.warning("uptime kuma metrics request timed out via %s", mode) except httpx.HTTPError as exc: logger.warning("uptime kuma metrics request error via %s: %s", mode, exc) return None def _parse_metrics(self, payload: str) -> list[UptimeKumaMonitor]: status_by_id: dict[str, UptimeKumaMonitor] = {} for line in payload.splitlines(): parsed = self._parse_metric_line(line) if parsed is None: continue metric_name, labels, raw_value = parsed monitor_id = labels.get("monitor_id") or labels.get("id") or labels.get("monitor") or labels.get("monitor_name") monitor_name = labels.get("monitor_name") or labels.get("name") if not monitor_id or not monitor_name: continue if monitor_id not in status_by_id: status_by_id[monitor_id] = UptimeKumaMonitor( id=self._as_int(monitor_id), name=monitor_name, ) monitor = status_by_id[monitor_id] if metric_name == "monitor_status": status_code = self._as_int_from_float(raw_value) if status_code == 1: monitor.status = "online" elif status_code == 3: monitor.status = "degraded" else: monitor.status = "offline" elif metric_name == "monitor_response_time": latency = self._as_float(raw_value) monitor.latency_ms = int(latency) if latency >= 0 else None elif metric_name == "monitor_uptime": duration = labels.get("duration", "") if duration == "24": uptime = self._as_float(raw_value) if 0.0 <= uptime <= 1.0: monitor.uptime_24h = round(uptime * 100, 1) # Build synthetic heartbeat bar (20 segments) from uptime_24h for monitor in status_by_id.values(): if not monitor.heartbeats: if monitor.uptime_24h >= 99.9: monitor.heartbeats = [1] * 20 elif monitor.uptime_24h <= 0.1: monitor.heartbeats = [0] * 20 else: green_count = round(monitor.uptime_24h / 100 * 20) monitor.heartbeats = [0] * (20 - green_count) + [1] * green_count return list(status_by_id.values()) @staticmethod def _parse_metric_line(line: str) -> tuple[str, dict[str, str], str] | None: if not line or line.startswith("#"): return None match = METRIC_LINE_RE.match(line.strip()) if not match: return None labels = { key: value.encode("utf-8").decode("unicode_escape") for key, value in LABEL_RE.findall(match.group("labels")) } return match.group("name"), labels, match.group("value") @staticmethod def _as_float(value: str) -> float: try: return float(value) except (ValueError, TypeError): return -1.0 @staticmethod def _as_int(value: str) -> int: try: return int(value) except (ValueError, TypeError): return 0 @staticmethod def _as_int_from_float(value: str) -> int: try: return int(float(value)) except (ValueError, TypeError): return 0