Add custom homelab dashboard stack

This commit is contained in:
2026-04-05 13:43:03 +02:00
parent 450a04a7d3
commit 89b9173c25
38 changed files with 3539 additions and 0 deletions
@@ -0,0 +1,178 @@
from __future__ import annotations
import logging
import re
import httpx
from app.clients.base import BaseHTTPClient
from app.config import Settings
from app.models.sources import UptimeKumaMonitor, UptimeKumaSnapshot
METRIC_LINE_RE = re.compile(r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)\{(?P<labels>[^}]*)\}\s+(?P<value>.+)$')
LABEL_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
logger = logging.getLogger(__name__)
class UptimeKumaClient(BaseHTTPClient):
"""
Reads Uptime Kuma monitor status from the documented /metrics endpoint.
This avoids coupling the backend to Socket.IO login flows, but still relies
on Kuma's internal metrics surface, which may change across releases.
"""
def __init__(self, settings: Settings) -> None:
super().__init__(settings, "uptime-kuma", settings.uptime_kuma_base_url)
async def fetch_monitors(self) -> UptimeKumaSnapshot:
snapshot = UptimeKumaSnapshot()
if not self.base_url:
logger.info("uptime kuma skipped: base URL missing")
return snapshot
raw_metrics = None
if self.settings.uptime_kuma_api_key:
raw_metrics = await self._request_metrics_with_mode(
"basic-api-key",
auth=(self.settings.uptime_kuma_api_key, ""),
)
if (
not raw_metrics
and self.settings.uptime_kuma_username
and self.settings.uptime_kuma_password
):
raw_metrics = await self._request_metrics_with_mode(
"basic-user",
auth=(self.settings.uptime_kuma_username, self.settings.uptime_kuma_password),
)
if raw_metrics is None and not (
self.settings.uptime_kuma_api_key
or (self.settings.uptime_kuma_username and self.settings.uptime_kuma_password)
):
logger.info("uptime kuma skipped: no usable metrics auth configured")
return snapshot
if not raw_metrics:
logger.warning("uptime kuma returned empty metrics payload or metrics auth failed")
return snapshot
logger.info("uptime kuma raw metrics first 40 lines: %s", raw_metrics.splitlines()[:40])
monitors = self._parse_metrics(raw_metrics)
up = sum(1 for monitor in monitors if monitor.status == "online")
down = sum(1 for monitor in monitors if monitor.status == "offline")
paused = sum(1 for monitor in monitors if monitor.status == "degraded")
normalized = UptimeKumaSnapshot(
source_status="online",
monitors_up=up,
monitors_down=down,
monitors_paused=paused,
total=len(monitors),
monitors=sorted(monitors, key=lambda monitor: monitor.name.lower()),
)
logger.info("uptime kuma normalized snapshot: %s", normalized.model_dump())
return normalized
async def _request_metrics_with_mode(
self,
mode: str,
*,
headers: dict[str, str] | None = None,
auth: tuple[str, str] | None = None,
) -> str | None:
if not self.base_url:
return None
url = f"{self.base_url}/metrics"
try:
async with httpx.AsyncClient(
timeout=self.settings.request_timeout_seconds,
trust_env=False,
) as client:
response = await client.request(
"GET",
url,
headers=headers,
auth=auth,
)
if response.status_code == 200 and response.text:
logger.info("uptime kuma metrics auth succeeded via %s", mode)
return response.text
if response.status_code in {401, 403}:
logger.warning("uptime kuma auth failed (401/403)")
else:
logger.info(
"uptime kuma metrics auth failed via %s with status %s",
mode,
response.status_code,
)
return None
except httpx.TimeoutException:
logger.warning("uptime kuma metrics request timed out via %s", mode)
except httpx.HTTPError as exc:
logger.warning("uptime kuma metrics request error via %s: %s", mode, exc)
return None
def _parse_metrics(self, payload: str) -> list[UptimeKumaMonitor]:
status_by_id: dict[str, UptimeKumaMonitor] = {}
for line in payload.splitlines():
parsed = self._parse_metric_line(line)
if parsed is None:
continue
metric_name, labels, raw_value = parsed
monitor_id = labels.get("monitor_id") or labels.get("id") or labels.get("monitor")
monitor_name = labels.get("monitor_name") or labels.get("name")
if not monitor_id or not monitor_name:
continue
monitor = status_by_id.setdefault(
monitor_id,
UptimeKumaMonitor(
id=monitor_id,
name=monitor_name,
status="offline",
monitor_type=labels.get("monitor_type") or labels.get("type"),
),
)
if metric_name == "monitor_status":
status_code = self._as_float(raw_value)
if status_code == 1:
monitor.status = "online"
elif status_code == 3:
monitor.status = "degraded"
else:
monitor.status = "offline"
elif metric_name == "monitor_response_time":
latency = self._as_float(raw_value)
monitor.latency_ms = int(latency) if latency >= 0 else None
return list(status_by_id.values())
@staticmethod
def _parse_metric_line(line: str) -> tuple[str, dict[str, str], str] | None:
if not line or line.startswith("#"):
return None
match = METRIC_LINE_RE.match(line.strip())
if not match:
return None
labels = {
key: value.encode("utf-8").decode("unicode_escape")
for key, value in LABEL_RE.findall(match.group("labels"))
}
return match.group("name"), labels, match.group("value")
@staticmethod
def _as_float(value: str) -> float:
try:
return float(value)
except ValueError:
return -1.0