430 lines
15 KiB
Python
430 lines
15 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from datetime import datetime, timezone
|
|
from functools import lru_cache
|
|
from typing import Iterable
|
|
|
|
from app.clients.adguard_client import AdGuardClient
|
|
from app.clients.backrest_client import BackrestClient
|
|
from app.clients.beszel_client import BeszelClient
|
|
from app.clients.docker_proxy_client import DockerProxyClient
|
|
from app.clients.home_assistant_client import HomeAssistantClient
|
|
from app.clients.immich_client import ImmichClient
|
|
from app.clients.scrutiny_client import ScrutinyClient
|
|
from app.clients.uptime_kuma_client import UptimeKumaClient
|
|
from app.config import Settings, get_settings
|
|
from app.models.common import DiskStatus, HealthStatus, OverallStatus
|
|
from app.models.overview import (
|
|
OverviewDockerSummary,
|
|
OverviewHomeAssistantSummary,
|
|
OverviewResponse,
|
|
OverviewServicesSummary,
|
|
OverviewSystemSummary,
|
|
)
|
|
from app.models.services import (
|
|
ServiceItem,
|
|
ServicesDockerSummary,
|
|
ServicesResponse,
|
|
ServicesSummary,
|
|
ServicesUptimeKumaSummary,
|
|
)
|
|
from app.models.sources import (
|
|
AdGuardSnapshot,
|
|
BackrestSnapshot,
|
|
BeszelDiskMetric,
|
|
BeszelSystemSnapshot,
|
|
DockerSnapshot,
|
|
HomeAssistantSnapshot,
|
|
ImmichSnapshot,
|
|
ScrutinySnapshot,
|
|
UptimeKumaMonitor,
|
|
UptimeKumaSnapshot,
|
|
)
|
|
from app.models.storage import StorageDisk, StorageResponse, StorageSummary
|
|
from app.models.system import (
|
|
SystemCPU,
|
|
SystemHost,
|
|
SystemMemory,
|
|
SystemNetwork,
|
|
SystemResponse,
|
|
SystemSource,
|
|
)
|
|
from app.services.cache import TTLCacheService
|
|
|
|
|
|
# Module-level logger, named after this module's import path.
logger: logging.Logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AggregatorService:
    """Fan out to the homelab source clients and assemble API response models.

    Public ``get_*`` coroutines go through ``TTLCacheService.get_or_load`` with a
    fixed cache key and a TTL taken from ``Settings``. The private ``_build_*``
    coroutines call the clients directly and map their snapshots onto the
    response models.
    """

    # Disk usage thresholds (percent, inclusive lower bounds) used by
    # ``_disk_status``; subclasses may override them.
    DISK_CRITICAL_PERCENT: float = 90.0
    DISK_WARNING_PERCENT: float = 75.0

    # Statuses that keep an aggregate from being reported as "online".
    # "warning"/"critical" are accepted alongside the OverallStatus values.
    _NON_ONLINE_STATUSES: frozenset[str] = frozenset(
        {"offline", "degraded", "warning", "critical"}
    )

    def __init__(
        self,
        settings: Settings,
        cache: TTLCacheService,
        beszel_client: BeszelClient,
        docker_client: DockerProxyClient,
        uptime_kuma_client: UptimeKumaClient,
        home_assistant_client: HomeAssistantClient,
        adguard_client: AdGuardClient,
        scrutiny_client: ScrutinyClient,
        immich_client: ImmichClient,
        backrest_client: BackrestClient,
    ) -> None:
        """Store the settings, the shared cache and one client per upstream source."""
        self.settings = settings
        self.cache = cache
        self.beszel_client = beszel_client
        self.docker_client = docker_client
        self.uptime_kuma_client = uptime_kuma_client
        self.home_assistant_client = home_assistant_client
        self.adguard_client = adguard_client
        self.scrutiny_client = scrutiny_client
        self.immich_client = immich_client
        self.backrest_client = backrest_client

    async def get_system(self) -> SystemResponse:
        """Return the system view (cache key "system", system TTL)."""
        return await self.cache.get_or_load(
            "system",
            self.settings.cache_ttl_system_seconds,
            self._build_system,
        )

    async def get_storage(self) -> StorageResponse:
        """Return the storage view (cache key "storage", storage TTL)."""
        return await self.cache.get_or_load(
            "storage",
            self.settings.cache_ttl_storage_seconds,
            self._build_storage,
        )

    async def get_services(self) -> ServicesResponse:
        """Return the merged Docker/Uptime Kuma services view (cache key "services")."""
        return await self.cache.get_or_load(
            "services",
            self.settings.cache_ttl_services_seconds,
            self._build_services,
        )

    async def get_adguard(self) -> AdGuardSnapshot:
        """Return AdGuard stats (cache key "adguard", services TTL)."""
        return await self.cache.get_or_load(
            "adguard",
            self.settings.cache_ttl_services_seconds,
            self.adguard_client.fetch_stats,
        )

    async def get_scrutiny(self) -> ScrutinySnapshot:
        """Return the Scrutiny summary (cache key "scrutiny", storage TTL)."""
        return await self.cache.get_or_load(
            "scrutiny",
            self.settings.cache_ttl_storage_seconds,
            self.scrutiny_client.fetch_summary,
        )

    async def get_immich(self) -> ImmichSnapshot:
        """Return Immich stats (cache key "immich", services TTL)."""
        return await self.cache.get_or_load(
            "immich",
            self.settings.cache_ttl_services_seconds,
            self.immich_client.fetch_stats,
        )

    async def get_backrest(self) -> BackrestSnapshot:
        """Return Backrest status (cache key "backrest", services TTL)."""
        return await self.cache.get_or_load(
            "backrest",
            self.settings.cache_ttl_services_seconds,
            self.backrest_client.fetch_status,
        )

    async def get_home_assistant(self) -> HomeAssistantSnapshot:
        """Return Home Assistant status (cache key "home_assistant", services TTL)."""
        return await self.cache.get_or_load(
            "home_assistant",
            self.settings.cache_ttl_services_seconds,
            self.home_assistant_client.fetch_status,
        )

    async def get_uptime_kuma(self) -> UptimeKumaSnapshot:
        """Return Uptime Kuma monitors (cache key "uptime_kuma", services TTL)."""
        return await self.cache.get_or_load(
            "uptime_kuma",
            self.settings.cache_ttl_services_seconds,
            self.uptime_kuma_client.fetch_monitors,
        )

    async def get_overview(self) -> OverviewResponse:
        """Return the dashboard overview (cache key "overview", overview TTL)."""
        return await self.cache.get_or_load(
            "overview",
            self.settings.cache_ttl_overview_seconds,
            self._build_overview,
        )

    async def _build_system(self) -> SystemResponse:
        """Map a fresh Beszel system snapshot onto ``SystemResponse``."""
        snapshot = await self.beszel_client.fetch_system_snapshot()
        now = datetime.now(timezone.utc)
        return SystemResponse(
            generated_at=now,
            source=SystemSource(
                name=snapshot.source_name,
                status=snapshot.source_status,
                host_name=snapshot.host_name,
                agent_name=snapshot.agent_name,
            ),
            cpu=SystemCPU(
                usage_percent=snapshot.cpu_usage_percent,
                cores=snapshot.cpu_cores,
                load_1=snapshot.load_1,
                load_5=snapshot.load_5,
                load_15=snapshot.load_15,
            ),
            memory=SystemMemory(
                used_gb=snapshot.memory_used_gb,
                total_gb=snapshot.memory_total_gb,
                available_gb=snapshot.memory_available_gb,
                usage_percent=snapshot.memory_usage_percent,
            ),
            network=SystemNetwork(
                primary_interface=snapshot.primary_interface,
                rx_mbps=snapshot.network_rx_mbps,
                tx_mbps=snapshot.network_tx_mbps,
            ),
            host=SystemHost(
                uptime_seconds=snapshot.uptime_seconds,
                platform=snapshot.platform,
                kernel=snapshot.kernel,
            ),
        )

    async def _build_storage(self) -> StorageResponse:
        """Build the storage view from the Beszel snapshot's disk metrics.

        If Beszel is "online" but reports no disks, the source status is
        reported as "unsupported". With no disks, a zeroed placeholder root disk
        mounted at "/" is returned.
        """
        snapshot = await self.beszel_client.fetch_system_snapshot()
        now = datetime.now(timezone.utc)

        disks = [self._map_disk(d, snapshot.source_status) for d in snapshot.disks]

        storage_source_status = snapshot.source_status
        if snapshot.source_status == "online" and not disks:
            storage_source_status = "unsupported"

        if disks:
            # Prefer the disk mounted at "/"; otherwise fall back to the first disk.
            root = next((d for d in disks if d.mount == "/"), disks[0])
        else:
            root = StorageDisk(
                name="rootfs",
                mount="/",
                used_gb=0.0,
                total_gb=0.0,
                free_gb=0.0,
                usage_percent=0.0,
                status="offline" if snapshot.source_status == "offline" else "online",
            )

        critical_count = sum(1 for d in disks if d.status == "critical")
        warning_count = sum(1 for d in disks if d.status == "warning")
        overall_status: OverallStatus = (
            "offline"
            if snapshot.source_status == "offline"
            else ("degraded" if critical_count or warning_count else "online")
        )

        return StorageResponse(
            generated_at=now,
            summary=StorageSummary(
                overall_status=overall_status,
                source_status=storage_source_status,
                critical_disks=critical_count,
                warning_disks=warning_count,
                total_disks=len(disks),
            ),
            root=root,
            disks=disks,
        )

    async def _build_services(self) -> ServicesResponse:
        """Merge Docker containers and Uptime Kuma monitors into one service list.

        Containers and monitors are matched on ``_normalize_identifier`` of their
        names. Every identifier seen in either source becomes one ``ServiceItem``,
        and the items are ordered by identifier. The two client calls run
        concurrently via ``asyncio.gather``.
        """
        docker_snap, uk_snap = await asyncio.gather(
            self.docker_client.fetch_containers(),
            self.uptime_kuma_client.fetch_monitors(),
        )
        now = datetime.now(timezone.utc)

        # Dict comprehensions keep the *last* entry when two names normalize to
        # the same identifier.
        monitor_by_name = {
            self._normalize_identifier(m.name): m for m in uk_snap.monitors
        }
        docker_by_name = {
            self._normalize_identifier(c.name): c for c in docker_snap.containers
        }

        items: list[ServiceItem] = []
        for norm in sorted(set(docker_by_name) | set(monitor_by_name)):
            container = docker_by_name.get(norm)
            monitor = monitor_by_name.get(norm)
            docker_state = container.state if container is not None else "unknown"
            status = self._resolve_overall_status(docker_state, monitor)
            items.append(
                ServiceItem(
                    id=norm,
                    # Every merged identifier has a monitor, a container, or both.
                    name=monitor.name if monitor is not None else container.name,
                    kind="service",
                    status=status,
                    health=self._status_to_health(status),
                    latency_ms=monitor.latency_ms if monitor is not None else None,
                    docker_state=docker_state,
                    url=None,
                    source="uptime_kuma" if monitor is not None else "docker",
                    last_checked=now.isoformat(),
                )
            )

        overall = self._aggregate_statuses(item.status for item in items)

        return ServicesResponse(
            generated_at=now,
            summary=ServicesSummary(
                overall_status=overall,
                docker=ServicesDockerSummary(
                    running=docker_snap.running,
                    stopped=docker_snap.stopped,
                    unhealthy=docker_snap.unhealthy,
                    total=docker_snap.total,
                    source_status=docker_snap.source_status,
                ),
                uptime_kuma=ServicesUptimeKumaSummary(
                    monitors_up=uk_snap.monitors_up,
                    monitors_down=uk_snap.monitors_down,
                    monitors_paused=uk_snap.monitors_paused,
                    total=uk_snap.total,
                    source_status=uk_snap.source_status,
                ),
            ),
            services=items,
        )

    async def _build_overview(self) -> OverviewResponse:
        """Build the dashboard overview from Beszel, Docker, Uptime Kuma and HA.

        The four client calls run concurrently via ``asyncio.gather``. Because
        ``return_exceptions`` is not set, an exception from any of them
        propagates to the caller.
        """
        system_snap, docker_snap, uk_snap, ha_snap = await asyncio.gather(
            self.beszel_client.fetch_system_snapshot(),
            self.docker_client.fetch_containers(),
            self.uptime_kuma_client.fetch_monitors(),
            self.home_assistant_client.fetch_status(),
        )
        now = datetime.now(timezone.utc)

        # Index monitors once, rather than rescanning and re-normalizing every
        # monitor for each container. setdefault keeps the *first* monitor per
        # normalized name.
        monitor_by_name: dict[str, UptimeKumaMonitor] = {}
        for m in uk_snap.monitors:
            monitor_by_name.setdefault(self._normalize_identifier(m.name), m)

        # NOTE(review): only Docker containers are counted here, whereas
        # _build_services also lists monitors that have no matching container,
        # so the two endpoints' totals can differ. Confirm this is intended.
        statuses: list[OverallStatus] = [
            self._resolve_overall_status(
                container.state,
                monitor_by_name.get(self._normalize_identifier(container.name)),
            )
            for container in docker_snap.containers
        ]
        overall = self._aggregate_statuses(statuses)

        return OverviewResponse(
            generated_at=now,
            overall_status=overall,
            refresh_hint_seconds=self.settings.cache_ttl_overview_seconds,
            services=OverviewServicesSummary(
                online=sum(1 for s in statuses if s == "online"),
                degraded=sum(1 for s in statuses if s == "degraded"),
                offline=sum(1 for s in statuses if s == "offline"),
                total=len(statuses),
            ),
            docker=OverviewDockerSummary(
                running=docker_snap.running,
                stopped=docker_snap.stopped,
                unhealthy=docker_snap.unhealthy,
                total=docker_snap.total,
                source_status=docker_snap.source_status,
            ),
            system=OverviewSystemSummary(
                cpu_percent=system_snap.cpu_usage_percent,
                ram_percent=system_snap.memory_usage_percent,
                # Uses the same root-disk rule as _build_storage: the "/" mount
                # first, then the first disk.
                root_storage_percent=self._root_usage_percent(system_snap.disks),
                network_rx_mbps=system_snap.network_rx_mbps,
                network_tx_mbps=system_snap.network_tx_mbps,
                uptime_seconds=system_snap.uptime_seconds,
            ),
            home_assistant=OverviewHomeAssistantSummary(
                status=ha_snap.status,
                label=ha_snap.label,
                version=ha_snap.version,
                response_time_ms=ha_snap.response_time_ms,
                last_checked=ha_snap.last_checked.isoformat() if ha_snap.last_checked else None,
            ),
        )

    @staticmethod
    def _normalize_identifier(value: str) -> str:
        """Lower-case *value* and drop every non-alphanumeric character.

        Used to match container names against monitor names, e.g.
        "Home-Assistant_2" -> "homeassistant2".
        """
        return "".join(ch.lower() for ch in value if ch.isalnum())

    @staticmethod
    def _resolve_overall_status(
        docker_state: str,
        monitor: UptimeKumaMonitor | None,
    ) -> OverallStatus:
        """Combine a Docker state and an optional monitor into one status.

        Precedence:
        1. Monitor "offline" or "degraded" wins.
        2. Docker "unhealthy" -> "degraded", "stopped" -> "offline",
           "running" -> "online".
        3. Otherwise the monitor's own status, or "offline" when there is no monitor.
        """
        if monitor is not None:
            if monitor.status == "offline":
                return "offline"
            if monitor.status == "degraded":
                return "degraded"
        if docker_state == "unhealthy":
            return "degraded"
        if docker_state == "stopped":
            return "offline"
        if docker_state == "running":
            return "online"
        # NOTE(review): monitor.status is returned verbatim here. If monitors can
        # report values outside OverallStatus (e.g. a paused state), that value
        # leaks through. Confirm against UptimeKumaMonitor.
        return "offline" if monitor is None else monitor.status

    @staticmethod
    def _status_to_health(status: OverallStatus) -> HealthStatus:
        """Map "online" -> "healthy", "degraded" -> "warning", anything else -> "offline"."""
        if status == "online":
            return "healthy"
        if status == "degraded":
            return "warning"
        return "offline"

    @classmethod
    def _disk_status(cls, usage_percent: float, source_status: str) -> DiskStatus:
        """Classify one disk by usage.

        Returns "offline" whenever the source is offline. Otherwise returns
        "critical" at or above ``DISK_CRITICAL_PERCENT``, "warning" at or above
        ``DISK_WARNING_PERCENT``, and "online" below both.
        """
        if source_status == "offline":
            return "offline"
        if usage_percent >= cls.DISK_CRITICAL_PERCENT:
            return "critical"
        if usage_percent >= cls.DISK_WARNING_PERCENT:
            return "warning"
        return "online"

    def _map_disk(self, disk: BeszelDiskMetric, source_status: str) -> StorageDisk:
        """Convert a Beszel disk metric to a ``StorageDisk`` with a computed status."""
        return StorageDisk(
            name=disk.name,
            mount=disk.mount,
            used_gb=disk.used_gb,
            total_gb=disk.total_gb,
            free_gb=disk.free_gb,
            usage_percent=disk.usage_percent,
            status=self._disk_status(disk.usage_percent, source_status),
        )

    @staticmethod
    def _root_usage_percent(disks: Iterable[BeszelDiskMetric]) -> float:
        """Return the usage percent of the root disk.

        The root disk is the one mounted at "/", falling back to the first disk.
        Returns 0.0 when there are no disks.
        """
        fallback: BeszelDiskMetric | None = None
        for disk in disks:
            if disk.mount == "/":
                return disk.usage_percent
            if fallback is None:
                fallback = disk
        return fallback.usage_percent if fallback is not None else 0.0

    @classmethod
    def _aggregate_statuses(cls, statuses: Iterable[OverallStatus]) -> OverallStatus:
        """Fold per-service statuses into one overall status.

        - Empty input -> "offline".
        - Every entry "offline" -> "offline".
        - Any entry in ``_NON_ONLINE_STATUSES`` (and not all offline) -> "degraded".
          This includes a mix of offline and degraded services, since some of
          them are still partly up.
        - Otherwise -> "online". Values outside the known set count as healthy.
        """
        normalized = list(statuses)
        if not normalized:
            return "offline"
        if all(s == "offline" for s in normalized):
            return "offline"
        if any(s in cls._NON_ONLINE_STATUSES for s in normalized):
            return "degraded"
        return "online"
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_cache_service() -> TTLCacheService:
    """Return the process-wide ``TTLCacheService``.

    ``lru_cache(maxsize=1)`` on this zero-argument factory builds the service on
    the first call and hands back that same instance on every later call.
    """
    shared = TTLCacheService()
    return shared
|
|
|
|
|
|
@lru_cache(maxsize=1)
def get_aggregator_service() -> AggregatorService:
    """Build, once, the ``AggregatorService`` wired to every upstream client.

    Every client receives the ``Settings`` object returned by ``get_settings``,
    and the service shares the cache from ``get_cache_service``. Memoisation via
    ``lru_cache(maxsize=1)`` makes later calls return the same instance.
    """
    cfg = get_settings()
    shared_cache = get_cache_service()

    # Construct the clients in a fixed order, one per upstream source.
    beszel = BeszelClient(cfg)
    docker = DockerProxyClient(cfg)
    uptime_kuma = UptimeKumaClient(cfg)
    home_assistant = HomeAssistantClient(cfg)
    adguard = AdGuardClient(cfg)
    scrutiny = ScrutinyClient(cfg)
    immich = ImmichClient(cfg)
    backrest = BackrestClient(cfg)

    return AggregatorService(
        settings=cfg,
        cache=shared_cache,
        beszel_client=beszel,
        docker_client=docker,
        uptime_kuma_client=uptime_kuma,
        home_assistant_client=home_assistant,
        adguard_client=adguard,
        scrutiny_client=scrutiny,
        immich_client=immich,
        backrest_client=backrest,
    )
|