432 lines
16 KiB
Python
432 lines
16 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
import logging
|
|
from typing import Any
|
|
|
|
from app.clients.base import BaseHTTPClient
|
|
from app.config import Settings
|
|
from app.models.sources import (
|
|
BeszelDiskMetric,
|
|
BeszelSystemSnapshot,
|
|
DockerContainerSummary,
|
|
DockerSnapshot,
|
|
)
|
|
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BeszelClient(BaseHTTPClient):
    """Read system and container metrics from a Beszel hub.

    Beszel exposes a PocketBase-backed REST API. The exact record schema may
    change between Beszel releases, so this client intentionally normalizes
    multiple likely field layouts into a stable internal snapshot.
    """

    # A cached superuser token is reused for this long before the client logs
    # in again. The real token lifetime is not read from the auth response.
    _ADMIN_JWT_TTL = timedelta(minutes=30)
    _ADMIN_AUTH_PATH = "/api/collections/_superusers/auth-with-password"

    def __init__(self, settings: Settings) -> None:
        """Configure the client from ``settings``; no token is cached yet."""
        super().__init__(settings, "beszel", settings.beszel_base_url)
        self._admin_jwt: str | None = None
        self._admin_jwt_expires_at: datetime | None = None

    # ------------------------------------------------------------------
    # Public fetchers
    # ------------------------------------------------------------------

    async def fetch_system_snapshot(self) -> BeszelSystemSnapshot:
        """Fetch the newest ``system_stats`` record and normalize it.

        Returns:
            A populated snapshot on success. A default-constructed
            ``BeszelSystemSnapshot`` is returned when the base URL or
            credentials are missing, or when the response carries no usable
            record.
        """
        snapshot = BeszelSystemSnapshot()
        if not self.base_url:
            logger.info("beszel skipped: base URL missing")
            return snapshot

        headers = await self._build_auth_headers()
        if headers is None:
            logger.warning("beszel skipped: no usable auth method configured")
            return snapshot

        payload = await self._request_json(
            "GET",
            "/api/collections/system_stats/records",
            headers=headers,
            params={"page": 1, "perPage": 1, "sort": "-created"},
        )
        if not payload:
            logger.warning("beszel returned empty payload")
            return snapshot

        logger.info("beszel raw payload: %s", payload)

        items = payload.get("items") if isinstance(payload, dict) else None
        # Guard the shape before indexing: a non-list ``items`` or a non-dict
        # record would otherwise raise instead of degrading to the default.
        if not isinstance(items, list) or not items:
            logger.warning("beszel returned no system_stats records")
            return snapshot

        record = items[0]
        if not isinstance(record, dict):
            logger.warning("beszel system_stats mapping: record is not a dict")
            return snapshot

        details = await self._fetch_system_details(headers, record)
        normalized = self._normalize_snapshot(record, details)
        logger.info("beszel normalized snapshot: %s", normalized.model_dump())
        return normalized

    async def fetch_container_snapshot(self) -> DockerSnapshot:
        """Fetch up to 200 container records and summarize their states.

        Returns:
            A snapshot with per-state counters and one summary per container.
            A default-constructed ``DockerSnapshot`` is returned when the
            client is not configured or the response has no usable items.
        """
        snapshot = DockerSnapshot()
        if not self.base_url:
            return snapshot

        headers = await self._build_auth_headers()
        if headers is None:
            return snapshot

        payload = await self._request_json(
            "GET",
            "/api/collections/containers/records",
            headers=headers,
            params={"page": 1, "perPage": 200, "sort": "-updated"},
        )
        logger.info("beszel raw containers payload: %s", payload)

        if not isinstance(payload, dict):
            logger.warning("beszel containers mapping: payload is not a dict")
            return snapshot
        items = payload.get("items")
        if not isinstance(items, list):
            logger.warning("beszel containers mapping: items missing or not a list")
            return snapshot
        if not items:
            logger.warning("beszel containers mapping: no container records returned")
            return snapshot

        containers: list[DockerContainerSummary] = []
        running = 0
        stopped = 0
        unhealthy = 0

        for item in items:
            if not isinstance(item, dict):
                continue
            state = self._normalize_container_state(item)
            if state == "running":
                running += 1
            elif state == "unhealthy":
                unhealthy += 1
            else:
                # "stopped" and "unknown" are both counted here so that the
                # three counters always add up to ``total``.
                stopped += 1

            containers.append(
                DockerContainerSummary(
                    id=str(item.get("id") or ""),
                    name=str(
                        item.get("name")
                        or item.get("container")
                        or item.get("service")
                        or "unknown"
                    ),
                    state=state,
                    status_text=str(item.get("status") or item.get("state") or "unknown"),
                    image=str(item.get("image") or ""),
                    health=str(item.get("health") or item.get("health_status") or "").lower() or None,
                )
            )

        normalized = DockerSnapshot(
            source_status="online",
            running=running,
            stopped=stopped,
            unhealthy=unhealthy,
            total=len(containers),
            containers=containers,
        )
        logger.info("beszel normalized containers snapshot: %s", normalized.model_dump())
        return normalized

    # ------------------------------------------------------------------
    # Normalization
    # ------------------------------------------------------------------

    def _normalize_snapshot(
        self,
        record: dict[str, Any],
        details: dict[str, Any] | None,
    ) -> BeszelSystemSnapshot:
        """Build a ``BeszelSystemSnapshot`` from a stats record plus details.

        Args:
            record: One ``system_stats`` record; metrics are read from its
                ``stats`` mapping.
            details: Optional ``system_details`` record. Host metadata may
                sit at the top level or under ``details``/``stats``/``info``/
                ``data``; the nested mapping is consulted first.
        """
        stats = self._coerce_mapping(record.get("stats") or {})
        details = self._coerce_mapping(details or {})
        details_payload = self._coerce_mapping(
            details.get("details")
            or details.get("stats")
            or details.get("info")
            or details.get("data")
            or {}
        )
        expanded_system = self._coerce_mapping(
            self._coerce_mapping(record.get("expand")).get("system")
        )

        memory_used_gb, memory_total_gb, memory_available_gb = self._memory_figures(stats)

        bandwidth = stats.get("b")
        network_pair = bandwidth if isinstance(bandwidth, list) else []

        disks = self._normalize_disks(
            details_payload.get("disks")
            or details_payload.get("disk")
            or details_payload.get("mounts")
            or details.get("disks")
            or details.get("disk")
            or details.get("mounts")
            or []
        )
        if not disks:
            disks = self._disks_from_stats(stats)
        if not disks:
            logger.info("beszel storage unsupported: no disks/mounts in payload")

        cpus = stats.get("cpus")

        return BeszelSystemSnapshot(
            source_status="online",
            host_name=str(
                details_payload.get("hostname")
                or details_payload.get("host")
                or details.get("hostname")
                or details.get("host")
                or expanded_system.get("name")
                or record.get("system_name")
                or record.get("name")
                or "unknown"
            ),
            agent_name="beszel-agent",
            cpu_usage_percent=self._as_float(stats.get("cpu")),
            cpu_cores=len(cpus) if isinstance(cpus, list) else 0,
            load_1=0.0,
            load_5=0.0,
            load_15=0.0,
            memory_used_gb=memory_used_gb,
            memory_total_gb=memory_total_gb,
            memory_available_gb=memory_available_gb,
            memory_usage_percent=self._as_float(stats.get("mp")),
            primary_interface=str(
                details_payload.get("primary_interface")
                or details_payload.get("interface")
                or details.get("primary_interface")
                or "primary"
            ),
            # NOTE(review): index 0 is treated as received and index 1 as
            # sent; upstream appears to document the pair as [sent, recv] -
            # confirm against the deployed Beszel version.
            network_rx_mbps=self._network_value_to_mbps(network_pair[0] if len(network_pair) > 0 else 0),
            network_tx_mbps=self._network_value_to_mbps(network_pair[1] if len(network_pair) > 1 else 0),
            # NOTE(review): ``d`` is read as uptime in minutes here, but the
            # sibling keys ``du``/``dp`` are disk figures, so ``d`` may be the
            # disk size instead - confirm before relying on uptime.
            uptime_seconds=self._minutes_to_seconds(stats.get("d")),
            platform=str(
                details_payload.get("platform")
                or details_payload.get("os")
                or details.get("platform")
                or "unknown"
            ),
            kernel=str(details_payload.get("kernel") or details.get("kernel") or "unknown"),
            disks=disks,
        )

    def _memory_figures(self, stats: dict[str, Any]) -> tuple[float, float, float]:
        """Return ``(used_gb, total_gb, available_gb)`` from a stats mapping.

        Two layouts are accepted:

        * ``mu`` present: ``mu`` is used memory and ``m`` is total memory
          (the layout in Beszel's published stats schema).
        * ``mu`` absent: ``m`` is treated as used memory and the total is
          derived from the ``mp`` percentage, falling back to ``m`` itself
          when no percentage is available.
        """
        reported = self._as_float(stats.get("m"))
        usage_percent = self._as_float(stats.get("mp"))

        if stats.get("mu") is not None:
            used_gb = self._as_float(stats.get("mu"))
            total_gb = reported
        else:
            used_gb = reported
            total_gb = round(used_gb / (usage_percent / 100), 1) if usage_percent else used_gb

        available_gb = max(round(total_gb - used_gb, 1), 0.0)
        return used_gb, total_gb, available_gb

    def _disks_from_stats(self, stats: dict[str, Any]) -> list[BeszelDiskMetric]:
        """Synthesize a single root-disk entry from ``dp``/``du`` in stats.

        Returns an empty list when ``dp`` is missing or not positive, because
        the total cannot be derived without a usage percentage.
        """
        disk_pct = self._as_float(stats.get("dp"))
        if disk_pct <= 0:
            return []

        disk_used = self._as_float(stats.get("du"))
        disk_total = round(disk_used / (disk_pct / 100), 1)
        disk_free = round(max(disk_total - disk_used, 0.0), 1)
        logger.info("beszel storage fallback: using dp=%.1f du=%.1f from stats", disk_pct, disk_used)
        return [
            BeszelDiskMetric(
                name="rootfs",
                mount="/",
                used_gb=disk_used,
                total_gb=disk_total,
                free_gb=disk_free,
                usage_percent=disk_pct,
            )
        ]

    async def _fetch_system_details(
        self,
        headers: dict[str, str],
        stats_record: dict[str, Any],
    ) -> dict[str, Any] | None:
        """Fetch the ``system_details`` record that belongs to a stats record.

        The newest 100 detail records are requested. The one whose ``system``
        equals the stats record's ``system`` is preferred; if none matches
        (or the stats record names no system) the first record is used.

        Returns:
            The chosen detail record, or ``None`` when the response is not a
            dict, has no items, or the chosen item is not a dict.
        """
        system_id = stats_record.get("system")
        params: dict[str, Any] = {
            "page": 1,
            "perPage": 100,
            "sort": "-created",
        }

        payload = await self._request_json(
            "GET",
            "/api/collections/system_details/records",
            headers=headers,
            params=params,
        )
        logger.info("beszel raw details payload: %s", payload)

        if not isinstance(payload, dict):
            logger.warning("beszel system_details mapping: payload is not a dict")
            return None

        items = payload.get("items")
        if not isinstance(items, list) or not items:
            logger.warning("beszel system_details mapping: no matching detail records returned")
            return None

        if system_id:
            wanted = str(system_id)
            for item in items:
                if isinstance(item, dict) and str(item.get("system") or "") == wanted:
                    return item

        # Best-effort fallback. NOTE(review): with several monitored systems
        # this may hand back another host's details - confirm this is wanted.
        first = items[0]
        return first if isinstance(first, dict) else None

    @staticmethod
    def _normalize_container_state(item: dict[str, Any]) -> str:
        """Collapse a container record's status fields into one state.

        The ``status``, ``state``, ``health`` and ``health_status`` values are
        joined, lower-cased and split into whole words. Matching whole words
        (rather than substrings) keeps "inactive" from matching "active" and
        "interrupted" from matching "up".

        Returns:
            ``"unhealthy"``, ``"running"``, ``"stopped"`` (some status text
            but no known keyword) or ``"unknown"`` (no status text at all).
        """
        raw = " ".join(
            str(item.get(key) or "")
            for key in ("status", "state", "health", "health_status")
        ).lower()
        # Replace every non-letter with a space so that text such as
        # "Up 3 hours (healthy)" splits into clean words.
        words = set("".join(ch if ch.isalpha() else " " for ch in raw).split())

        if words & {"unhealthy", "degraded"}:
            return "unhealthy"
        if words & {"running", "up", "healthy", "active"}:
            return "running"
        if raw.strip():
            return "stopped"
        return "unknown"

    # ------------------------------------------------------------------
    # Authentication
    # ------------------------------------------------------------------

    async def _build_auth_headers(self) -> dict[str, str] | None:
        """Return request headers for the best available auth method.

        Admin (superuser) login is tried first; the static API token from
        settings is the fallback. ``None`` means neither is usable.
        """
        admin_headers = await self._get_admin_auth_headers()
        if admin_headers:
            return admin_headers
        if self.settings.beszel_api_token:
            return {"Authorization": self.settings.beszel_api_token}
        return None

    async def _get_admin_auth_headers(self) -> dict[str, str] | None:
        """Log in with the configured admin credentials, caching the token.

        Returns:
            An ``Authorization`` header dict, or ``None`` when credentials
            are not configured or the login does not yield a token.
        """
        if not self.settings.beszel_admin_email or not self.settings.beszel_admin_password:
            return None

        now = datetime.now(timezone.utc)
        if self._admin_jwt and self._admin_jwt_expires_at and now < self._admin_jwt_expires_at:
            return {"Authorization": self._admin_jwt}

        # The credentials must travel in the JSON body, so the request goes
        # straight through the body-capable helper; a body-less POST to this
        # endpoint carries no credentials and cannot authenticate.
        auth_payload = await self._request_json_with_body(
            "POST",
            self._ADMIN_AUTH_PATH,
            json_body={
                "identity": self.settings.beszel_admin_email,
                "password": self.settings.beszel_admin_password,
            },
        )

        if not isinstance(auth_payload, dict):
            logger.warning("beszel admin auth failed: no payload")
            return None

        token = auth_payload.get("token")
        if not token:
            logger.warning("beszel admin auth failed: token missing")
            return None

        self._admin_jwt = str(token)
        self._admin_jwt_expires_at = now + self._ADMIN_JWT_TTL
        logger.info("beszel admin auth succeeded")
        return {"Authorization": self._admin_jwt}

    async def _request_json_with_body(
        self,
        method: str,
        path: str,
        *,
        json_body: dict[str, Any],
    ) -> Any | None:
        """Send a JSON-body request and return the decoded JSON response.

        Args:
            method: HTTP method name.
            path: Path relative to ``self.base_url``.
            json_body: Mapping serialized as the JSON request body.

        Returns:
            The decoded response, or ``None`` when the base URL is missing,
            the request times out or fails, the status is an error, or the
            body is not valid JSON. Failures are logged, never raised.
        """
        if not self.base_url:
            return None

        import httpx

        url = f"{self.base_url}/{path.lstrip('/')}"
        try:
            async with httpx.AsyncClient(
                timeout=self.settings.request_timeout_seconds,
                trust_env=False,
            ) as client:
                response = await client.request(
                    method,
                    url,
                    json=json_body,
                    headers={"Content-Type": "application/json"},
                )
                response.raise_for_status()
                return response.json()
        except httpx.TimeoutException:
            logger.warning("beszel auth request timed out: %s %s", method, url)
        except httpx.HTTPStatusError as exc:
            logger.warning(
                "beszel auth request failed with status %s for %s %s",
                exc.response.status_code,
                method,
                url,
            )
            try:
                logger.info("beszel auth error payload: %s", exc.response.json())
            except ValueError:
                logger.info("beszel auth error text: %s", exc.response.text)
        except httpx.HTTPError as exc:
            logger.warning("beszel auth request error for %s %s: %s", method, url, exc)
        except ValueError:
            # ``response.json()`` raises a ValueError subclass when a
            # successful response does not contain valid JSON.
            logger.warning("beszel auth response was not valid JSON for %s %s", method, url)
        return None

    # ------------------------------------------------------------------
    # Disk normalization
    # ------------------------------------------------------------------

    def _normalize_disks(self, raw_disks: Any) -> list[BeszelDiskMetric]:
        """Convert a disks payload into a list of ``BeszelDiskMetric``.

        Args:
            raw_disks: Either a list of per-disk dicts, or a dict keyed by
                mount point whose values are per-disk dicts or a bare "used"
                value. Anything else yields an empty list.

        Size fields are passed through ``_bytes_to_gb``. A missing total is
        derived from used + free when both are non-zero, and a missing usage
        percentage is derived from used / total when a total is known.
        """
        if isinstance(raw_disks, dict):
            raw_disks = [
                {"mount": key, **(value if isinstance(value, dict) else {"used": value})}
                for key, value in raw_disks.items()
            ]
        disks: list[BeszelDiskMetric] = []
        if not isinstance(raw_disks, list):
            return disks

        for item in raw_disks:
            if not isinstance(item, dict):
                continue
            total_gb = self._bytes_to_gb(item.get("total") or item.get("total_bytes"))
            used_gb = self._bytes_to_gb(item.get("used") or item.get("used_bytes"))
            free_gb = self._bytes_to_gb(item.get("free") or item.get("free_bytes"))
            usage_percent = self._as_float(item.get("usage_percent") or item.get("percent"))
            if not total_gb and used_gb and free_gb:
                total_gb = round(used_gb + free_gb, 1)
            if not usage_percent and total_gb > 0:
                # The payload gave sizes but no percentage; compute it.
                usage_percent = round(used_gb / total_gb * 100, 1)

            disks.append(
                BeszelDiskMetric(
                    name=str(item.get("name") or item.get("device") or item.get("mount") or "disk"),
                    mount=str(item.get("mount") or item.get("path") or "/"),
                    used_gb=used_gb,
                    total_gb=total_gb,
                    free_gb=free_gb,
                    usage_percent=usage_percent,
                )
            )
        return disks

    # ------------------------------------------------------------------
    # Value coercion helpers
    #
    # All numeric helpers also catch OverflowError: ``int(float("inf"))`` and
    # ``float(10**400)`` raise it rather than ValueError.
    # ------------------------------------------------------------------

    @staticmethod
    def _coerce_mapping(value: Any) -> dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_float(value: Any) -> float:
        """Convert to a float rounded to one decimal; 0.0 if not convertible."""
        try:
            return round(float(value or 0), 1)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    @staticmethod
    def _as_int(value: Any) -> int:
        """Convert to an int by truncating the float value; 0 if not convertible."""
        try:
            return int(float(value or 0))
        except (TypeError, ValueError, OverflowError):
            return 0

    @classmethod
    def _bytes_to_gb(cls, value: Any) -> float:
        """Divide by 1024**3 and round to one decimal; 0.0 if not convertible."""
        if value in (None, ""):
            return 0.0
        try:
            return round(float(value) / (1024 ** 3), 1)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    @classmethod
    def _bytes_per_second_to_mbps(cls, value: Any) -> float:
        """Multiply by 8, divide by 1,000,000 and round to one decimal."""
        if value in (None, ""):
            return 0.0
        try:
            return round((float(value) * 8) / 1_000_000, 1)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    @classmethod
    def _network_value_to_mbps(cls, value: Any) -> float:
        """Like ``_bytes_per_second_to_mbps`` but non-positive input gives 0.0."""
        try:
            numeric = float(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0.0
        if numeric <= 0:
            return 0.0
        return round((numeric * 8) / 1_000_000, 1)

    @classmethod
    def _minutes_to_seconds(cls, value: Any) -> int:
        """Multiply by 60 and truncate to int; 0 if not convertible."""
        try:
            return int(float(value or 0) * 60)
        except (TypeError, ValueError, OverflowError):
            return 0
|