diff --git a/ops/hermes-agent/scripts/check_health.py b/ops/hermes-agent/scripts/check_health.py index d406d6b..0a9b82a 100644 --- a/ops/hermes-agent/scripts/check_health.py +++ b/ops/hermes-agent/scripts/check_health.py @@ -2,29 +2,33 @@ """ check_health.py — Homelab Alert Enricher ========================================= -Laedt services.json, prueft Docker-Health aller bekannten Abhaengigkeiten, -liest Dump-Timestamps und gibt einen strukturierten JSON-Report aus. - -Hermes liest diesen Report und baut daraus eine angereicherte ntfy-Nachricht. +Loads services.json, checks services via HTTP(S) and prints a +structured JSON report. Hermes uses this report for +enriched ntfy alerts. Keine externen Abhaengigkeiten — nur Python-Standardbibliothek. +No Docker CLI, no root, no pip. + +Check strategy: + - Services WITH url: HTTP GET, any status < 500 (incl. 4xx) = healthy, 5xx/timeout/connection error = unhealthy + - Services WITHOUT url: "internal" — no external check possible + - Dump timestamps: read only if one of the known dump directories exists (optional) Verwendung: - python3 check_health.py # alle unhealthy Container + python3 check_health.py # check all services (tier 1+2) python3 check_health.py paperless-ngx # gezielt einen Service pruefen - python3 check_health.py --summary # Gesamtstatus als Zusammenfassung + python3 check_health.py --summary # overall status, tier 1+2 + python3 check_health.py --all # all tiers incl. 
Tier 3 -Pfad auf der Hermes-VM (via git pull): +Path on the Hermes VM: /srv/hermes-workspace/homelab-infra/ops/hermes-agent/scripts/check_health.py - -services.json wird relativ zum Script-Verzeichnis gesucht: - ../services.json """ import json -import os -import subprocess +import ssl import sys +import urllib.request +import urllib.error from datetime import datetime from pathlib import Path @@ -34,111 +38,114 @@ from pathlib import Path SCRIPT_DIR = Path(__file__).parent.resolve() SERVICES_JSON_PATH = SCRIPT_DIR.parent / "services.json" - -# Fallback falls das Repo unter einem anderen Pfad liegt SERVICES_JSON_FALLBACK = Path("/srv/hermes-workspace/homelab-infra/ops/hermes-agent/services.json") -# Dump-Warnschwelle in Stunden (aelter = Warnung) +# HTTP check timeout in seconds +HTTP_TIMEOUT = 8 + +# Candidate dump directories (optional — dump checks are skipped if none exists) +DUMP_BASE_PATHS = [ + Path("/mnt/user/backups/borg/dumps/latest"), # Unraid, direct path + Path("/opt/dumps"), # mounted fallback +] + +# Dump warning threshold (hours) DUMP_WARN_HOURS = 26 +# SSL verification (True = strict; False = certificate AND hostname checks are disabled entirely, not only for self-signed certs) +SSL_VERIFY = False + # --------------------------------------------------------------------------- # Hilfsfunktionen # --------------------------------------------------------------------------- def load_services(): - """Laedt services.json. Gibt (services_dict, meta_dict) zurueck. - Keine externen Abhaengigkeiten — verwendet nur json aus der Standardbibliothek.""" + """Load services.json and return (services, meta); raises FileNotFoundError if neither known path exists.""" path = SERVICES_JSON_PATH if SERVICES_JSON_PATH.exists() else SERVICES_JSON_FALLBACK if not path.exists(): raise FileNotFoundError( f"services.json nicht gefunden: {path}\n" - f"Bitte 'git pull' in /srv/hermes-workspace/homelab-infra/ ausfuehren." + "Bitte 'git pull' in /srv/hermes-workspace/homelab-infra/ ausfuehren." 
) - with open(path, encoding="utf-8") as f: data = json.load(f) - return data.get("services", {}), data.get("meta", {}) -def docker_inspect(container_name: str) -> dict: +def http_check(url: str) -> dict: """ - Gibt {'status': str, 'health': str} zurueck. - status: running | exited | restarting | dead | not_found | error - health: healthy | unhealthy | starting | none | unknown + Perform an HTTP GET against url. + Returns {'reachable': bool, 'status_code': int|None, 'error': str|None}. + Any status below 500 counts as reachable (all 4xx, not only 401/403); 5xx and connection errors do not. """ - try: - result = subprocess.run( - [ - "docker", "inspect", - "--format", - "{{.State.Status}}|||{{if .State.Health}}{{.State.Health.Status}}{{else}}none{{end}}", - container_name, - ], - capture_output=True, - text=True, - timeout=10, - ) - if result.returncode != 0: - return {"status": "not_found", "health": "unknown"} + ctx = ssl.create_default_context() + if not SSL_VERIFY: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE - parts = result.stdout.strip().split("|||") - return { - "status": parts[0].strip() if parts else "unknown", - "health": parts[1].strip() if len(parts) > 1 else "none", - } + try: + req = urllib.request.Request(url, method="GET", headers={"User-Agent": "hermes-healthcheck/1.0"}) + with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT, context=ctx) as resp: + code = resp.status + healthy = code < 500 + return {"reachable": healthy, "status_code": code, "error": None} + except urllib.error.HTTPError as e: + # 4xx = service is up but auth-protected or path not found — still reachable; 5xx = not reachable + healthy = e.code < 500 + return {"reachable": healthy, "status_code": e.code, "error": None} + except urllib.error.URLError as e: + return {"reachable": False, "status_code": None, "error": str(e.reason)} except Exception as e: - return {"status": "error", "health": str(e)} + return {"reachable": False, "status_code": None, "error": str(e)} -def is_healthy(inspect_result: dict) -> 
bool: - status = inspect_result.get("status", "") - health = inspect_result.get("health", "") - if status != "running": - return False - if health in ("unhealthy",): - return False - return True +def check_service(service: dict) -> dict: + """ + Check a single service. + Returns {'healthy': bool|None, 'method': 'http'|'internal', 'url', 'status_code', 'error'}. + """ + url = service.get("url") + + if url: + result = http_check(url) + return { + "healthy": result["reachable"], + "method": "http", + "url": url, + "status_code": result["status_code"], + "error": result["error"], + } + else: + return { + "healthy": None, # None = unknown (internal, no external check) + "method": "internal", + "url": None, + "status_code": None, + "error": "Kein externer Check — interner Service ohne URL", + } -def get_unhealthy_containers() -> list[str]: - """Gibt Liste aller Container zurueck die unhealthy oder nicht running sind.""" - try: - # unhealthy per healthcheck - r1 = subprocess.run( - ["docker", "ps", "--filter", "health=unhealthy", "--format", "{{.Names}}"], - capture_output=True, text=True, timeout=10, - ) - # exited/dead Container die eigentlich laufen sollten - r2 = subprocess.run( - ["docker", "ps", "--filter", "status=exited", "--format", "{{.Names}}"], - capture_output=True, text=True, timeout=10, - ) - names = set() - for raw in (r1.stdout, r2.stdout): - for name in raw.strip().split("\n"): - name = name.strip() - if name: - names.add(name) - return sorted(names) - except Exception: - return [] +def find_dump_base() -> Path | None: + """Return the first existing path from DUMP_BASE_PATHS, or None if none exists.""" + for p in DUMP_BASE_PATHS: + if p.exists(): + return p + return None -def get_dump_info(dump_file: str | None, dump_base: str) -> dict | None: - """Gibt Alter und Groesse des Dump-Files zurueck (oder None wenn nicht vorhanden).""" - if not dump_file: +def get_dump_info(dump_file: str | None, dump_base: Path | None) -> dict | None: + """Read age and size of a dump file; returns None if dump_file or dump_base is missing.""" + if not dump_file or 
not dump_base: return None - path = Path(dump_base) / dump_file + path = dump_base / dump_file if not path.exists(): - return {"file": dump_file, "exists": False, "age_hours": None, "size_mb": None} + return {"file": dump_file, "exists": False, "age_hours": None, "size_mb": None, "warn": False} stat = path.stat() age_hours = round((datetime.now().timestamp() - stat.st_mtime) / 3600, 1) size_mb = round(stat.st_size / 1_048_576, 1) - return { "file": dump_file, "exists": True, @@ -152,30 +159,28 @@ def get_dump_info(dump_file: str | None, dump_base: str) -> dict | None: # Report-Generierung # --------------------------------------------------------------------------- -def build_service_report(service_key: str, service: dict, all_services: dict, meta: dict) -> dict: - """Erstellt einen vollstaendigen Report fuer einen einzelnen Service.""" - dump_base = meta.get("dump_base", "/mnt/user/backups/borg/dumps/latest") +def build_service_report(service_key: str, service: dict, all_services: dict) -> dict: + """Full report for a single service incl. 
its dependencies.""" + dump_base = find_dump_base() - # Eigener Container-Status - own_inspect = docker_inspect(service["container_name"]) - own_healthy = is_healthy(own_inspect) + # Own status + own = check_service(service) - # Abhaengigkeits-Check + # Check dependencies dep_results = {} for dep_key in service.get("dependencies", []): dep = all_services.get(dep_key) if not dep: - dep_results[dep_key] = {"status": "unknown_service", "health": "unknown", "healthy": False} + dep_results[dep_key] = {"healthy": None, "method": "unknown", "error": "Nicht in services.json"} continue - insp = docker_inspect(dep["container_name"]) dep_results[dep_key] = { - **insp, - "healthy": is_healthy(insp), "tier": dep.get("tier"), - "container_name": dep["container_name"], + "container_name": dep.get("container_name"), + **check_service(dep), } - unhealthy_deps = [k for k, v in dep_results.items() if not v["healthy"]] + # Unhealthy deps: only those that are definitively False (None = internal or not in services.json = ignored) + unhealthy_deps = [k for k, v in dep_results.items() if v.get("healthy") is False] # Dump-Info dump_info = get_dump_info(service.get("dump_file"), dump_base) @@ -185,11 +190,7 @@ def build_service_report(service_key: str, service: dict, all_services: dict, me "description": service.get("description", ""), "tier": service.get("tier"), "url": service.get("url"), - "container": { - "name": service["container_name"], - **own_inspect, - "healthy": own_healthy, - }, + "status": own, "dependencies": dep_results, "unhealthy_deps": unhealthy_deps, "dump": dump_info, @@ -199,29 +200,38 @@ def build_service_report(service_key: str, service: dict, all_services: dict, me } -def build_summary_report(all_services: dict, meta: dict) -> dict: - """Prueft alle Tier-1 und Tier-2 Dienste und gibt einen Gesamtstatus zurueck.""" - results = {} +def build_summary_report(all_services: dict, include_tier3: bool = False) -> dict: + """Check all tier 1 and tier 2 services (all tiers if include_tier3 is set).""" + 
dump_base = find_dump_base() issues = [] + results = {} for key, svc in all_services.items(): tier = svc.get("tier", 3) - if tier > 2: - continue # Tier-3 im Summary ueberspringen + if not include_tier3 and tier > 2: + continue + + status = check_service(svc) + healthy = status.get("healthy") - insp = docker_inspect(svc["container_name"]) - healthy = is_healthy(insp) results[key] = { "tier": tier, + "method": status["method"], "healthy": healthy, - "status": insp["status"], - "health": insp["health"], + "status_code": status.get("status_code"), + "error": status.get("error"), } - if not healthy: - issues.append({"service": key, "tier": tier, **insp}) - # Dump-Checks fuer alle Dienste mit dump_file - dump_base = meta.get("dump_base", "/mnt/user/backups/borg/dumps/latest") + # Count only real failures as issues (None = internal, not checkable) + if healthy is False: + issues.append({ + "service": key, + "tier": tier, + "url": svc.get("url"), + **status, + }) + + # Dump checks stale_dumps = [] for key, svc in all_services.items(): info = get_dump_info(svc.get("dump_file"), dump_base) @@ -232,13 +242,19 @@ def build_summary_report(all_services: dict, meta: dict) -> dict: "age_hours": info["age_hours"], }) + dump_available = dump_base is not None + return { "mode": "summary", "timestamp": datetime.now().isoformat(), + "dump_base_found": dump_available, "services_checked": len(results), "issues": issues, "stale_dumps": stale_dumps, - "overall_healthy": len(issues) == 0 and len(stale_dumps) == 0, + "overall_healthy": len(issues) == 0, # stale_dumps are reported but do not affect this flag + "note": "Interne Services ohne URL konnten nicht geprueft werden." 
if any( + v["method"] == "internal" for v in results.values() + ) else "", } @@ -248,60 +264,25 @@ def build_summary_report(all_services: dict, meta: dict) -> dict: def main(): args = sys.argv[1:] - all_services, meta = load_services() + all_services, _ = load_services() - if "--summary" in args: - report = build_summary_report(all_services, meta) + include_all = "--all" in args + summary_mode = "--summary" in args + + if summary_mode or not args or args[0].startswith("--"): # summary when --summary appears anywhere, or the first argument is missing or a flag + report = build_summary_report(all_services, include_tier3=include_all) print(json.dumps(report, indent=2, ensure_ascii=False)) return - # Expliziter Service-Key als Argument - if args and not args[0].startswith("--"): - service_key = args[0] - service = all_services.get(service_key) - if not service: - print(json.dumps({"error": f"Service '{service_key}' nicht in services.yaml gefunden."})) - sys.exit(1) - report = build_service_report(service_key, service, all_services, meta) - print(json.dumps(report, indent=2, ensure_ascii=False)) - return + # Targeted check: the first positional argument is the service key + service_key = args[0] + service = all_services.get(service_key) + if not service: + print(json.dumps({"error": f"Service '{service_key}' nicht in services.json gefunden."})) + sys.exit(1) - # Kein Argument: alle unhealthy Container automatisch finden - unhealthy_names = get_unhealthy_containers() - - if not unhealthy_names: - print(json.dumps({"status": "all_healthy", "timestamp": datetime.now().isoformat()})) - return - - reports = [] - for container_name in unhealthy_names: - # Container-Name auf Service-Key mappen - service_key = None - service = None - for key, svc in all_services.items(): - if svc["container_name"] == container_name: - service_key = key - service = svc - break - - if not service: - reports.append({ - "service": container_name, - "description": "Unbekannter Container (nicht in services.yaml)", - "tier": None, - "container": {"name": container_name, "status": "unhealthy", "health": "unknown", "healthy": 
False}, - "dependencies": {}, - "unhealthy_deps": [], - "dump": None, - "first_check": "Container nicht in services.yaml — manuell pruefen", - "notes": "services.yaml aktualisieren wenn dieser Container produktiv ist", - "timestamp": datetime.now().isoformat(), - }) - continue - - reports.append(build_service_report(service_key, service, all_services, meta)) - - print(json.dumps(reports, indent=2, ensure_ascii=False)) + report = build_service_report(service_key, service, all_services) + print(json.dumps(report, indent=2, ensure_ascii=False)) if __name__ == "__main__":