from __future__ import annotations

import asyncio
import logging
from datetime import datetime, timezone
from functools import lru_cache
from typing import Iterable

from app.clients.adguard_client import AdGuardClient
from app.clients.beszel_client import BeszelClient
from app.clients.docker_proxy_client import DockerProxyClient
from app.clients.home_assistant_client import HomeAssistantClient
from app.clients.scrutiny_client import ScrutinyClient
from app.clients.uptime_kuma_client import UptimeKumaClient
from app.config import Settings, get_settings
from app.models.common import DiskStatus, HealthStatus, OverallStatus
from app.models.overview import (
    OverviewDockerSummary,
    OverviewHomeAssistantSummary,
    OverviewResponse,
    OverviewServicesSummary,
    OverviewSystemSummary,
)
from app.models.services import (
    ServiceItem,
    ServicesDockerSummary,
    ServicesResponse,
    ServicesSummary,
    ServicesUptimeKumaSummary,
)
from app.models.sources import (
    AdGuardSnapshot,
    BeszelDiskMetric,
    BeszelSystemSnapshot,
    DockerSnapshot,
    HomeAssistantSnapshot,
    ScrutinySnapshot,
    UptimeKumaMonitor,
    UptimeKumaSnapshot,
)
from app.models.storage import StorageDisk, StorageResponse, StorageSummary
from app.models.system import (
    SystemCPU,
    SystemHost,
    SystemMemory,
    SystemNetwork,
    SystemResponse,
    SystemSource,
)
from app.services.cache import TTLCacheService

logger = logging.getLogger(__name__)

# Normalized id of the Home Assistant core entry in the services list. Any
# Docker container or Uptime Kuma monitor whose name normalizes to this id is
# folded into that single core entry instead of being listed a second time.
_HOME_ASSISTANT_ID = "homeassistant"

# Member statuses that still count as reachable when combining statuses: a
# degraded service or a disk at warning/critical usage is up, just unhealthy.
_UP_STATUSES = frozenset({"online", "degraded", "warning", "critical"})


class AggregatorService:
    """Combine data from the source clients into the API response models.

    Each public ``get_*`` coroutine is served through the shared
    ``TTLCacheService`` under its own cache key and TTL (taken from
    ``Settings``); the private ``_build_*`` coroutines do the fetching and
    mapping when the cache needs a fresh value.
    """

    # Disk usage percentages at or above which a disk is flagged.
    DISK_CRITICAL_PERCENT = 90
    DISK_WARNING_PERCENT = 75

    def __init__(
        self,
        settings: Settings,
        cache: TTLCacheService,
        beszel_client: BeszelClient,
        docker_client: DockerProxyClient,
        uptime_kuma_client: UptimeKumaClient,
        home_assistant_client: HomeAssistantClient,
        adguard_client: AdGuardClient,
        scrutiny_client: ScrutinyClient,
    ) -> None:
        self.settings = settings
        self.cache = cache
        self.beszel_client = beszel_client
        self.docker_client = docker_client
        self.uptime_kuma_client = uptime_kuma_client
        self.home_assistant_client = home_assistant_client
        self.adguard_client = adguard_client
        self.scrutiny_client = scrutiny_client

    async def get_system(self) -> SystemResponse:
        """Return the host system summary, cached under ``"system"``."""
        return await self.cache.get_or_load(
            "system",
            self.settings.cache_ttl_system_seconds,
            self._build_system,
        )

    async def get_storage(self) -> StorageResponse:
        """Return the disk usage summary, cached under ``"storage"``."""
        return await self.cache.get_or_load(
            "storage",
            self.settings.cache_ttl_storage_seconds,
            self._build_storage,
        )

    async def get_services(self) -> ServicesResponse:
        """Return the merged services list, cached under ``"services"``."""
        return await self.cache.get_or_load(
            "services",
            self.settings.cache_ttl_services_seconds,
            self._build_services,
        )

    async def get_adguard(self) -> AdGuardSnapshot:
        """Return AdGuard stats, cached under ``"adguard"`` with the services TTL."""
        return await self.cache.get_or_load(
            "adguard",
            self.settings.cache_ttl_services_seconds,
            self.adguard_client.fetch_stats,
        )

    async def get_scrutiny(self) -> ScrutinySnapshot:
        """Return the Scrutiny summary, cached under ``"scrutiny"`` with the storage TTL."""
        return await self.cache.get_or_load(
            "scrutiny",
            self.settings.cache_ttl_storage_seconds,
            self.scrutiny_client.fetch_summary,
        )

    async def get_overview(self) -> OverviewResponse:
        """Return the dashboard overview, cached under ``"overview"``."""
        return await self.cache.get_or_load(
            "overview",
            self.settings.cache_ttl_overview_seconds,
            self._build_overview,
        )

    async def _build_system(self) -> SystemResponse:
        """Fetch a Beszel snapshot and map it onto ``SystemResponse``."""
        snapshot = await self.beszel_client.fetch_system_snapshot()
        now = datetime.now(timezone.utc)
        return SystemResponse(
            generated_at=now,
            source=SystemSource(
                name=snapshot.source_name,
                status=snapshot.source_status,
                host_name=snapshot.host_name,
                agent_name=snapshot.agent_name,
            ),
            cpu=SystemCPU(
                usage_percent=snapshot.cpu_usage_percent,
                cores=snapshot.cpu_cores,
                load_1=snapshot.load_1,
                load_5=snapshot.load_5,
                load_15=snapshot.load_15,
            ),
            memory=SystemMemory(
                used_gb=snapshot.memory_used_gb,
                total_gb=snapshot.memory_total_gb,
                available_gb=snapshot.memory_available_gb,
                usage_percent=snapshot.memory_usage_percent,
            ),
            network=SystemNetwork(
                primary_interface=snapshot.primary_interface,
                rx_mbps=snapshot.network_rx_mbps,
                tx_mbps=snapshot.network_tx_mbps,
            ),
            host=SystemHost(
                uptime_seconds=snapshot.uptime_seconds,
                platform=snapshot.platform,
                kernel=snapshot.kernel,
            ),
        )

    async def _build_storage(self) -> StorageResponse:
        """Fetch a Beszel snapshot and build the storage summary from its disks.

        When no disks are reported, a zero-sized ``rootfs`` placeholder is used
        as ``root`` so the response shape stays the same.
        """
        snapshot = await self.beszel_client.fetch_system_snapshot()
        now = datetime.now(timezone.utc)
        source_offline = snapshot.source_status == "offline"
        disks = [self._map_disk(disk, snapshot.source_status) for disk in snapshot.disks]

        # An online agent that reports no disks cannot provide storage data.
        storage_source_status = snapshot.source_status
        if snapshot.source_status == "online" and not disks:
            storage_source_status = "unsupported"

        if disks:
            # Prefer the "/" mount as the root disk; fall back to the first disk.
            root = next((disk for disk in disks if disk.mount == "/"), disks[0])
            overall_status = self._combine_statuses(disk.status for disk in disks)
        else:
            root = StorageDisk(
                name="rootfs",
                mount="/",
                used_gb=0.0,
                total_gb=0.0,
                free_gb=0.0,
                usage_percent=0.0,
                status="offline" if source_offline else "online",
            )
            overall_status = "offline" if source_offline else "online"

        critical_count = sum(1 for disk in disks if disk.status == "critical")
        warning_count = sum(1 for disk in disks if disk.status == "warning")

        return StorageResponse(
            generated_at=now,
            summary=StorageSummary(
                overall_status=overall_status,
                source_status=storage_source_status,
                critical_disks=critical_count,
                warning_disks=warning_count,
                total_disks=len(disks),
            ),
            root=root,
            disks=disks,
        )

    async def _build_services(self) -> ServicesResponse:
        """Fetch Docker, Uptime Kuma and Home Assistant concurrently and merge them."""
        docker_snapshot, kuma_snapshot, ha_snapshot = await asyncio.gather(
            self.docker_client.fetch_containers(),
            self.uptime_kuma_client.fetch_monitors(),
            self.home_assistant_client.fetch_status(),
        )
        services = self._merge_services(docker_snapshot, kuma_snapshot, ha_snapshot)
        overall_status = self._combine_statuses(service.status for service in services)
        return ServicesResponse(
            generated_at=datetime.now(timezone.utc),
            summary=ServicesSummary(
                overall_status=overall_status,
                docker=ServicesDockerSummary(
                    running=docker_snapshot.running,
                    stopped=docker_snapshot.stopped,
                    unhealthy=docker_snapshot.unhealthy,
                    total=docker_snapshot.total,
                    source_status=docker_snapshot.source_status,
                ),
                uptime_kuma=ServicesUptimeKumaSummary(
                    monitors_up=kuma_snapshot.monitors_up,
                    monitors_down=kuma_snapshot.monitors_down,
                    monitors_paused=kuma_snapshot.monitors_paused,
                    total=kuma_snapshot.total,
                    source_status=kuma_snapshot.source_status,
                ),
            ),
            services=services,
        )

    async def _build_overview(self) -> OverviewResponse:
        """Build the dashboard overview from the cached system/storage/services views.

        Home Assistant status is fetched directly (not via the services cache)
        so the overview carries its ``version``; the cached services entry is
        only used as a fallback for latency and last-checked time.
        """
        system, storage, services, ha_snapshot = await asyncio.gather(
            self.get_system(),
            self.get_storage(),
            self.get_services(),
            self.home_assistant_client.fetch_status(),
        )
        ha_service = next(
            (item for item in services.services if item.id == _HOME_ASSISTANT_ID),
            None,
        )
        online_services = sum(1 for item in services.services if item.status == "online")
        degraded_services = sum(1 for item in services.services if item.status == "degraded")
        offline_services = sum(1 for item in services.services if item.status == "offline")

        if ha_snapshot.response_time_ms is not None:
            ha_response_time = ha_snapshot.response_time_ms
        else:
            ha_response_time = ha_service.latency_ms if ha_service else None

        if ha_snapshot.last_checked:
            ha_last_checked = ha_snapshot.last_checked.isoformat()
        else:
            ha_last_checked = ha_service.last_checked if ha_service else None

        return OverviewResponse(
            generated_at=datetime.now(timezone.utc),
            overall_status=self._combine_statuses(
                [services.summary.overall_status, storage.summary.overall_status]
            ),
            refresh_hint_seconds=self.settings.cache_ttl_overview_seconds,
            services=OverviewServicesSummary(
                online=online_services,
                degraded=degraded_services,
                offline=offline_services,
                total=len(services.services),
            ),
            docker=OverviewDockerSummary(
                running=services.summary.docker.running,
                stopped=services.summary.docker.stopped,
                unhealthy=services.summary.docker.unhealthy,
                total=services.summary.docker.total,
                source_status=services.summary.docker.source_status,
            ),
            system=OverviewSystemSummary(
                cpu_percent=system.cpu.usage_percent,
                ram_percent=system.memory.usage_percent,
                root_storage_percent=storage.root.usage_percent,
                network_rx_mbps=system.network.rx_mbps,
                network_tx_mbps=system.network.tx_mbps,
                uptime_seconds=system.host.uptime_seconds,
            ),
            home_assistant=OverviewHomeAssistantSummary(
                status=ha_snapshot.status,
                label="Home Assistant",
                version=ha_snapshot.version,
                response_time_ms=ha_response_time,
                last_checked=ha_last_checked,
            ),
        )

    def _merge_services(
        self,
        docker_snapshot: DockerSnapshot,
        kuma_snapshot: UptimeKumaSnapshot,
        ha_snapshot: HomeAssistantSnapshot,
    ) -> list[ServiceItem]:
        """Merge Docker containers and Uptime Kuma monitors into service items.

        Containers and monitors are matched on ``_normalize_identifier(name)``.
        The returned list starts with exactly one Home Assistant core entry,
        followed by the other services sorted by normalized name.
        """
        docker_by_name = {
            self._normalize_identifier(container.name): container
            for container in docker_snapshot.containers
        }
        kuma_by_name = {
            self._normalize_identifier(monitor.name): monitor
            for monitor in kuma_snapshot.monitors
        }
        # One timestamp for every generic entry produced in this pass.
        checked_at = datetime.now(timezone.utc).isoformat()

        # Home Assistant's own container/monitor feed the core entry rather than
        # producing a second item with the same id (which would be double-counted).
        ha_container = docker_by_name.get(_HOME_ASSISTANT_ID)
        ha_monitor = kuma_by_name.get(_HOME_ASSISTANT_ID)
        ha_status: OverallStatus = "online" if ha_snapshot.status == "online" else "offline"
        ha_latency = ha_snapshot.response_time_ms
        if ha_latency is None and ha_monitor is not None:
            # Fall back to Uptime Kuma's measurement when the direct check has none.
            ha_latency = ha_monitor.latency_ms

        services: list[ServiceItem] = [
            ServiceItem(
                id=_HOME_ASSISTANT_ID,
                name=ha_snapshot.label,
                kind="core",
                status=ha_status,
                health=self._status_to_health(ha_status),
                latency_ms=ha_latency,
                docker_state=ha_container.state if ha_container else "unknown",
                url=(
                    str(self.settings.home_assistant_base_url)
                    if self.settings.home_assistant_base_url
                    else None
                ),
                source="home_assistant",
                last_checked=(
                    ha_snapshot.last_checked.isoformat() if ha_snapshot.last_checked else None
                ),
            )
        ]

        merged_names = sorted((docker_by_name.keys() | kuma_by_name.keys()) - {_HOME_ASSISTANT_ID})
        for normalized_name in merged_names:
            container = docker_by_name.get(normalized_name)
            monitor = kuma_by_name.get(normalized_name)
            docker_state = container.state if container else "unknown"
            status = self._resolve_service_status(docker_state, monitor)
            services.append(
                ServiceItem(
                    id=normalized_name,
                    # Every merged name came from at least one of the two maps.
                    name=monitor.name if monitor else container.name,
                    kind="service",
                    status=status,
                    health=self._status_to_health(status),
                    latency_ms=monitor.latency_ms if monitor else None,
                    docker_state=docker_state,
                    url=None,
                    source="uptime_kuma" if monitor else "docker",
                    last_checked=checked_at,
                )
            )
        return services

    @staticmethod
    def _normalize_identifier(value: str) -> str:
        """Lower-case ``value`` and keep only alphanumerics (e.g. "Home-Assistant" -> "homeassistant")."""
        return "".join(ch.lower() for ch in value if ch.isalnum())

    def _resolve_service_status(
        self,
        docker_state: str,
        monitor: UptimeKumaMonitor | None,
    ) -> OverallStatus:
        """Derive a service status from its Docker state and optional Kuma monitor.

        Precedence: a monitor reporting offline/degraded wins; then Docker
        ``unhealthy`` -> degraded, ``stopped`` -> offline, ``running`` -> online;
        otherwise the monitor's own status, or offline when there is no monitor.
        """
        if monitor:
            if monitor.status == "offline":
                return "offline"
            if monitor.status == "degraded":
                return "degraded"
        if docker_state == "unhealthy":
            return "degraded"
        if docker_state == "stopped":
            return "offline"
        if docker_state == "running":
            return "online"
        # NOTE(review): assumes monitor.status is always an OverallStatus value;
        # confirm the Uptime Kuma client never yields e.g. "paused" here.
        return "offline" if monitor is None else monitor.status

    @staticmethod
    def _status_to_health(status: OverallStatus) -> HealthStatus:
        """Map online -> healthy, degraded -> warning, anything else -> offline."""
        if status == "online":
            return "healthy"
        if status == "degraded":
            return "warning"
        return "offline"

    def _map_disk(self, disk: BeszelDiskMetric, source_status: str) -> StorageDisk:
        """Convert a Beszel disk metric into a ``StorageDisk`` with a usage-based status.

        All disks are ``offline`` when the source is offline; otherwise usage at
        or above ``DISK_CRITICAL_PERCENT`` is critical and at or above
        ``DISK_WARNING_PERCENT`` is warning.
        """
        if source_status == "offline":
            status: DiskStatus = "offline"
        elif disk.usage_percent >= self.DISK_CRITICAL_PERCENT:
            status = "critical"
        elif disk.usage_percent >= self.DISK_WARNING_PERCENT:
            status = "warning"
        else:
            status = "online"
        return StorageDisk(
            name=disk.name,
            mount=disk.mount,
            used_gb=disk.used_gb,
            total_gb=disk.total_gb,
            free_gb=disk.free_gb,
            usage_percent=disk.usage_percent,
            status=status,
        )

    def _combine_statuses(self, statuses: Iterable[str]) -> OverallStatus:
        """Fold member statuses into one aggregate status.

        - no members -> ``"offline"``
        - some offline -> ``"degraded"`` if any member is still up
          (online/degraded/warning/critical), else ``"offline"``
        - none offline but any degraded/warning/critical -> ``"degraded"``
        - otherwise -> ``"online"``
        """
        normalized = list(statuses)
        if not normalized:
            return "offline"
        if "offline" in normalized:
            # A degraded service or a warning/critical disk is still reachable,
            # so a partial outage is "degraded", not a full "offline".
            return "degraded" if any(status in _UP_STATUSES for status in normalized) else "offline"
        if any(status in {"degraded", "warning", "critical"} for status in normalized):
            return "degraded"
        return "online"


@lru_cache(maxsize=1)
def get_cache_service() -> TTLCacheService:
    """Return the process-wide ``TTLCacheService`` (memoized by ``lru_cache``)."""
    return TTLCacheService()


@lru_cache(maxsize=1)
def get_aggregator_service() -> AggregatorService:
    """Return the process-wide ``AggregatorService`` wired with the source clients.

    Memoized by ``lru_cache``, so clients are constructed once from
    ``get_settings()`` and share the cache from ``get_cache_service()``.
    """
    settings = get_settings()
    cache = get_cache_service()
    return AggregatorService(
        settings=settings,
        cache=cache,
        beszel_client=BeszelClient(settings),
        docker_client=DockerProxyClient(settings),
        uptime_kuma_client=UptimeKumaClient(settings),
        home_assistant_client=HomeAssistantClient(settings),
        adguard_client=AdGuardClient(settings),
        scrutiny_client=ScrutinyClient(settings),
    )