Files
homelab-infra/services/posture-check/weather-day-report.py
T

556 lines
20 KiB
Python

#!/usr/bin/env python3
"""
Generate a Markdown weather report for one KalliHome archive day.
The script queries Grafana's InfluxDB datasource through /api/ds/query. It does
not store credentials; provide a Grafana service account token through
GRAFANA_SERVICE_ACCOUNT_TOKEN or GRAFANA_TOKEN_FILE.
"""
from __future__ import annotations
import argparse
import json
import math
import os
import sys
import urllib.error
import urllib.request
from datetime import date, datetime, time, timedelta, timezone, tzinfo
from typing import Any
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError
DEFAULT_GRAFANA_URL = "https://monitoring.kaleschke.info"
DEFAULT_TOKEN_FILE = "/mnt/user/appdata/secrets/monitoring_grafana_weather_report_token.txt"
DATASOURCE_UID = "ha-weather-influx"
DATASOURCE_TYPE = "influxdb"
TEMP_TABLE = "\u00b0C"
SOLAR_TABLE = "W/m\u00b2"
class WeatherReportError(RuntimeError):
pass
def last_sunday(year: int, month: int) -> date:
current = date(year, month + 1, 1) - timedelta(days=1) if month < 12 else date(year, 12, 31)
return current - timedelta(days=(current.weekday() + 1) % 7)
class EuropeBerlinFallback(tzinfo):
"""Small Europe/Berlin fallback for systems without IANA tzdata."""
def _dst_bounds_utc(self, year: int) -> tuple[datetime, datetime]:
start = datetime.combine(last_sunday(year, 3), time(1, 0), tzinfo=timezone.utc)
end = datetime.combine(last_sunday(year, 10), time(1, 0), tzinfo=timezone.utc)
return start, end
def _is_dst_utc(self, dt: datetime) -> bool:
utc_dt = dt.replace(tzinfo=timezone.utc) if dt.tzinfo is self else dt.astimezone(timezone.utc)
start, end = self._dst_bounds_utc(utc_dt.year)
return start <= utc_dt < end
def _is_dst_local(self, dt: datetime) -> bool:
naive = dt.replace(tzinfo=None)
start = datetime.combine(last_sunday(naive.year, 3), time(2, 0))
end = datetime.combine(last_sunday(naive.year, 10), time(3, 0))
return start <= naive < end
def utcoffset(self, dt: datetime | None) -> timedelta:
if dt is not None and self._is_dst_local(dt):
return timedelta(hours=2)
return timedelta(hours=1)
def dst(self, dt: datetime | None) -> timedelta:
if dt is not None and self._is_dst_local(dt):
return timedelta(hours=1)
return timedelta(0)
def tzname(self, dt: datetime | None) -> str:
return "CEST" if dt is not None and self._is_dst_local(dt) else "CET"
def fromutc(self, dt: datetime) -> datetime:
offset = timedelta(hours=2) if self._is_dst_utc(dt) else timedelta(hours=1)
return (dt.replace(tzinfo=timezone.utc) + offset).replace(tzinfo=self)
def load_timezone(tz_name: str) -> tzinfo:
try:
return ZoneInfo(tz_name)
except ZoneInfoNotFoundError:
if tz_name == "Europe/Berlin":
return EuropeBerlinFallback()
raise
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Generate a KalliHome weather day report.")
parser.add_argument(
"--date",
default=os.environ.get("WEATHER_REPORT_DATE"),
help="Local report date as YYYY-MM-DD. Defaults to yesterday in --timezone.",
)
parser.add_argument(
"--timezone",
default=os.environ.get("WEATHER_REPORT_TZ", "Europe/Berlin"),
help="IANA timezone for the local day boundary. Default: Europe/Berlin.",
)
parser.add_argument(
"--grafana-url",
default=os.environ.get("GRAFANA_URL", DEFAULT_GRAFANA_URL),
help=f"Grafana base URL. Default: {DEFAULT_GRAFANA_URL}",
)
parser.add_argument(
"--token-file",
default=os.environ.get("GRAFANA_TOKEN_FILE", DEFAULT_TOKEN_FILE),
help=f"Grafana service account token file. Default: {DEFAULT_TOKEN_FILE}",
)
parser.add_argument(
"--heading-level",
type=int,
default=int(os.environ.get("WEATHER_REPORT_HEADING_LEVEL", "1")),
choices=range(1, 5),
help="Markdown heading level for the title. Default: 1.",
)
parser.add_argument(
"--json",
action="store_true",
help="Print raw summarized values as JSON instead of Markdown.",
)
return parser.parse_args()
def read_token(token_file: str) -> str:
token = os.environ.get("GRAFANA_SERVICE_ACCOUNT_TOKEN", "").strip()
if token:
return token
try:
with open(token_file, "r", encoding="utf-8-sig") as handle:
token = handle.read().strip()
except FileNotFoundError as exc:
raise WeatherReportError(
f"Grafana token missing. Set GRAFANA_SERVICE_ACCOUNT_TOKEN or create {token_file}."
) from exc
if not token:
raise WeatherReportError(f"Grafana token file is empty: {token_file}")
return token
def day_bounds(report_date: str | None, tz_name: str) -> tuple[date, datetime, datetime]:
tz = load_timezone(tz_name)
if report_date:
local_date = date.fromisoformat(report_date)
else:
local_date = datetime.now(tz).date() - timedelta(days=1)
start_local = datetime.combine(local_date, time.min, tzinfo=tz)
end_local = start_local + timedelta(days=1)
return (
local_date,
start_local.astimezone(timezone.utc),
end_local.astimezone(timezone.utc),
)
def sql_timestamp(dt: datetime) -> str:
return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def grafana_query(
grafana_url: str,
token: str,
sql: str,
start_utc: datetime,
end_utc: datetime,
) -> list[dict[str, Any]]:
url = grafana_url.rstrip("/") + "/api/ds/query"
payload = {
"queries": [
{
"refId": "A",
"datasource": {"uid": DATASOURCE_UID, "type": DATASOURCE_TYPE},
"rawQuery": True,
"rawSql": sql,
"format": "table",
}
],
"from": sql_timestamp(start_utc),
"to": sql_timestamp(end_utc),
}
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
url,
data=body,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
},
method="POST",
)
try:
with urllib.request.urlopen(request, timeout=30) as response:
data = json.loads(response.read().decode("utf-8"))
except urllib.error.HTTPError as exc:
detail = exc.read().decode("utf-8", errors="replace")
raise WeatherReportError(f"Grafana query failed with HTTP {exc.code}: {detail}") from exc
except urllib.error.URLError as exc:
raise WeatherReportError(f"Grafana is not reachable: {exc}") from exc
result = data.get("results", {}).get("A", {})
if result.get("error"):
raise WeatherReportError(f"Grafana datasource error: {result['error']}")
frames = result.get("frames") or []
if not frames:
return []
frame = frames[0]
fields = [field["name"] for field in frame.get("schema", {}).get("fields", [])]
values = frame.get("data", {}).get("values", [])
if not fields or not values:
return []
rows: list[dict[str, Any]] = []
for idx in range(len(values[0])):
rows.append({field: values[pos][idx] for pos, field in enumerate(fields)})
return rows
def one(rows: list[dict[str, Any]]) -> dict[str, Any]:
return rows[0] if rows else {}
def by_entity(rows: list[dict[str, Any]], entity_id: str) -> dict[str, Any]:
for row in rows:
if row.get("entity_id") == entity_id:
return row
return {}
def as_float(value: Any) -> float | None:
if value is None:
return None
try:
number = float(value)
except (TypeError, ValueError):
return None
if math.isnan(number):
return None
return number
def fmt(value: Any, digits: int = 1, missing: str = "n/a") -> str:
number = as_float(value)
if number is None:
return missing
return f"{number:.{digits}f}".replace(".", ",")
def fmt_int(value: Any, missing: str = "n/a") -> str:
number = as_float(value)
if number is None:
return missing
return f"{round(number):d}"
def fmt_signed(value: Any, digits: int = 1) -> str:
number = as_float(value)
if number is None:
return "n/a"
sign = "+" if number > 0 else ""
return f"{sign}{number:.{digits}f}".replace(".", ",")
def local_hhmm(value: Any, tz_name: str) -> str:
if value in (None, ""):
return "n/a"
tz = load_timezone(tz_name)
if isinstance(value, (int, float)):
return datetime.fromtimestamp(float(value) / 1000, timezone.utc).astimezone(tz).strftime("%H:%M")
if isinstance(value, str):
try:
# Grafana may return ISO timestamps or millisecond timestamps encoded as strings.
if value.isdigit():
return datetime.fromtimestamp(float(value) / 1000, timezone.utc).astimezone(tz).strftime("%H:%M")
normalized = value.replace("Z", "+00:00")
return datetime.fromisoformat(normalized).astimezone(tz).strftime("%H:%M")
except ValueError:
return value
return str(value)
def summarize(grafana_url: str, token: str, report_date: date, start_utc: datetime, end_utc: datetime) -> dict[str, Any]:
start = sql_timestamp(start_utc)
end = sql_timestamp(end_utc)
where = f"time >= timestamp '{start}' AND time < timestamp '{end}'"
temp = grafana_query(
grafana_url,
token,
(
f'SELECT entity_id, count(value) AS samples, min(value) AS min_value, '
f'max(value) AS max_value, avg(value) AS avg_value FROM "{TEMP_TABLE}" '
"WHERE entity_id IN ('gw3000a_outdoor_temperature',"
"'gw3000a_feels_like_temperature','gw3000a_dewpoint') "
f"AND {where} GROUP BY entity_id ORDER BY entity_id"
),
start_utc,
end_utc,
)
humidity = grafana_query(
grafana_url,
token,
(
'SELECT entity_id, count(value) AS samples, min(value) AS min_value, '
'max(value) AS max_value, avg(value) AS avg_value FROM "%" '
"WHERE entity_id IN ('gw3000a_humidity','gw3000a_indoor_humidity') "
f"AND {where} GROUP BY entity_id ORDER BY entity_id"
),
start_utc,
end_utc,
)
wind = grafana_query(
grafana_url,
token,
(
'SELECT entity_id, count(value) AS samples, min(value) AS min_value, '
'max(value) AS max_value, avg(value) AS avg_value FROM "km/h" '
"WHERE entity_id IN ('gw3000a_wind_speed','gw3000a_wind_gust') "
f"AND {where} GROUP BY entity_id ORDER BY entity_id"
),
start_utc,
end_utc,
)
return {
"date": report_date.isoformat(),
"start_utc": start,
"end_utc": end,
"outdoor_temperature": by_entity(temp, "gw3000a_outdoor_temperature"),
"feels_like": by_entity(temp, "gw3000a_feels_like_temperature"),
"dewpoint": by_entity(temp, "gw3000a_dewpoint"),
"humidity": by_entity(humidity, "gw3000a_humidity"),
"wind_speed": by_entity(wind, "gw3000a_wind_speed"),
"wind_gust": by_entity(wind, "gw3000a_wind_gust"),
"rain": one(
grafana_query(
grafana_url,
token,
f'SELECT count(value) AS samples, max(value) AS rain_mm FROM "mm" '
f"WHERE entity_id = 'gw3000a_daily_rain' AND {where}",
start_utc,
end_utc,
)
),
"solar": one(
grafana_query(
grafana_url,
token,
f'SELECT count(value) AS samples, max(value) AS max_value, avg(value) AS avg_value '
f'FROM "{SOLAR_TABLE}" WHERE entity_id = \'gw3000a_solar_radiation\' AND {where}',
start_utc,
end_utc,
)
),
"uv": one(
grafana_query(
grafana_url,
token,
f'SELECT count(value) AS samples, max(value) AS max_value, avg(value) AS avg_value '
f'FROM "UV index" WHERE entity_id = \'gw3000a_uv_index\' AND {where}',
start_utc,
end_utc,
)
),
"pressure": one(
grafana_query(
grafana_url,
token,
f'SELECT count(value) AS samples, min(value) AS min_value, max(value) AS max_value, '
f'avg(value) AS avg_value FROM "hPa" WHERE entity_id = \'gw3000a_relative_pressure\' AND {where}',
start_utc,
end_utc,
)
),
"pressure_start": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "hPa" WHERE entity_id = \'gw3000a_relative_pressure\' '
f"AND {where} ORDER BY time ASC LIMIT 1",
start_utc,
end_utc,
)
),
"pressure_end": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "hPa" WHERE entity_id = \'gw3000a_relative_pressure\' '
f"AND {where} ORDER BY time DESC LIMIT 1",
start_utc,
end_utc,
)
),
"temperature_min_time": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "{TEMP_TABLE}" WHERE entity_id = \'gw3000a_outdoor_temperature\' '
f"AND {where} ORDER BY value ASC LIMIT 1",
start_utc,
end_utc,
)
),
"temperature_max_time": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "{TEMP_TABLE}" WHERE entity_id = \'gw3000a_outdoor_temperature\' '
f"AND {where} ORDER BY value DESC LIMIT 1",
start_utc,
end_utc,
)
),
"gust_max_time": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "km/h" WHERE entity_id = \'gw3000a_wind_gust\' '
f"AND {where} ORDER BY value DESC LIMIT 1",
start_utc,
end_utc,
)
),
"solar_max_time": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "{SOLAR_TABLE}" WHERE entity_id = \'gw3000a_solar_radiation\' '
f"AND {where} ORDER BY value DESC LIMIT 1",
start_utc,
end_utc,
)
),
"uv_max_time": one(
grafana_query(
grafana_url,
token,
f'SELECT time, value FROM "UV index" WHERE entity_id = \'gw3000a_uv_index\' '
f"AND {where} ORDER BY value DESC LIMIT 1",
start_utc,
end_utc,
)
),
}
def render_markdown(summary: dict[str, Any], tz_name: str, heading_level: int) -> str:
out_temp = summary["outdoor_temperature"]
feels = summary["feels_like"]
dew = summary["dewpoint"]
humidity = summary["humidity"]
wind_speed = summary["wind_speed"]
wind_gust = summary["wind_gust"]
rain = summary["rain"]
solar = summary["solar"]
uv = summary["uv"]
pressure = summary["pressure"]
pressure_start = as_float(summary["pressure_start"].get("value"))
pressure_end = as_float(summary["pressure_end"].get("value"))
pressure_trend = None if pressure_start is None or pressure_end is None else pressure_end - pressure_start
solar_max = as_float(solar.get("max_value"))
uv_max = as_float(uv.get("max_value"))
temp_max = as_float(out_temp.get("max_value"))
if solar_max is not None and uv_max is not None and solar_max >= 700 and uv_max >= 6:
narrative = (
"Der Tag war warm, hell und ueberwiegend sonnig; die hohe Solarstrahlung "
f"und der UV-Index von {fmt(uv_max)} passen klar zu einem schoenen Sommertag."
)
elif temp_max is not None and temp_max >= 25:
narrative = "Der Tag war warm; die Messwerte sprechen fuer sommerliches Wetter."
else:
narrative = "Der Tag war wettertechnisch unauffaellig; die folgenden Messwerte fassen ihn zusammen."
rain_samples = int(as_float(rain.get("samples")) or 0)
if rain_samples > 0:
rain_line = f"- Regen: {fmt(rain.get('rain_mm'))} mm Tagesmenge laut daily_rain."
else:
rain_line = (
f"- Regen: fuer {summary['date']} nicht belastbar auswertbar, "
"weil `gw3000a_daily_rain` an dem Tag keine Samples hatte."
)
heading = "#" * heading_level
lines = [
f"{heading} Wetterbericht KalliHome - {summary['date']}",
"",
f"Zeitraum: {summary['date']} 00:00 bis 24:00 Europe/Berlin.",
"",
narrative,
"",
(
f"- Temperatur aussen: {fmt(out_temp.get('min_value'))} bis {fmt(out_temp.get('max_value'))} °C, "
f"Mittel {fmt(out_temp.get('avg_value'))} °C. "
f"Minimum um {local_hhmm(summary['temperature_min_time'].get('time'), tz_name)}, "
f"Maximum um {local_hhmm(summary['temperature_max_time'].get('time'), tz_name)}."
),
(
f"- Gefuehlt: Maximum {fmt(feels.get('max_value'))} °C, "
f"Mittel {fmt(feels.get('avg_value'))} °C. "
f"Taupunkt im Mittel {fmt(dew.get('avg_value'))} °C."
),
(
f"- Luftfeuchte aussen: {fmt_int(humidity.get('min_value'))} bis "
f"{fmt_int(humidity.get('max_value'))} %, Mittel {fmt_int(humidity.get('avg_value'))} %."
),
(
f"- Wind: Mittel {fmt(wind_speed.get('avg_value'))} km/h, "
f"Maximum Wind {fmt(wind_speed.get('max_value'))} km/h; "
f"staerkste Boe {fmt(wind_gust.get('max_value'))} km/h um "
f"{local_hhmm(summary['gust_max_time'].get('time'), tz_name)}."
),
rain_line,
(
f"- Solarstrahlung: Maximum {fmt_int(solar.get('max_value'))} W/m² um "
f"{local_hhmm(summary['solar_max_time'].get('time'), tz_name)}, "
f"Mittel {fmt_int(solar.get('avg_value'))} W/m²."
),
f"- UV-Index: Maximum {fmt(uv.get('max_value'))} um {local_hhmm(summary['uv_max_time'].get('time'), tz_name)}.",
(
f"- Luftdruck: {fmt_int(pressure.get('min_value'))} bis {fmt_int(pressure.get('max_value'))} hPa, "
f"Mittel {fmt_int(pressure.get('avg_value'))} hPa; "
f"Tendenz {fmt_signed(pressure_trend)} hPa ueber den Tag."
),
"",
"Datenabdeckung/Samples:",
(
f"- Temperatur aussen: {out_temp.get('samples', 0)}, "
f"Luftfeuchte aussen: {humidity.get('samples', 0)}, "
f"Wind: {wind_speed.get('samples', 0)}, Boeen: {wind_gust.get('samples', 0)}, "
f"Regen: {rain.get('samples', 0)}, Solar: {solar.get('samples', 0)}, "
f"UV: {uv.get('samples', 0)}, Luftdruck: {pressure.get('samples', 0)}"
),
]
return "\n".join(lines)
def main() -> int:
args = parse_args()
try:
token = read_token(args.token_file)
report_date, start_utc, end_utc = day_bounds(args.date, args.timezone)
summary = summarize(args.grafana_url, token, report_date, start_utc, end_utc)
if args.json:
print(json.dumps(summary, ensure_ascii=False, indent=2))
else:
print(render_markdown(summary, args.timezone, args.heading_level))
except Exception as exc:
print(f"weather-day-report: {exc}", file=sys.stderr)
return 1
return 0
if __name__ == "__main__":
raise SystemExit(main())