from __future__ import annotations import asyncio import logging from datetime import datetime, timezone from functools import lru_cache from typing import Iterable from app.clients.adguard_client import AdGuardClient from app.clients.backrest_client import BackrestClient from app.clients.beszel_client import BeszelClient from app.clients.docker_proxy_client import DockerProxyClient from app.clients.home_assistant_client import HomeAssistantClient from app.clients.immich_client import ImmichClient from app.clients.scrutiny_client import ScrutinyClient from app.clients.uptime_kuma_client import UptimeKumaClient from app.config import Settings, get_settings from app.models.common import DiskStatus, HealthStatus, OverallStatus from app.models.overview import ( OverviewDockerSummary, OverviewHomeAssistantSummary, OverviewResponse, OverviewServicesSummary, OverviewSystemSummary, ) from app.models.services import ( ServiceItem, ServicesDockerSummary, ServicesResponse, ServicesSummary, ServicesUptimeKumaSummary, ) from app.models.sources import ( AdGuardSnapshot, BackrestSnapshot, BeszelDiskMetric, BeszelSystemSnapshot, DockerSnapshot, HomeAssistantSnapshot, ImmichSnapshot, ScrutinySnapshot, UptimeKumaMonitor, UptimeKumaSnapshot, ) from app.models.storage import StorageDisk, StorageResponse, StorageSummary from app.models.system import ( SystemCPU, SystemHost, SystemMemory, SystemNetwork, SystemResponse, SystemSource, ) from app.services.cache import TTLCacheService logger = logging.getLogger(__name__) class AggregatorService: def __init__( self, settings: Settings, cache: TTLCacheService, beszel_client: BeszelClient, docker_client: DockerProxyClient, uptime_kuma_client: UptimeKumaClient, home_assistant_client: HomeAssistantClient, adguard_client: AdGuardClient, scrutiny_client: ScrutinyClient, immich_client: ImmichClient, backrest_client: BackrestClient, ) -> None: self.settings = settings self.cache = cache self.beszel_client = beszel_client self.docker_client = docker_client self.uptime_kuma_client = uptime_kuma_client self.home_assistant_client = home_assistant_client self.adguard_client = adguard_client self.scrutiny_client = scrutiny_client self.immich_client = immich_client self.backrest_client = backrest_client async def get_system(self) -> SystemResponse: return await self.cache.get_or_load( "system", self.settings.cache_ttl_system_seconds, self._build_system, ) async def get_storage(self) -> StorageResponse: return await self.cache.get_or_load( "storage", self.settings.cache_ttl_storage_seconds, self._build_storage, ) async def get_services(self) -> ServicesResponse: return await self.cache.get_or_load( "services", self.settings.cache_ttl_services_seconds, self._build_services, ) async def get_adguard(self) -> AdGuardSnapshot: return await self.cache.get_or_load( "adguard", self.settings.cache_ttl_services_seconds, self.adguard_client.fetch_stats, ) async def get_scrutiny(self) -> ScrutinySnapshot: return await self.cache.get_or_load( "scrutiny", self.settings.cache_ttl_storage_seconds, self.scrutiny_client.fetch_summary, ) async def get_immich(self) -> ImmichSnapshot: return await self.cache.get_or_load( "immich", self.settings.cache_ttl_services_seconds, self.immich_client.fetch_stats, ) async def get_backrest(self) -> BackrestSnapshot: return await self.cache.get_or_load( "backrest", self.settings.cache_ttl_services_seconds, self.backrest_client.fetch_status, ) async def get_home_assistant(self) -> HomeAssistantSnapshot: return await self.cache.get_or_load( "home_assistant", self.settings.cache_ttl_services_seconds, self.home_assistant_client.fetch_status, ) async def get_uptime_kuma(self) -> UptimeKumaSnapshot: return await self.cache.get_or_load( "uptime_kuma", self.settings.cache_ttl_services_seconds, self.uptime_kuma_client.fetch_monitors, ) async def get_overview(self) -> OverviewResponse: return await self.cache.get_or_load( "overview", self.settings.cache_ttl_overview_seconds, self._build_overview, ) async def _build_system(self) -> SystemResponse: snapshot = await self.beszel_client.fetch_system_snapshot() now = datetime.now(timezone.utc) return SystemResponse( generated_at=now, source=SystemSource( name=snapshot.source_name, status=snapshot.source_status, host_name=snapshot.host_name, agent_name=snapshot.agent_name, ), cpu=SystemCPU( usage_percent=snapshot.cpu_usage_percent, cores=snapshot.cpu_cores, load_1=snapshot.load_1, load_5=snapshot.load_5, load_15=snapshot.load_15, ), memory=SystemMemory( used_gb=snapshot.memory_used_gb, total_gb=snapshot.memory_total_gb, available_gb=snapshot.memory_available_gb, usage_percent=snapshot.memory_usage_percent, ), network=SystemNetwork( primary_interface=snapshot.primary_interface, rx_mbps=snapshot.network_rx_mbps, tx_mbps=snapshot.network_tx_mbps, ), host=SystemHost( uptime_seconds=snapshot.uptime_seconds, platform=snapshot.platform, kernel=snapshot.kernel, ), ) async def _build_storage(self) -> StorageResponse: snapshot = await self.beszel_client.fetch_system_snapshot() now = datetime.now(timezone.utc) disks = [self._map_disk(d, snapshot.source_status) for d in snapshot.disks] storage_source_status = snapshot.source_status if snapshot.source_status == "online" and not disks: storage_source_status = "unsupported" if disks: root = next((d for d in disks if d.mount == "/"), disks[0]) else: root = StorageDisk( name="rootfs", mount="/", used_gb=0.0, total_gb=0.0, free_gb=0.0, usage_percent=0.0, status="offline" if snapshot.source_status == "offline" else "online", ) critical_count = sum(1 for d in disks if d.status == "critical") warning_count = sum(1 for d in disks if d.status == "warning") overall_status: OverallStatus = ( "offline" if snapshot.source_status == "offline" else ("degraded" if critical_count or warning_count else "online") ) return StorageResponse( generated_at=now, summary=StorageSummary( overall_status=overall_status, source_status=storage_source_status, critical_disks=critical_count, warning_disks=warning_count, total_disks=len(disks), ), root=root, disks=disks, ) async def _build_services(self) -> ServicesResponse: docker_snap, uk_snap = await asyncio.gather( self.docker_client.fetch_containers(), self.uptime_kuma_client.fetch_monitors(), ) now = datetime.now(timezone.utc) monitor_by_name = { self._normalize_identifier(m.name): m for m in uk_snap.monitors } docker_by_name = { self._normalize_identifier(c.name): c for c in docker_snap.containers } items: list[ServiceItem] = [] merged_names = sorted(set(docker_by_name) | set(monitor_by_name)) for norm in merged_names: container = docker_by_name.get(norm) monitor = monitor_by_name.get(norm) status = self._resolve_overall_status( container.state if container else "unknown", monitor ) items.append(ServiceItem( id=norm, name=monitor.name if monitor else container.name, kind="service", status=status, health=self._status_to_health(status), latency_ms=monitor.latency_ms if monitor else None, docker_state=container.state if container else "unknown", url=None, source="uptime_kuma" if monitor else "docker", last_checked=now.isoformat(), )) statuses = [i.status for i in items] overall = self._aggregate_statuses(statuses) return ServicesResponse( generated_at=now, summary=ServicesSummary( overall_status=overall, docker=ServicesDockerSummary( running=docker_snap.running, stopped=docker_snap.stopped, unhealthy=docker_snap.unhealthy, total=docker_snap.total, source_status=docker_snap.source_status, ), uptime_kuma=ServicesUptimeKumaSummary( monitors_up=uk_snap.monitors_up, monitors_down=uk_snap.monitors_down, monitors_paused=uk_snap.monitors_paused, total=uk_snap.total, source_status=uk_snap.source_status, ), ), services=items, ) async def _build_overview(self) -> OverviewResponse: system_snap, docker_snap, uk_snap, ha_snap = await asyncio.gather( self.beszel_client.fetch_system_snapshot(), self.docker_client.fetch_containers(), self.uptime_kuma_client.fetch_monitors(), self.home_assistant_client.fetch_status(), ) now = datetime.now(timezone.utc) statuses: list[OverallStatus] = [] for container in docker_snap.containers: name_lower = self._normalize_identifier(container.name) monitor = next( (m for m in uk_snap.monitors if self._normalize_identifier(m.name) == name_lower), None, ) statuses.append(self._resolve_overall_status(container.state, monitor)) overall = self._aggregate_statuses(statuses) return OverviewResponse( generated_at=now, overall_status=overall, refresh_hint_seconds=self.settings.cache_ttl_overview_seconds, services=OverviewServicesSummary( online=sum(1 for s in statuses if s == "online"), degraded=sum(1 for s in statuses if s == "degraded"), offline=sum(1 for s in statuses if s == "offline"), total=len(statuses), ), docker=OverviewDockerSummary( running=docker_snap.running, stopped=docker_snap.stopped, unhealthy=docker_snap.unhealthy, total=docker_snap.total, source_status=docker_snap.source_status, ), system=OverviewSystemSummary( cpu_percent=system_snap.cpu_usage_percent, ram_percent=system_snap.memory_usage_percent, root_storage_percent=system_snap.disks[0].usage_percent if system_snap.disks else 0.0, network_rx_mbps=system_snap.network_rx_mbps, network_tx_mbps=system_snap.network_tx_mbps, uptime_seconds=system_snap.uptime_seconds, ), home_assistant=OverviewHomeAssistantSummary( status=ha_snap.status, label=ha_snap.label, version=ha_snap.version, response_time_ms=ha_snap.response_time_ms, last_checked=ha_snap.last_checked.isoformat() if ha_snap.last_checked else None, ), ) @staticmethod def _normalize_identifier(value: str) -> str: return "".join(ch.lower() for ch in value if ch.isalnum()) @staticmethod def _resolve_overall_status( docker_state: str, monitor: UptimeKumaMonitor | None, ) -> OverallStatus: if monitor: if monitor.status == "offline": return "offline" if monitor.status == "degraded": return "degraded" if docker_state == "unhealthy": return "degraded" if docker_state == "stopped": return "offline" if docker_state == "running": return "online" return "offline" if monitor is None else monitor.status @staticmethod def _status_to_health(status: OverallStatus) -> HealthStatus: if status == "online": return "healthy" if status == "degraded": return "warning" return "offline" def _map_disk(self, disk: BeszelDiskMetric, source_status: str) -> StorageDisk: if source_status == "offline": status: DiskStatus = "offline" elif disk.usage_percent >= 90: status = "critical" elif disk.usage_percent >= 75: status = "warning" else: status = "online" return StorageDisk( name=disk.name, mount=disk.mount, used_gb=disk.used_gb, total_gb=disk.total_gb, free_gb=disk.free_gb, usage_percent=disk.usage_percent, status=status, ) @staticmethod def _aggregate_statuses(statuses: Iterable[OverallStatus]) -> OverallStatus: normalized = list(statuses) if not normalized: return "offline" if any(s == "offline" for s in normalized): return "degraded" if any(s == "online" for s in normalized) else "offline" if any(s in {"degraded", "warning", "critical"} for s in normalized): return "degraded" return "online" @lru_cache(maxsize=1) def get_cache_service() -> TTLCacheService: return TTLCacheService() @lru_cache(maxsize=1) def get_aggregator_service() -> AggregatorService: settings = get_settings() cache = get_cache_service() return AggregatorService( settings=settings, cache=cache, beszel_client=BeszelClient(settings), docker_client=DockerProxyClient(settings), uptime_kuma_client=UptimeKumaClient(settings), home_assistant_client=HomeAssistantClient(settings), adguard_client=AdGuardClient(settings), scrutiny_client=ScrutinyClient(settings), immich_client=ImmichClient(settings), backrest_client=BackrestClient(settings), )