update
This commit is contained in:
2026-05-06 20:18:25 +02:00
parent 84020346bc
commit 5cc0a4dadb
+140 -159
View File
@@ -2,29 +2,33 @@
""" """
check_health.py — Homelab Alert Enricher check_health.py — Homelab Alert Enricher
========================================= =========================================
Laedt services.json, prueft Docker-Health aller bekannten Abhaengigkeiten, Laedt services.json, prueft Services via HTTP(S) und gibt einen
liest Dump-Timestamps und gibt einen strukturierten JSON-Report aus. strukturierten JSON-Report aus. Hermes nutzt diesen Report fuer
angereicherte ntfy-Alerts.
Hermes liest diesen Report und baut daraus eine angereicherte ntfy-Nachricht.
Keine externen Abhaengigkeiten — nur Python-Standardbibliothek. Keine externen Abhaengigkeiten — nur Python-Standardbibliothek.
Kein Docker CLI, kein Root, kein pip.
Check-Strategie:
  - Services MIT url: HTTP GET, Statuscode < 500 (2xx/3xx/4xx) = healthy, Timeout/Verbindungsfehler/5xx = unhealthy
- Services OHNE url: "internal" — kein externer Check moeglich
- Dump-Timestamps: werden gelesen falls /mnt/user/... erreichbar ist (optional)
Verwendung: Verwendung:
python3 check_health.py # alle unhealthy Container python3 check_health.py # alle Services pruefen (Tier 1+2)
python3 check_health.py paperless-ngx # gezielt einen Service pruefen python3 check_health.py paperless-ngx # gezielt einen Service pruefen
python3 check_health.py --summary # Gesamtstatus als Zusammenfassung python3 check_health.py --summary # Gesamtstatus Tier 1+2
python3 check_health.py --all # alle Tiers inkl. Tier 3
Pfad auf der Hermes-VM (via git pull): Pfad auf der Hermes-VM:
/srv/hermes-workspace/homelab-infra/ops/hermes-agent/scripts/check_health.py /srv/hermes-workspace/homelab-infra/ops/hermes-agent/scripts/check_health.py
services.json wird relativ zum Script-Verzeichnis gesucht:
../services.json
""" """
import json import json
import os import ssl
import subprocess
import sys import sys
import urllib.request
import urllib.error
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@@ -34,111 +38,114 @@ from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.resolve()
SERVICES_JSON_PATH = SCRIPT_DIR.parent / "services.json"
# Fallback used when the repository is checked out under a different path
SERVICES_JSON_FALLBACK = Path("/srv/hermes-workspace/homelab-infra/ops/hermes-agent/services.json")

# HTTP check timeout in seconds
HTTP_TIMEOUT = 8

# Dump directory candidates (optional — skipped when none is reachable)
DUMP_BASE_PATHS = [
    Path("/mnt/user/backups/borg/dumps/latest"),  # Unraid, direct path
    Path("/opt/dumps"),                            # mounted fallback
]

# Dump warning threshold in hours
DUMP_WARN_HOURS = 26

# TLS verification (True = strict, False = accept self-signed certificates)
SSL_VERIFY = False
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Hilfsfunktionen # Hilfsfunktionen
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def load_services():
    """Load services.json using only the standard library.

    The repo-relative ``SERVICES_JSON_PATH`` is preferred; when it does not
    exist, ``SERVICES_JSON_FALLBACK`` is used instead.

    Returns:
        A ``(services, meta)`` tuple read from the top-level ``"services"``
        and ``"meta"`` keys; each defaults to an empty dict when absent.

    Raises:
        FileNotFoundError: if the selected location does not exist.
    """
    config_path = SERVICES_JSON_FALLBACK
    if SERVICES_JSON_PATH.exists():
        config_path = SERVICES_JSON_PATH

    if not config_path.exists():
        hint = "Bitte 'git pull' in /srv/hermes-workspace/homelab-infra/ ausfuehren."
        raise FileNotFoundError(f"services.json nicht gefunden: {config_path}\n{hint}")

    with open(config_path, encoding="utf-8") as handle:
        document = json.load(handle)

    services = document.get("services", {})
    meta = document.get("meta", {})
    return services, meta
def http_check(url: str) -> dict:
    """Probe ``url`` with a single HTTP GET and classify the outcome.

    Every response with a status code below 500 counts as reachable:
    2xx/3xx are regular successes, and 4xx (e.g. 401/403/404) still shows
    that the service itself is answering. 5xx responses, connection
    errors and timeouts count as unreachable.

    Args:
        url: Absolute ``http://`` or ``https://`` URL of the service.

    Returns:
        A dict with the keys ``reachable`` (bool), ``status_code`` (int, or
        None when no HTTP response was received) and ``error`` (str or
        None). Failures are reported through ``error`` instead of being
        raised.
    """
    # urllib would also open file:// or ftp:// URLs, which carry no HTTP
    # status code; reject them up front with a clear message.
    if not isinstance(url, str) or not url.strip().lower().startswith(("http://", "https://")):
        return {
            "reachable": False,
            "status_code": None,
            "error": f"Nicht unterstuetztes URL-Schema (nur http/https): {url!r}",
        }

    ctx = ssl.create_default_context()
    if not SSL_VERIFY:
        # check_hostname has to be disabled before verify_mode can be CERT_NONE
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE

    try:
        req = urllib.request.Request(
            url,
            method="GET",
            headers={"User-Agent": "hermes-healthcheck/1.0"},
        )
        with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT, context=ctx) as resp:
            code = resp.status
    except urllib.error.HTTPError as e:
        # Must be handled before URLError (HTTPError is a subclass of it).
        # A 4xx answer means the service runs but rejects the request.
        code = e.code
    except urllib.error.URLError as e:
        return {"reachable": False, "status_code": None, "error": str(e.reason)}
    except Exception as e:
        # Best-effort health probe: anything else (timeouts while reading,
        # malformed URLs, TLS errors) is reported rather than raised.
        return {"reachable": False, "status_code": None, "error": str(e)}

    return {"reachable": code < 500, "status_code": code, "error": None}
def check_service(service: dict) -> dict:
    """Check a single service entry.

    Args:
        service: Service definition; only its optional ``"url"`` key is
            read here.

    Returns:
        A flat dict with the keys ``healthy``, ``method``, ``url``,
        ``status_code`` and ``error``:

        * with a URL: ``method`` is ``"http"`` and ``healthy`` is the
          ``reachable`` flag returned by ``http_check``;
        * without a URL: ``method`` is ``"internal"`` and ``healthy`` is
          ``None``, meaning "unknown" because no external check is possible.
    """
    url = service.get("url")

    if not url:
        return {
            "healthy": None,  # None = unknown (internal, no external check)
            "method": "internal",
            "url": None,
            "status_code": None,
            "error": "Kein externer Check — interner Service ohne URL",
        }

    result = http_check(url)
    return {
        "healthy": result["reachable"],
        "method": "http",
        "url": url,
        "status_code": result["status_code"],
        "error": result["error"],
    }
def get_unhealthy_containers() -> list[str]: def find_dump_base() -> Path | None:
"""Gibt Liste aller Container zurueck die unhealthy oder nicht running sind.""" """Sucht das Dump-Verzeichnis in bekannten Pfaden."""
try: for p in DUMP_BASE_PATHS:
# unhealthy per healthcheck if p.exists():
r1 = subprocess.run( return p
["docker", "ps", "--filter", "health=unhealthy", "--format", "{{.Names}}"],
capture_output=True, text=True, timeout=10,
)
# exited/dead Container die eigentlich laufen sollten
r2 = subprocess.run(
["docker", "ps", "--filter", "status=exited", "--format", "{{.Names}}"],
capture_output=True, text=True, timeout=10,
)
names = set()
for raw in (r1.stdout, r2.stdout):
for name in raw.strip().split("\n"):
name = name.strip()
if name:
names.add(name)
return sorted(names)
except Exception:
return []
def get_dump_info(dump_file: str | None, dump_base: str) -> dict | None:
"""Gibt Alter und Groesse des Dump-Files zurueck (oder None wenn nicht vorhanden)."""
if not dump_file:
return None return None
path = Path(dump_base) / dump_file
def get_dump_info(dump_file: str | None, dump_base: Path | None) -> dict | None:
"""Liest Alter und Groesse einer Dump-Datei."""
if not dump_file or not dump_base:
return None
path = dump_base / dump_file
if not path.exists(): if not path.exists():
return {"file": dump_file, "exists": False, "age_hours": None, "size_mb": None} return {"file": dump_file, "exists": False, "age_hours": None, "size_mb": None, "warn": False}
stat = path.stat() stat = path.stat()
age_hours = round((datetime.now().timestamp() - stat.st_mtime) / 3600, 1) age_hours = round((datetime.now().timestamp() - stat.st_mtime) / 3600, 1)
size_mb = round(stat.st_size / 1_048_576, 1) size_mb = round(stat.st_size / 1_048_576, 1)
return { return {
"file": dump_file, "file": dump_file,
"exists": True, "exists": True,
@@ -152,30 +159,28 @@ def get_dump_info(dump_file: str | None, dump_base: str) -> dict | None:
# Report-Generierung # Report-Generierung
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def build_service_report(service_key: str, service: dict, all_services: dict, meta: dict) -> dict: def build_service_report(service_key: str, service: dict, all_services: dict) -> dict:
"""Erstellt einen vollstaendigen Report fuer einen einzelnen Service.""" """Vollstaendiger Report fuer einen einzelnen Service inkl. Abhaengigkeiten."""
dump_base = meta.get("dump_base", "/mnt/user/backups/borg/dumps/latest") dump_base = find_dump_base()
# Eigener Container-Status # Eigener Status
own_inspect = docker_inspect(service["container_name"]) own = check_service(service)
own_healthy = is_healthy(own_inspect)
# Abhaengigkeits-Check # Abhaengigkeiten pruefen
dep_results = {} dep_results = {}
for dep_key in service.get("dependencies", []): for dep_key in service.get("dependencies", []):
dep = all_services.get(dep_key) dep = all_services.get(dep_key)
if not dep: if not dep:
dep_results[dep_key] = {"status": "unknown_service", "health": "unknown", "healthy": False} dep_results[dep_key] = {"healthy": None, "method": "unknown", "error": "Nicht in services.json"}
continue continue
insp = docker_inspect(dep["container_name"])
dep_results[dep_key] = { dep_results[dep_key] = {
**insp,
"healthy": is_healthy(insp),
"tier": dep.get("tier"), "tier": dep.get("tier"),
"container_name": dep["container_name"], "container_name": dep.get("container_name"),
**check_service(dep),
} }
unhealthy_deps = [k for k, v in dep_results.items() if not v["healthy"]] # Unhealthy Deps: nur die die definitiv False sind (None = intern = ignorieren)
unhealthy_deps = [k for k, v in dep_results.items() if v.get("healthy") is False]
# Dump-Info # Dump-Info
dump_info = get_dump_info(service.get("dump_file"), dump_base) dump_info = get_dump_info(service.get("dump_file"), dump_base)
@@ -185,11 +190,7 @@ def build_service_report(service_key: str, service: dict, all_services: dict, me
"description": service.get("description", ""), "description": service.get("description", ""),
"tier": service.get("tier"), "tier": service.get("tier"),
"url": service.get("url"), "url": service.get("url"),
"container": { "status": own,
"name": service["container_name"],
**own_inspect,
"healthy": own_healthy,
},
"dependencies": dep_results, "dependencies": dep_results,
"unhealthy_deps": unhealthy_deps, "unhealthy_deps": unhealthy_deps,
"dump": dump_info, "dump": dump_info,
@@ -199,29 +200,38 @@ def build_service_report(service_key: str, service: dict, all_services: dict, me
} }
def build_summary_report(all_services: dict, meta: dict) -> dict: def build_summary_report(all_services: dict, include_tier3: bool = False) -> dict:
"""Prueft alle Tier-1 und Tier-2 Dienste und gibt einen Gesamtstatus zurueck.""" """Prueft alle Tier-1 und Tier-2 Services (optional auch Tier-3)."""
results = {} dump_base = find_dump_base()
issues = [] issues = []
results = {}
for key, svc in all_services.items(): for key, svc in all_services.items():
tier = svc.get("tier", 3) tier = svc.get("tier", 3)
if tier > 2: if not include_tier3 and tier > 2:
continue # Tier-3 im Summary ueberspringen continue
status = check_service(svc)
healthy = status.get("healthy")
insp = docker_inspect(svc["container_name"])
healthy = is_healthy(insp)
results[key] = { results[key] = {
"tier": tier, "tier": tier,
"method": status["method"],
"healthy": healthy, "healthy": healthy,
"status": insp["status"], "status_code": status.get("status_code"),
"health": insp["health"], "error": status.get("error"),
} }
if not healthy:
issues.append({"service": key, "tier": tier, **insp})
# Dump-Checks fuer alle Dienste mit dump_file # Nur echte Fehler als Issue zaehlen (None = intern, nicht pruefbar)
dump_base = meta.get("dump_base", "/mnt/user/backups/borg/dumps/latest") if healthy is False:
issues.append({
"service": key,
"tier": tier,
"url": svc.get("url"),
**status,
})
# Dump-Checks
stale_dumps = [] stale_dumps = []
for key, svc in all_services.items(): for key, svc in all_services.items():
info = get_dump_info(svc.get("dump_file"), dump_base) info = get_dump_info(svc.get("dump_file"), dump_base)
@@ -232,13 +242,19 @@ def build_summary_report(all_services: dict, meta: dict) -> dict:
"age_hours": info["age_hours"], "age_hours": info["age_hours"],
}) })
dump_available = dump_base is not None
return { return {
"mode": "summary", "mode": "summary",
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"dump_base_found": dump_available,
"services_checked": len(results), "services_checked": len(results),
"issues": issues, "issues": issues,
"stale_dumps": stale_dumps, "stale_dumps": stale_dumps,
"overall_healthy": len(issues) == 0 and len(stale_dumps) == 0, "overall_healthy": len(issues) == 0,
"note": "Interne Services ohne URL konnten nicht geprueft werden." if any(
v["method"] == "internal" for v in results.values()
) else "",
} }
@@ -248,60 +264,25 @@ def build_summary_report(all_services: dict, meta: dict) -> dict:
def main():
    """Command-line entry point; prints exactly one JSON document to stdout.

    Modes, derived from ``sys.argv[1:]``:

    * ``--summary`` anywhere, no arguments at all, or a first argument that
      starts with ``--``: summary report. ``--all`` additionally passes
      ``include_tier3=True``.
    * otherwise the first argument is taken as a service key and a report
      for that single service is printed.

    Exits with status 1 (after printing a JSON object with an ``error``
    key) when services.json cannot be loaded or the requested service key
    is unknown.
    """
    args = sys.argv[1:]

    try:
        all_services, _ = load_services()
    except (OSError, ValueError) as exc:
        # Missing/unreadable file (OSError) or invalid JSON (JSONDecodeError
        # is a ValueError): keep stdout machine-readable instead of letting
        # a traceback escape.
        print(json.dumps({"error": str(exc)}, ensure_ascii=False))
        sys.exit(1)

    include_all = "--all" in args
    summary_mode = "--summary" in args

    if summary_mode or not args or args[0].startswith("--"):
        report = build_summary_report(all_services, include_tier3=include_all)
        print(json.dumps(report, indent=2, ensure_ascii=False))
        return

    # Targeted check of a single service key
    service_key = args[0]
    service = all_services.get(service_key)
    if not service:
        print(json.dumps({"error": f"Service '{service_key}' nicht in services.json gefunden."}))
        sys.exit(1)

    report = build_service_report(service_key, service, all_services)
    print(json.dumps(report, indent=2, ensure_ascii=False))
if __name__ == "__main__": if __name__ == "__main__":