from __future__ import annotations from datetime import datetime, timedelta, timezone import logging from typing import Any from app.clients.base import BaseHTTPClient from app.config import Settings from app.models.sources import ( BeszelDiskMetric, BeszelSystemSnapshot, DockerContainerSummary, DockerSnapshot, ) logger = logging.getLogger(__name__) class BeszelClient(BaseHTTPClient): """ Beszel exposes a PocketBase-backed REST API. The exact record schema may change between Beszel releases, so this client intentionally normalizes multiple likely field layouts into a stable internal snapshot. """ def __init__(self, settings: Settings) -> None: super().__init__(settings, "beszel", settings.beszel_base_url) self._admin_jwt: str | None = None self._admin_jwt_expires_at: datetime | None = None async def fetch_system_snapshot(self) -> BeszelSystemSnapshot: snapshot = BeszelSystemSnapshot() if not self.base_url: logger.info("beszel skipped: base URL missing") return snapshot headers = await self._build_auth_headers() if headers is None: logger.warning("beszel skipped: no usable auth method configured") return snapshot payload = await self._request_json( "GET", "/api/collections/system_stats/records", headers=headers, params={"page": 1, "perPage": 1, "sort": "-created"}, ) if not payload: logger.warning("beszel returned empty payload") return snapshot logger.info("beszel raw payload: %s", payload) items = payload.get("items") if isinstance(payload, dict) else None if not items: logger.warning("beszel returned no system_stats records") return snapshot record = items[0] details = await self._fetch_system_details(headers, record) normalized = self._normalize_snapshot(record, details) logger.info("beszel normalized snapshot: %s", normalized.model_dump()) return normalized async def fetch_container_snapshot(self) -> DockerSnapshot: snapshot = DockerSnapshot() if not self.base_url: return snapshot headers = await self._build_auth_headers() if headers is None: return snapshot payload = await self._request_json( "GET", "/api/collections/containers/records", headers=headers, params={"page": 1, "perPage": 200, "sort": "-updated"}, ) logger.info("beszel raw containers payload: %s", payload) if not isinstance(payload, dict): logger.warning("beszel containers mapping: payload is not a dict") return snapshot items = payload.get("items") if not isinstance(items, list): logger.warning("beszel containers mapping: items missing or not a list") return snapshot if not items: logger.warning("beszel containers mapping: no container records returned") return snapshot containers: list[DockerContainerSummary] = [] running = 0 stopped = 0 unhealthy = 0 for item in items: if not isinstance(item, dict): continue state = self._normalize_container_state(item) if state == "running": running += 1 elif state == "unhealthy": unhealthy += 1 else: stopped += 1 containers.append( DockerContainerSummary( id=str(item.get("id") or ""), name=str(item.get("name") or item.get("container") or item.get("service") or "unknown"), state=state, status_text=str(item.get("status") or item.get("state") or "unknown"), image=str(item.get("image") or ""), health=str(item.get("health") or item.get("health_status") or "").lower() or None, ) ) normalized = DockerSnapshot( source_status="online", running=running, stopped=stopped, unhealthy=unhealthy, total=len(containers), containers=containers, ) logger.info("beszel normalized containers snapshot: %s", normalized.model_dump()) return normalized def _normalize_snapshot(self, record: dict[str, Any], details: dict[str, Any] | None) -> BeszelSystemSnapshot: stats = self._coerce_mapping(record.get("stats") or {}) details = self._coerce_mapping(details or {}) details_payload = self._coerce_mapping( details.get("details") or details.get("stats") or details.get("info") or details.get("data") or {} ) expanded_system = self._coerce_mapping(self._coerce_mapping(record.get("expand")).get("system")) memory_used_gb = self._as_float(stats.get("m")) memory_total_gb = self._as_float(stats.get("m")) if stats.get("mp"): memory_total_gb = round(memory_used_gb / (self._as_float(stats.get("mp")) / 100), 1) if self._as_float(stats.get("mp")) else memory_used_gb memory_available_gb = max(round(memory_total_gb - memory_used_gb, 1), 0.0) network_pair = stats.get("b") if isinstance(stats.get("b"), list) else [] disks = self._normalize_disks( details_payload.get("disks") or details_payload.get("disk") or details_payload.get("mounts") or details.get("disks") or details.get("disk") or details.get("mounts") or [] ) if not disks and self._as_float(stats.get("dp")) > 0: disk_pct = self._as_float(stats.get("dp")) disk_used = self._as_float(stats.get("du")) disk_total = round(disk_used / (disk_pct / 100), 1) if disk_pct > 0 else 0.0 disk_free = round(max(disk_total - disk_used, 0.0), 1) logger.info("beszel storage fallback: using dp=%.1f du=%.1f from stats", disk_pct, disk_used) disks = [BeszelDiskMetric( name="rootfs", mount="/", used_gb=disk_used, total_gb=disk_total, free_gb=disk_free, usage_percent=disk_pct, )] if not disks: logger.info("beszel storage unsupported: no disks/mounts in payload") return BeszelSystemSnapshot( source_status="online", host_name=str( details_payload.get("hostname") or details_payload.get("host") or details.get("hostname") or details.get("host") or expanded_system.get("name") or record.get("system_name") or record.get("name") or "unknown" ), agent_name="beszel-agent", cpu_usage_percent=self._as_float(stats.get("cpu")), cpu_cores=len(stats.get("cpus")) if isinstance(stats.get("cpus"), list) else 0, load_1=0.0, load_5=0.0, load_15=0.0, memory_used_gb=memory_used_gb, memory_total_gb=memory_total_gb, memory_available_gb=memory_available_gb, memory_usage_percent=self._as_float(stats.get("mp")), primary_interface=str( details_payload.get("primary_interface") or details_payload.get("interface") or details.get("primary_interface") or "primary" ), network_rx_mbps=self._network_value_to_mbps(network_pair[0] if len(network_pair) > 0 else 0), network_tx_mbps=self._network_value_to_mbps(network_pair[1] if len(network_pair) > 1 else 0), uptime_seconds=self._minutes_to_seconds(stats.get("d")), platform=str( details_payload.get("platform") or details_payload.get("os") or details.get("platform") or "unknown" ), kernel=str(details_payload.get("kernel") or details.get("kernel") or "unknown"), disks=disks, ) async def _fetch_system_details( self, headers: dict[str, str], stats_record: dict[str, Any], ) -> dict[str, Any] | None: system_id = stats_record.get("system") params: dict[str, Any] = { "page": 1, "perPage": 100, "sort": "-created", } payload = await self._request_json( "GET", "/api/collections/system_details/records", headers=headers, params=params, ) logger.info("beszel raw details payload: %s", payload) if not isinstance(payload, dict): logger.warning("beszel system_details mapping: payload is not a dict") return None items = payload.get("items") if isinstance(items, list) and items: if system_id: for item in items: if isinstance(item, dict) and str(item.get("system") or "") == str(system_id): return item return items[0] logger.warning("beszel system_details mapping: no matching detail records returned") return None @staticmethod def _normalize_container_state(item: dict[str, Any]) -> str: raw = " ".join( str(item.get(key) or "") for key in ("status", "state", "health", "health_status") ).lower() if "unhealthy" in raw or "degraded" in raw: return "unhealthy" if any(token in raw for token in ("running", "up", "healthy", "active")): return "running" if raw.strip(): return "stopped" return "unknown" async def _build_auth_headers(self) -> dict[str, str] | None: admin_headers = await self._get_admin_auth_headers() if admin_headers: return admin_headers if self.settings.beszel_api_token: return {"Authorization": self.settings.beszel_api_token} return None async def _get_admin_auth_headers(self) -> dict[str, str] | None: if not self.settings.beszel_admin_email or not self.settings.beszel_admin_password: return None now = datetime.now(timezone.utc) if self._admin_jwt and self._admin_jwt_expires_at and now < self._admin_jwt_expires_at: return {"Authorization": self._admin_jwt} auth_payload = await self._request_json( "POST", "/api/collections/_superusers/auth-with-password", headers={"Content-Type": "application/json"}, params=None, ) if auth_payload is None: auth_payload = await self._request_json_with_body( "POST", "/api/collections/_superusers/auth-with-password", json_body={ "identity": self.settings.beszel_admin_email, "password": self.settings.beszel_admin_password, }, ) if not isinstance(auth_payload, dict): logger.warning("beszel admin auth failed: no payload") return None token = auth_payload.get("token") if not token: logger.warning("beszel admin auth failed: token missing") return None self._admin_jwt = str(token) self._admin_jwt_expires_at = now + timedelta(minutes=30) logger.info("beszel admin auth succeeded") return {"Authorization": self._admin_jwt} async def _request_json_with_body( self, method: str, path: str, *, json_body: dict[str, Any], ) -> Any | None: if not self.base_url: return None import httpx url = f"{self.base_url}/{path.lstrip('/')}" try: async with httpx.AsyncClient( timeout=self.settings.request_timeout_seconds, trust_env=False, ) as client: response = await client.request( method, url, json=json_body, headers={"Content-Type": "application/json"}, ) response.raise_for_status() return response.json() except httpx.TimeoutException: logger.warning("beszel auth request timed out: %s %s", method, url) except httpx.HTTPStatusError as exc: logger.warning("beszel auth request failed with status %s for %s %s", exc.response.status_code, method, url) try: logger.info("beszel auth error payload: %s", exc.response.json()) except ValueError: logger.info("beszel auth error text: %s", exc.response.text) except httpx.HTTPError as exc: logger.warning("beszel auth request error for %s %s: %s", method, url, exc) return None def _normalize_disks(self, raw_disks: Any) -> list[BeszelDiskMetric]: if isinstance(raw_disks, dict): raw_disks = [ {"mount": key, **(value if isinstance(value, dict) else {"used": value})} for key, value in raw_disks.items() ] disks: list[BeszelDiskMetric] = [] if not isinstance(raw_disks, list): return disks for item in raw_disks: if not isinstance(item, dict): continue total_gb = self._bytes_to_gb(item.get("total") or item.get("total_bytes")) used_gb = self._bytes_to_gb(item.get("used") or item.get("used_bytes")) free_gb = self._bytes_to_gb(item.get("free") or item.get("free_bytes")) usage_percent = self._as_float(item.get("usage_percent") or item.get("percent")) if not total_gb and used_gb and free_gb: total_gb = round(used_gb + free_gb, 1) disks.append( BeszelDiskMetric( name=str(item.get("name") or item.get("device") or item.get("mount") or "disk"), mount=str(item.get("mount") or item.get("path") or "/"), used_gb=used_gb, total_gb=total_gb, free_gb=free_gb, usage_percent=usage_percent, ) ) return disks @staticmethod def _coerce_mapping(value: Any) -> dict[str, Any]: return value if isinstance(value, dict) else {} @staticmethod def _as_float(value: Any) -> float: try: return round(float(value or 0), 1) except (TypeError, ValueError): return 0.0 @staticmethod def _as_int(value: Any) -> int: try: return int(float(value or 0)) except (TypeError, ValueError): return 0 @classmethod def _bytes_to_gb(cls, value: Any) -> float: if value in (None, ""): return 0.0 try: return round(float(value) / (1024 ** 3), 1) except (TypeError, ValueError): return 0.0 @classmethod def _bytes_per_second_to_mbps(cls, value: Any) -> float: if value in (None, ""): return 0.0 try: return round((float(value) * 8) / 1_000_000, 1) except (TypeError, ValueError): return 0.0 @classmethod def _network_value_to_mbps(cls, value: Any) -> float: try: numeric = float(value or 0) except (TypeError, ValueError): return 0.0 if numeric <= 0: return 0.0 return round((numeric * 8) / 1_000_000, 1) @classmethod def _minutes_to_seconds(cls, value: Any) -> int: try: return int(float(value or 0) * 60) except (TypeError, ValueError): return 0