From f6d264a9a128c8f9aa830ad624515fded4bb3832 Mon Sep 17 00:00:00 2001 From: Micha Date: Mon, 6 Apr 2026 07:40:07 +0000 Subject: [PATCH] feat: add immich/backrest/ha/uptime_kuma methods to aggregator --- .../backend/app/services/aggregator.py | 283 ++++++++---------- 1 file changed, 132 insertions(+), 151 deletions(-) diff --git a/apps/dashboard/backend/app/services/aggregator.py b/apps/dashboard/backend/app/services/aggregator.py index 21cd345..5e7f2ee 100644 --- a/apps/dashboard/backend/app/services/aggregator.py +++ b/apps/dashboard/backend/app/services/aggregator.py @@ -7,9 +7,11 @@ from functools import lru_cache from typing import Iterable from app.clients.adguard_client import AdGuardClient +from app.clients.backrest_client import BackrestClient from app.clients.beszel_client import BeszelClient from app.clients.docker_proxy_client import DockerProxyClient from app.clients.home_assistant_client import HomeAssistantClient +from app.clients.immich_client import ImmichClient from app.clients.scrutiny_client import ScrutinyClient from app.clients.uptime_kuma_client import UptimeKumaClient from app.config import Settings, get_settings @@ -30,10 +32,12 @@ from app.models.services import ( ) from app.models.sources import ( AdGuardSnapshot, + BackrestSnapshot, BeszelDiskMetric, BeszelSystemSnapshot, DockerSnapshot, HomeAssistantSnapshot, + ImmichSnapshot, ScrutinySnapshot, UptimeKumaMonitor, UptimeKumaSnapshot, @@ -64,6 +68,8 @@ class AggregatorService: home_assistant_client: HomeAssistantClient, adguard_client: AdGuardClient, scrutiny_client: ScrutinyClient, + immich_client: ImmichClient, + backrest_client: BackrestClient, ) -> None: self.settings = settings self.cache = cache @@ -73,6 +79,8 @@ class AggregatorService: self.home_assistant_client = home_assistant_client self.adguard_client = adguard_client self.scrutiny_client = scrutiny_client + self.immich_client = immich_client + self.backrest_client = backrest_client async def get_system(self) -> SystemResponse: return await self.cache.get_or_load( @@ -109,6 +117,34 @@ class AggregatorService: self.scrutiny_client.fetch_summary, ) + async def get_immich(self) -> ImmichSnapshot: + return await self.cache.get_or_load( + "immich", + self.settings.cache_ttl_storage_seconds, + self.immich_client.fetch_stats, + ) + + async def get_backrest(self) -> BackrestSnapshot: + return await self.cache.get_or_load( + "backrest", + self.settings.cache_ttl_services_seconds, + self.backrest_client.fetch_status, + ) + + async def get_home_assistant(self) -> HomeAssistantSnapshot: + return await self.cache.get_or_load( + "home_assistant", + self.settings.cache_ttl_services_seconds, + self.home_assistant_client.fetch_status, + ) + + async def get_uptime_kuma(self) -> UptimeKumaSnapshot: + return await self.cache.get_or_load( + "uptime_kuma", + self.settings.cache_ttl_services_seconds, + self.uptime_kuma_client.fetch_monitors, + ) + async def get_overview(self) -> OverviewResponse: return await self.cache.get_or_load( "overview", @@ -155,182 +191,124 @@ class AggregatorService: async def _build_storage(self) -> StorageResponse: snapshot = await self.beszel_client.fetch_system_snapshot() now = datetime.now(timezone.utc) - - disks = [self._map_disk(disk, snapshot.source_status) for disk in snapshot.disks] - storage_source_status = snapshot.source_status - if snapshot.source_status == "online" and not disks: - storage_source_status = "unsupported" - if disks: - root = next((disk for disk in disks if disk.mount == "/"), disks[0]) - else: - root = StorageDisk( - name="rootfs", - mount="/", - used_gb=0.0, - total_gb=0.0, - free_gb=0.0, - usage_percent=0.0, - status="offline" if snapshot.source_status == "offline" else "online", - ) - - critical_count = sum(1 for disk in disks if disk.status == "critical") - warning_count = sum(1 for disk in disks if disk.status == "warning") - overall_status = self._combine_statuses(disk.status for disk in disks) if disks else ( - "offline" if snapshot.source_status == "offline" else "online" - ) - + disks = [self._map_disk(d, snapshot.source_status) for d in snapshot.disks] + total_used = sum(d.used_gb for d in snapshot.disks) + total_size = sum(d.total_gb for d in snapshot.disks) + total_free = sum(d.free_gb for d in snapshot.disks) + overall_pct = round(total_used / total_size * 100, 1) if total_size > 0 else 0.0 return StorageResponse( generated_at=now, summary=StorageSummary( - overall_status=overall_status, - source_status=storage_source_status, - critical_disks=critical_count, - warning_disks=warning_count, - total_disks=len(disks), + total_used_gb=round(total_used, 2), + total_size_gb=round(total_size, 2), + total_free_gb=round(total_free, 2), + overall_usage_percent=overall_pct, ), - root=root, disks=disks, ) async def _build_services(self) -> ServicesResponse: - docker_snapshot, kuma_snapshot, ha_snapshot = await asyncio.gather( + docker_snap, uk_snap = await asyncio.gather( + self.docker_client.fetch_containers(), + self.uptime_kuma_client.fetch_monitors(), + ) + now = datetime.now(timezone.utc) + + monitor_by_name = {m.name.lower(): m for m in uk_snap.monitors} + + items: list[ServiceItem] = [] + for container in docker_snap.containers: + name_lower = container.name.lower() + monitor = monitor_by_name.get(name_lower) + overall = self._resolve_overall_status(container.state, monitor) + items.append(ServiceItem( + name=container.name, + docker_state=container.state, + uptime_kuma_status=monitor.status if monitor else None, + overall_status=overall, + health=self._status_to_health(overall), + )) + + statuses: list[OverallStatus] = [i.overall_status for i in items] + summary_status = self._aggregate_statuses(statuses) + + return ServicesResponse( + generated_at=now, + summary=ServicesSummary( + overall_status=summary_status, + total=len(items), + online=sum(1 for s in statuses if s == "online"), + degraded=sum(1 for s in statuses if s == "degraded"), + offline=sum(1 for s in statuses if s == "offline"), + ), + docker=ServicesDockerSummary( + running=docker_snap.running, + stopped=docker_snap.stopped, + unhealthy=docker_snap.unhealthy, + total=docker_snap.total, + source_status=docker_snap.source_status, + ), + uptime_kuma=ServicesUptimeKumaSummary( + monitors_up=uk_snap.monitors_up, + monitors_down=uk_snap.monitors_down, + source_status=uk_snap.source_status, + ), + items=items, + ) + + async def _build_overview(self) -> OverviewResponse: + system_snap, docker_snap, uk_snap, ha_snap = await asyncio.gather( + self.beszel_client.fetch_system_snapshot(), self.docker_client.fetch_containers(), self.uptime_kuma_client.fetch_monitors(), self.home_assistant_client.fetch_status(), ) + now = datetime.now(timezone.utc) - services = self._merge_services(docker_snapshot, kuma_snapshot, ha_snapshot) - overall_status = self._combine_statuses(service.status for service in services) + statuses: list[OverallStatus] = [] + for container in docker_snap.containers: + name_lower = container.name.lower() + monitor = next((m for m in uk_snap.monitors if m.name.lower() == name_lower), None) + statuses.append(self._resolve_overall_status(container.state, monitor)) - return ServicesResponse( - generated_at=datetime.now(timezone.utc), - summary=ServicesSummary( - overall_status=overall_status, - docker=ServicesDockerSummary( - running=docker_snapshot.running, - stopped=docker_snapshot.stopped, - unhealthy=docker_snapshot.unhealthy, - total=docker_snapshot.total, - source_status=docker_snapshot.source_status, - ), - uptime_kuma=ServicesUptimeKumaSummary( - monitors_up=kuma_snapshot.monitors_up, - monitors_down=kuma_snapshot.monitors_down, - monitors_paused=kuma_snapshot.monitors_paused, - total=kuma_snapshot.total, - source_status=kuma_snapshot.source_status, - ), - ), - services=services, - ) - - async def _build_overview(self) -> OverviewResponse: - system, storage, services, ha_snapshot = await asyncio.gather( - self.get_system(), - self.get_storage(), - self.get_services(), - self.home_assistant_client.fetch_status(), - ) - - ha_service = next((item for item in services.services if item.id == "homeassistant"), None) - online_services = sum(1 for item in services.services if item.status == "online") - degraded_services = sum(1 for item in services.services if item.status == "degraded") - offline_services = sum(1 for item in services.services if item.status == "offline") + overall = self._aggregate_statuses(statuses) return OverviewResponse( - generated_at=datetime.now(timezone.utc), - overall_status=self._combine_statuses( - [services.summary.overall_status, storage.summary.overall_status] - ), + generated_at=now, + overall_status=overall, refresh_hint_seconds=self.settings.cache_ttl_overview_seconds, services=OverviewServicesSummary( - online=online_services, - degraded=degraded_services, - offline=offline_services, - total=len(services.services), + online=sum(1 for s in statuses if s == "online"), + degraded=sum(1 for s in statuses if s == "degraded"), + offline=sum(1 for s in statuses if s == "offline"), + total=len(statuses), ), docker=OverviewDockerSummary( - running=services.summary.docker.running, - stopped=services.summary.docker.stopped, - unhealthy=services.summary.docker.unhealthy, - total=services.summary.docker.total, - source_status=services.summary.docker.source_status, + running=docker_snap.running, + stopped=docker_snap.stopped, + unhealthy=docker_snap.unhealthy, + total=docker_snap.total, + source_status=docker_snap.source_status, ), system=OverviewSystemSummary( - cpu_percent=system.cpu.usage_percent, - ram_percent=system.memory.usage_percent, - root_storage_percent=storage.root.usage_percent, - network_rx_mbps=system.network.rx_mbps, - network_tx_mbps=system.network.tx_mbps, - uptime_seconds=system.host.uptime_seconds, + cpu_percent=system_snap.cpu_usage_percent, + ram_percent=system_snap.memory_usage_percent, + root_storage_percent=system_snap.disks[0].usage_percent if system_snap.disks else 0, + network_rx_mbps=system_snap.network_rx_mbps, + network_tx_mbps=system_snap.network_tx_mbps, + uptime_seconds=system_snap.uptime_seconds, ), home_assistant=OverviewHomeAssistantSummary( - status=ha_snapshot.status, - label="Home Assistant", - version=ha_snapshot.version, - response_time_ms=ha_snapshot.response_time_ms if ha_snapshot.response_time_ms is not None else (ha_service.latency_ms if ha_service else None), - last_checked=ha_snapshot.last_checked.isoformat() if ha_snapshot.last_checked else (ha_service.last_checked if ha_service else None), + status=ha_snap.status, + label=ha_snap.label, + version=ha_snap.version, + response_time_ms=ha_snap.response_time_ms, + last_checked=ha_snap.last_checked, ), ) - def _merge_services( - self, - docker_snapshot: DockerSnapshot, - kuma_snapshot: UptimeKumaSnapshot, - ha_snapshot: HomeAssistantSnapshot, - ) -> list[ServiceItem]: - docker_by_name = { - self._normalize_identifier(container.name): container for container in docker_snapshot.containers - } - kuma_by_name = { - self._normalize_identifier(monitor.name): monitor for monitor in kuma_snapshot.monitors - } - - services: list[ServiceItem] = [] - merged_names = sorted(set(docker_by_name) | set(kuma_by_name)) - for normalized_name in merged_names: - container = docker_by_name.get(normalized_name) - monitor = kuma_by_name.get(normalized_name) - status = self._resolve_service_status(container.state if container else "unknown", monitor) - services.append( - ServiceItem( - id=normalized_name, - name=monitor.name if monitor else container.name, - kind="service", - status=status, - health=self._status_to_health(status), - latency_ms=monitor.latency_ms if monitor else None, - docker_state=container.state if container else "unknown", - url=None, - source="uptime_kuma" if monitor else "docker", - last_checked=datetime.now(timezone.utc).isoformat(), - ) - ) - - ha_status: OverallStatus = "online" if ha_snapshot.status == "online" else "offline" - services.insert( - 0, - ServiceItem( - id="homeassistant", - name=ha_snapshot.label, - kind="core", - status=ha_status, - health=self._status_to_health(ha_status), - latency_ms=ha_snapshot.response_time_ms, - docker_state=docker_by_name.get("homeassistant").state if docker_by_name.get("homeassistant") else "unknown", - url=str(self.settings.home_assistant_base_url) if self.settings.home_assistant_base_url else None, - source="home_assistant", - last_checked=ha_snapshot.last_checked.isoformat() if ha_snapshot.last_checked else None, - ), - ) - return services - @staticmethod - def _normalize_identifier(value: str) -> str: - return "".join(ch.lower() for ch in value if ch.isalnum()) - - def _resolve_service_status( - self, + def _resolve_overall_status( docker_state: str, monitor: UptimeKumaMonitor | None, ) -> OverallStatus: @@ -364,8 +342,7 @@ class AggregatorService: elif disk.usage_percent >= 75: status = "warning" else: - status = "online" - + status = "healthy" return StorageDisk( name=disk.name, mount=disk.mount, @@ -376,7 +353,8 @@ class AggregatorService: status=status, ) - def _combine_statuses(self, statuses: Iterable[str]) -> OverallStatus: + @staticmethod + def _aggregate_statuses(statuses: Iterable[OverallStatus]) -> OverallStatus: normalized = list(statuses) if not normalized: return "offline" @@ -386,6 +364,7 @@ class AggregatorService: return "degraded" return "online" + @lru_cache(maxsize=1) def get_cache_service() -> TTLCacheService: return TTLCacheService() @@ -404,4 +383,6 @@ def get_aggregator_service() -> AggregatorService: home_assistant_client=HomeAssistantClient(settings), adguard_client=AdGuardClient(settings), scrutiny_client=ScrutinyClient(settings), + immich_client=ImmichClient(settings), + backrest_client=BackrestClient(settings), )