# File: homelab-infra/apps/dashboard/backend/app/clients/beszel_client.py
# (418 lines, 15 KiB, Python)

from __future__ import annotations
from datetime import datetime, timedelta, timezone
import logging
from typing import Any
from app.clients.base import BaseHTTPClient
from app.config import Settings
from app.models.sources import (
BeszelDiskMetric,
BeszelSystemSnapshot,
DockerContainerSummary,
DockerSnapshot,
)
logger = logging.getLogger(__name__)
class BeszelClient(BaseHTTPClient):
    """
    Read-only client for a Beszel hub.

    Beszel exposes a PocketBase-backed REST API. The exact record schema may
    change between Beszel releases, so this client intentionally normalizes
    multiple likely field layouts into a stable internal snapshot.

    Authentication prefers a superuser login (``beszel_admin_email`` /
    ``beszel_admin_password`` settings) whose token is cached on the
    instance, and falls back to the static ``beszel_api_token`` setting.
    """

    # How long a cached superuser token is reused before logging in again.
    _ADMIN_JWT_TTL = timedelta(minutes=30)

    def __init__(self, settings: Settings) -> None:
        """Bind the client to the configured Beszel base URL with an empty token cache."""
        super().__init__(settings, "beszel", settings.beszel_base_url)
        self._admin_jwt: str | None = None
        self._admin_jwt_expires_at: datetime | None = None

    # ------------------------------------------------------------------
    # Public fetchers
    # ------------------------------------------------------------------

    async def fetch_system_snapshot(self) -> BeszelSystemSnapshot:
        """
        Fetch the most recently created ``system_stats`` record and normalize it.

        Returns:
            A populated snapshot, or a default-constructed
            ``BeszelSystemSnapshot`` when the base URL or credentials are
            missing or the response carries no usable record.
        """
        snapshot = BeszelSystemSnapshot()
        if not self.base_url:
            logger.info("beszel skipped: base URL missing")
            return snapshot

        headers = await self._build_auth_headers()
        if headers is None:
            logger.warning("beszel skipped: no usable auth method configured")
            return snapshot

        payload = await self._request_json(
            "GET",
            "/api/collections/system_stats/records",
            headers=headers,
            params={"page": 1, "perPage": 1, "sort": "-created"},
        )
        if not payload:
            logger.warning("beszel returned empty payload")
            return snapshot
        # Full payload dumps are diagnostic detail, not per-poll INFO output.
        logger.debug("beszel raw payload: %s", payload)

        items = payload.get("items") if isinstance(payload, dict) else None
        # Same shape validation as the containers fetch: a non-list "items"
        # or a non-dict first record would otherwise crash further down.
        if not isinstance(items, list) or not items or not isinstance(items[0], dict):
            logger.warning("beszel returned no system_stats records")
            return snapshot

        record = items[0]
        details = await self._fetch_system_details(headers, record)
        normalized = self._normalize_snapshot(record, details)
        if logger.isEnabledFor(logging.DEBUG):
            # Guarded so model_dump() is only paid for when it will be logged.
            logger.debug("beszel normalized snapshot: %s", normalized.model_dump())
        return normalized

    async def fetch_container_snapshot(self) -> DockerSnapshot:
        """
        Fetch up to 200 ``containers`` records and summarize them.

        Returns:
            A ``DockerSnapshot`` with per-state counters and one
            ``DockerContainerSummary`` per dict record, or a
            default-constructed snapshot when configuration is missing or
            the payload does not have the expected ``{"items": [...]}`` shape.
        """
        snapshot = DockerSnapshot()
        if not self.base_url:
            return snapshot

        headers = await self._build_auth_headers()
        if headers is None:
            return snapshot

        payload = await self._request_json(
            "GET",
            "/api/collections/containers/records",
            headers=headers,
            params={"page": 1, "perPage": 200, "sort": "-updated"},
        )
        logger.debug("beszel raw containers payload: %s", payload)

        if not isinstance(payload, dict):
            logger.warning("beszel containers mapping: payload is not a dict")
            return snapshot
        items = payload.get("items")
        if not isinstance(items, list):
            logger.warning("beszel containers mapping: items missing or not a list")
            return snapshot
        if not items:
            logger.warning("beszel containers mapping: no container records returned")
            return snapshot

        containers: list[DockerContainerSummary] = []
        running = 0
        stopped = 0
        unhealthy = 0
        for item in items:
            if not isinstance(item, dict):
                continue
            state = self._normalize_container_state(item)
            if state == "running":
                running += 1
            elif state == "unhealthy":
                unhealthy += 1
            else:
                # "unknown" has no counter of its own and is tallied as stopped.
                stopped += 1
            containers.append(
                DockerContainerSummary(
                    id=str(item.get("id") or ""),
                    name=str(
                        item.get("name")
                        or item.get("container")
                        or item.get("service")
                        or "unknown"
                    ),
                    state=state,
                    status_text=str(item.get("status") or item.get("state") or "unknown"),
                    image=str(item.get("image") or ""),
                    health=str(item.get("health") or item.get("health_status") or "").lower() or None,
                )
            )

        normalized = DockerSnapshot(
            source_status="online",
            running=running,
            stopped=stopped,
            unhealthy=unhealthy,
            total=len(containers),
            containers=containers,
        )
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug("beszel normalized containers snapshot: %s", normalized.model_dump())
        return normalized

    # ------------------------------------------------------------------
    # Normalization
    # ------------------------------------------------------------------

    def _normalize_snapshot(
        self,
        record: dict[str, Any],
        details: dict[str, Any] | None,
    ) -> BeszelSystemSnapshot:
        """
        Map a ``system_stats`` record plus an optional details record to a snapshot.

        Args:
            record: One item from the ``system_stats`` records response.
            details: Matching ``system_details`` item, or ``None``.

        Returns:
            A ``BeszelSystemSnapshot`` with ``source_status="online"``; fields
            that cannot be found fall back to ``0``/``"unknown"`` defaults.
        """
        stats = self._coerce_mapping(record.get("stats") or {})
        details = self._coerce_mapping(details or {})
        # The interesting values may sit in a nested object under one of
        # several keys, or directly on the details record; both are probed.
        details_payload = self._coerce_mapping(
            details.get("details")
            or details.get("stats")
            or details.get("info")
            or details.get("data")
            or {}
        )
        expanded_system = self._coerce_mapping(
            self._coerce_mapping(record.get("expand")).get("system")
        )

        memory_used_gb, memory_total_gb, memory_available_gb = self._normalize_memory(stats)

        # NOTE(review): "b" is read as [rx, tx] and "d" as uptime in minutes;
        # confirm both against the stats schema of the deployed Beszel release.
        bandwidth = stats.get("b")
        network_pair = bandwidth if isinstance(bandwidth, list) else []
        rx_raw = network_pair[0] if len(network_pair) > 0 else 0
        tx_raw = network_pair[1] if len(network_pair) > 1 else 0

        disks = self._normalize_disks(
            details_payload.get("disks")
            or details_payload.get("disk")
            or details_payload.get("mounts")
            or details.get("disks")
            or details.get("disk")
            or details.get("mounts")
            or []
        )
        if not disks:
            logger.info("beszel storage unsupported: no disks/mounts in payload")

        cpus = stats.get("cpus")
        return BeszelSystemSnapshot(
            source_status="online",
            host_name=str(
                details_payload.get("hostname")
                or details_payload.get("host")
                or details.get("hostname")
                or details.get("host")
                or expanded_system.get("name")
                or record.get("system_name")
                or record.get("name")
                or "unknown"
            ),
            agent_name="beszel-agent",
            cpu_usage_percent=self._as_float(stats.get("cpu")),
            cpu_cores=len(cpus) if isinstance(cpus, list) else 0,
            # Load averages are not mapped from the payload; reported as zero.
            load_1=0.0,
            load_5=0.0,
            load_15=0.0,
            memory_used_gb=memory_used_gb,
            memory_total_gb=memory_total_gb,
            memory_available_gb=memory_available_gb,
            memory_usage_percent=self._as_float(stats.get("mp")),
            primary_interface=str(
                details_payload.get("primary_interface")
                or details_payload.get("interface")
                or details.get("primary_interface")
                or "primary"
            ),
            network_rx_mbps=self._network_value_to_mbps(rx_raw),
            network_tx_mbps=self._network_value_to_mbps(tx_raw),
            uptime_seconds=self._minutes_to_seconds(stats.get("d")),
            platform=str(
                details_payload.get("platform")
                or details_payload.get("os")
                or details.get("platform")
                or "unknown"
            ),
            kernel=str(details_payload.get("kernel") or details.get("kernel") or "unknown"),
            disks=disks,
        )

    def _normalize_memory(self, stats: dict[str, Any]) -> tuple[float, float, float]:
        """
        Return ``(used_gb, total_gb, available_gb)`` from a stats mapping.

        Two layouts are handled:

        * ``mu`` present: ``mu`` is taken as used memory and ``m`` as total.
          NOTE(review): this follows the key names of Beszel's stats schema
          as understood at review time; confirm against the deployed release.
        * ``mu`` absent: ``m`` is taken as used memory and the total is
          derived from the usage percentage ``mp``.

        In both layouts a missing or implausible total (zero, or smaller than
        used) is derived from ``mp``, falling back to ``used`` when no
        percentage is available.
        """
        usage_percent = self._as_float(stats.get("mp"))
        reported = self._as_float(stats.get("m"))

        if stats.get("mu") is not None:
            used_gb = self._as_float(stats.get("mu"))
            total_gb = reported
        else:
            used_gb = reported
            total_gb = 0.0

        if total_gb <= 0 or total_gb < used_gb:
            total_gb = round(used_gb / (usage_percent / 100), 1) if usage_percent else used_gb

        available_gb = max(round(total_gb - used_gb, 1), 0.0)
        return used_gb, total_gb, available_gb

    async def _fetch_system_details(
        self,
        headers: dict[str, str],
        stats_record: dict[str, Any],
    ) -> dict[str, Any] | None:
        """
        Fetch ``system_details`` records and pick the one for ``stats_record``.

        The record whose ``system`` field equals the stats record's ``system``
        value is preferred. When none matches (or the stats record has no
        ``system`` value) the first returned record is used instead.

        Returns:
            The chosen details record, or ``None`` when the payload is not a
            dict or contains no dict records.
        """
        system_id = stats_record.get("system")
        params: dict[str, Any] = {
            "page": 1,
            "perPage": 100,
            "sort": "-created",
        }
        payload = await self._request_json(
            "GET",
            "/api/collections/system_details/records",
            headers=headers,
            params=params,
        )
        logger.debug("beszel raw details payload: %s", payload)

        if not isinstance(payload, dict):
            logger.warning("beszel system_details mapping: payload is not a dict")
            return None

        items = payload.get("items")
        # Only dict records are usable; this also keeps the return type honest.
        candidates = [item for item in items if isinstance(item, dict)] if isinstance(items, list) else []
        if not candidates:
            logger.warning("beszel system_details mapping: no matching detail records returned")
            return None

        if system_id:
            wanted = str(system_id)
            for item in candidates:
                if str(item.get("system") or "") == wanted:
                    return item
            # Make the substitution visible: with several systems the first
            # record may describe a different host.
            logger.debug(
                "beszel system_details mapping: no record for system %s, using first returned record",
                wanted,
            )
        return candidates[0]

    @staticmethod
    def _normalize_container_state(item: dict[str, Any]) -> str:
        """
        Collapse a container record's status/health fields into one state.

        The ``status``, ``state``, ``health`` and ``health_status`` values are
        lower-cased and split into alphanumeric words. Whole-word matching is
        used so that e.g. ``"inactive"`` is not mistaken for ``"active"``.

        Returns:
            ``"unhealthy"``, ``"running"``, ``"stopped"`` (some text present
            but no known keyword) or ``"unknown"`` (no text at all).
        """
        raw = " ".join(
            str(item.get(key) or "")
            for key in ("status", "state", "health", "health_status")
        ).lower()
        words = set("".join(ch if ch.isalnum() else " " for ch in raw).split())

        # Unhealthy wins over running: "up 2 hours (unhealthy)" is unhealthy.
        if words & {"unhealthy", "degraded"}:
            return "unhealthy"
        if words & {"running", "up", "healthy", "active"}:
            return "running"
        if raw.strip():
            return "stopped"
        return "unknown"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def _build_auth_headers(self) -> dict[str, str] | None:
        """
        Return request headers for the best available auth method.

        Superuser login is tried first, then the static API token.
        Returns ``None`` when neither yields a token.
        """
        admin_headers = await self._get_admin_auth_headers()
        if admin_headers:
            return admin_headers
        if self.settings.beszel_api_token:
            return {"Authorization": self.settings.beszel_api_token}
        return None

    async def _get_admin_auth_headers(self) -> dict[str, str] | None:
        """
        Return an ``Authorization`` header backed by a superuser token.

        A cached token is reused until ``_ADMIN_JWT_TTL`` has elapsed since
        it was obtained; otherwise a password login is performed.

        Returns:
            The header dict, or ``None`` when admin credentials are not
            configured or the login does not yield a token.
        """
        if not self.settings.beszel_admin_email or not self.settings.beszel_admin_password:
            return None

        now = datetime.now(timezone.utc)
        if self._admin_jwt and self._admin_jwt_expires_at and now < self._admin_jwt_expires_at:
            return {"Authorization": self._admin_jwt}

        # Authenticate directly with the JSON credentials body. A request
        # without a body cannot succeed and would only add a failing round trip.
        auth_payload = await self._request_json_with_body(
            "POST",
            "/api/collections/_superusers/auth-with-password",
            json_body={
                "identity": self.settings.beszel_admin_email,
                "password": self.settings.beszel_admin_password,
            },
        )
        if not isinstance(auth_payload, dict):
            logger.warning("beszel admin auth failed: no payload")
            return None

        token = auth_payload.get("token")
        if not token:
            logger.warning("beszel admin auth failed: token missing")
            return None

        self._admin_jwt = str(token)
        self._admin_jwt_expires_at = now + self._ADMIN_JWT_TTL
        logger.info("beszel admin auth succeeded")
        return {"Authorization": self._admin_jwt}

    async def _request_json_with_body(
        self,
        method: str,
        path: str,
        *,
        json_body: dict[str, Any],
    ) -> Any | None:
        """
        Send a JSON-bodied request and return the decoded JSON response.

        Args:
            method: HTTP method.
            path: Path relative to ``self.base_url``.
            json_body: Object serialized as the JSON request body.

        Returns:
            The decoded response, or ``None`` when the base URL is missing,
            the request times out or fails, the server answers with an error
            status, or the response body is not valid JSON. Failures are
            logged, never raised.
        """
        if not self.base_url:
            return None

        import httpx

        url = f"{self.base_url}/{path.lstrip('/')}"
        try:
            async with httpx.AsyncClient(
                timeout=self.settings.request_timeout_seconds,
                trust_env=False,
            ) as client:
                response = await client.request(
                    method,
                    url,
                    json=json_body,
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
                return response.json()
        except httpx.TimeoutException:
            logger.warning("beszel auth request timed out: %s %s", method, url)
        except httpx.HTTPStatusError as exc:
            logger.warning(
                "beszel auth request failed with status %s for %s %s",
                exc.response.status_code,
                method,
                url,
            )
            try:
                logger.info("beszel auth error payload: %s", exc.response.json())
            except ValueError:
                logger.info("beszel auth error text: %s", exc.response.text)
        except httpx.HTTPError as exc:
            logger.warning("beszel auth request error for %s %s: %s", method, url, exc)
        except ValueError:
            # A successful status with a non-JSON body (e.g. a proxy error
            # page) must not escape as an exception.
            logger.warning("beszel auth response was not valid JSON: %s %s", method, url)
        return None

    # ------------------------------------------------------------------
    # Disk and scalar helpers
    # ------------------------------------------------------------------

    def _normalize_disks(self, raw_disks: Any) -> list[BeszelDiskMetric]:
        """
        Convert a disks/mounts payload into ``BeszelDiskMetric`` entries.

        Accepts either a list of dicts or a dict keyed by mount point (whose
        values are dicts, or a bare "used" value). Size fields are converted
        with ``_bytes_to_gb``. A missing total is derived from used + free,
        and a missing usage percentage from used / total.

        Returns:
            One metric per dict entry; an empty list for any other input.
        """
        if isinstance(raw_disks, dict):
            raw_disks = [
                {"mount": key, **(value if isinstance(value, dict) else {"used": value})}
                for key, value in raw_disks.items()
            ]
        disks: list[BeszelDiskMetric] = []
        if not isinstance(raw_disks, list):
            return disks

        for item in raw_disks:
            if not isinstance(item, dict):
                continue
            total_gb = self._bytes_to_gb(item.get("total") or item.get("total_bytes"))
            used_gb = self._bytes_to_gb(item.get("used") or item.get("used_bytes"))
            free_gb = self._bytes_to_gb(item.get("free") or item.get("free_bytes"))
            usage_percent = self._as_float(item.get("usage_percent") or item.get("percent"))

            if not total_gb and used_gb and free_gb:
                total_gb = round(used_gb + free_gb, 1)
            if not usage_percent and used_gb and total_gb > 0:
                # Percentage not reported: derive it so the disk does not
                # appear empty. Capped at 100 against rounding overshoot.
                usage_percent = min(round(used_gb / total_gb * 100, 1), 100.0)

            disks.append(
                BeszelDiskMetric(
                    name=str(item.get("name") or item.get("device") or item.get("mount") or "disk"),
                    mount=str(item.get("mount") or item.get("path") or "/"),
                    used_gb=used_gb,
                    total_gb=total_gb,
                    free_gb=free_gb,
                    usage_percent=usage_percent,
                )
            )
        return disks

    @staticmethod
    def _coerce_mapping(value: Any) -> dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_float(value: Any) -> float:
        """Convert ``value`` to a float rounded to one decimal; ``0.0`` if not convertible."""
        try:
            return round(float(value or 0), 1)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _as_int(value: Any) -> int:
        """Convert ``value`` to an int (truncating); ``0`` if not convertible or infinite."""
        try:
            return int(float(value or 0))
        except (TypeError, ValueError, OverflowError):
            # OverflowError: int(float("inf")) cannot be represented.
            return 0

    @classmethod
    def _bytes_to_gb(cls, value: Any) -> float:
        """Convert a byte count to GiB (1024**3) rounded to one decimal; ``0.0`` on bad input."""
        if value in (None, ""):
            return 0.0
        try:
            return round(float(value) / (1024 ** 3), 1)
        except (TypeError, ValueError):
            return 0.0

    @classmethod
    def _bytes_per_second_to_mbps(cls, value: Any) -> float:
        """Convert bytes/second to megabits/second (10**6) rounded to one decimal."""
        if value in (None, ""):
            return 0.0
        try:
            return round((float(value) * 8) / 1_000_000, 1)
        except (TypeError, ValueError):
            return 0.0

    @classmethod
    def _network_value_to_mbps(cls, value: Any) -> float:
        """
        Convert a raw network value to megabits/second.

        Uses the same ``* 8 / 1_000_000`` conversion as
        ``_bytes_per_second_to_mbps`` but clamps non-positive input to ``0.0``.
        """
        try:
            numeric = float(value or 0)
        except (TypeError, ValueError):
            return 0.0
        if numeric <= 0:
            return 0.0
        return round((numeric * 8) / 1_000_000, 1)

    @classmethod
    def _minutes_to_seconds(cls, value: Any) -> int:
        """Convert minutes to whole seconds; ``0`` if not convertible or infinite."""
        try:
            return int(float(value or 0) * 60)
        except (TypeError, ValueError, OverflowError):
            return 0