Files

113 lines
3.3 KiB
Python

import base64
import http.client
import json
import os
import sys
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
# Full ntfy topic URL that notifications are POSTed to. Set the NTFY_URL
# environment variable to override the built-in default.
NTFY_URL = os.getenv("NTFY_URL", "https://ntfy.kaleschke.info/homelab-alerts")
def priority_for(status, severity):
    """Return the value for the ntfy ``Priority`` header of an alert.

    A resolved alert always yields ``"2"``. Otherwise the severity decides:
    ``"critical"`` -> ``"5"``, ``"warning"`` -> ``"4"``, anything else ->
    ``"3"``.
    """
    if status == "resolved":
        return "2"

    # Ordered (severity, priority) pairs; the first equal severity wins.
    for level, priority in (("critical", "5"), ("warning", "4")):
        if severity == level:
            return priority
    return "3"
def tags_for(status, severity):
    """Return the value for the ntfy ``Tags`` header of an alert.

    A resolved alert always yields ``"white_check_mark"``. Otherwise the
    severity decides: ``"critical"`` -> ``"rotating_light"``, ``"warning"``
    -> ``"warning"``, anything else -> ``"information_source"``.
    """
    if status == "resolved":
        return "white_check_mark"

    severity_tags = (
        ("critical", "rotating_light"),
        ("warning", "warning"),
    )
    matches = (tag for level, tag in severity_tags if severity == level)
    return next(matches, "information_source")
def alert_message(alert):
    """Build the notification fields for a single alert.

    Args:
        alert: Mapping that may carry ``"status"``, ``"labels"`` and
            ``"annotations"`` entries. Any of them may be missing, ``None``
            (JSON ``null``) or empty; the defaults below are used then.

    Returns:
        A ``(title, message, priority, tags)`` tuple of strings, where
        ``priority`` and ``tags`` come from ``priority_for`` and
        ``tags_for``.
    """
    # Use `or` rather than a .get() default: a key that is present but null
    # must fall back too, otherwise the chained .get()/.upper() calls raise.
    labels = alert.get("labels") or {}
    annotations = alert.get("annotations") or {}
    status = alert.get("status") or "firing"
    severity = labels.get("severity") or "info"
    alertname = labels.get("alertname") or "Alert"

    # First non-empty label that identifies what the alert is about.
    target = (
        labels.get("instance")
        or labels.get("service")
        or labels.get("mountpoint")
        or "homelab"
    )
    summary = annotations.get("summary") or alertname
    description = annotations.get("description") or ""

    title = f"{status.upper()} {severity}: {alertname}"
    lines = [
        summary,
        f"Target: {target}",
    ]
    # Skip the description when it would only repeat the summary line.
    if description and description != summary:
        lines.append(description)
    return title, "\n".join(lines), priority_for(status, severity), tags_for(status, severity)
def _ntfy_header_value(value):
    """Return *value* in a form that can be sent as an HTTP header value."""
    # http.client rejects header values containing a bare CR or LF, so
    # flatten line breaks to spaces.
    flat = value.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")
    if flat.isascii():
        return flat
    # http.client encodes headers as latin-1 and raises on anything outside
    # it. ntfy accepts RFC 2047 encoded-words, so send the UTF-8 bytes
    # base64-wrapped instead.
    encoded = base64.b64encode(flat.encode("utf-8")).decode("ascii")
    return f"=?UTF-8?B?{encoded}?="


def send_ntfy(title, message, priority, tags):
    """POST one notification to ``NTFY_URL``.

    Args:
        title: Text for the ``Title`` header; may contain non-ASCII
            characters or line breaks.
        message: Notification body, sent UTF-8 encoded.
        priority: Text for the ``Priority`` header.
        tags: Text for the ``Tags`` header.

    Raises:
        urllib.error.URLError: If the request fails or the server answers
            with an HTTP error status (``HTTPError`` is a subclass).
        OSError: For socket-level failures, such as a timeout while reading
            the response.
    """
    req = urllib.request.Request(
        NTFY_URL,
        data=message.encode("utf-8"),
        headers={
            "Title": _ntfy_header_value(title),
            "Priority": _ntfy_header_value(priority),
            "Tags": _ntfy_header_value(tags),
        },
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=15) as response:
        # Drain the body so the request completes before the connection closes.
        response.read()
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler that forwards webhook alerts to ntfy.

    Routes:
        ``GET /healthz``: answers 200 with ``ok``.
        ``POST /alertmanager``: expects a JSON object whose ``"alerts"``
        entry is a list of alert objects and sends one ntfy notification
        per alert.

    Every other path answers 404.
    """

    def _reply(self, code, body=b""):
        """Send status *code* with no extra headers and an optional body."""
        self.send_response(code)
        self.end_headers()
        if body:
            self.wfile.write(body)

    def _read_alerts(self):
        """Read the request body and return its list of alert objects.

        Raises:
            ValueError: If ``Content-Length`` is not a non-negative integer,
                the body is not valid JSON, the payload is not a JSON
                object, or ``"alerts"`` is not a list of objects.
        """
        length = int(self.headers.get("Content-Length", "0"))
        if length < 0:
            # rfile.read() with a negative size would block until EOF.
            raise ValueError("negative Content-Length")
        # An empty body is treated as an empty payload.
        payload = json.loads(self.rfile.read(length) or b"{}")
        if not isinstance(payload, dict):
            raise ValueError("payload is not a JSON object")
        alerts = payload.get("alerts") or []
        if not isinstance(alerts, list):
            raise ValueError('"alerts" is not a list')
        if not all(isinstance(alert, dict) for alert in alerts):
            raise ValueError('"alerts" contains a non-object entry')
        return alerts

    def do_GET(self):
        """Serve the ``/healthz`` probe; answer 404 for any other path."""
        if self.path == "/healthz":
            self._reply(200, b"ok\n")
            return
        self._reply(404)

    def do_POST(self):
        """Forward each alert of an ``/alertmanager`` POST to ntfy.

        Answers 400 for a malformed request, 502 as soon as one
        notification cannot be delivered (later alerts are not attempted),
        and 200 with the number sent otherwise.
        """
        if self.path != "/alertmanager":
            self._reply(404)
            return

        try:
            alerts = self._read_alerts()
        except ValueError as exc:
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors too.
            print(f"bad request: {exc}", file=sys.stderr, flush=True)
            self._reply(400, b"invalid request\n")
            return

        sent = 0
        for alert in alerts:
            title, message, priority, tags = alert_message(alert)
            try:
                send_ntfy(title, message, priority, tags)
            except (OSError, http.client.HTTPException) as exc:
                # URLError/HTTPError are OSError subclasses. Plain OSError
                # also covers timeouts and resets while reading the reply;
                # HTTPException covers malformed or truncated responses.
                print(f"ntfy send failed: {exc}", file=sys.stderr, flush=True)
                self._reply(502, b"ntfy send failed\n")
                return
            sent += 1

        print(f"sent {sent} ntfy notifications", flush=True)
        self._reply(200, f"sent {sent}\n".encode("utf-8"))

    def log_message(self, fmt, *args):
        """Write the access-log line to stdout, flushed immediately."""
        print(fmt % args, flush=True)
if __name__ == "__main__":
    # Listen on every interface, port 8080, one thread per request.
    bind_address = ("0.0.0.0", 8080)
    httpd = ThreadingHTTPServer(bind_address, Handler)
    print(f"alertmanager ntfy bridge listening on :8080 -> {NTFY_URL}", flush=True)
    # Blocks until the process is interrupted.
    httpd.serve_forever()