Files
overub/core/update_service.py
2025-12-21 17:12:32 +01:00

239 lines
10 KiB
Python

from __future__ import annotations
import asyncio
import contextlib
import json
import time
from datetime import datetime
from typing import List
from core.changelog import format_changelog, parse_conventional
from core.logger import get_logger
from core.notifications import Notifier
from core.scheduler import ScheduleConfig, Scheduler
from core.webhook import parse_webhook, verify_signature
logger = get_logger("core.update_service")
class UpdateService:
def __init__(self, app: "OverUB") -> None:
self.app = app
self.config = app.config.get().get("updates", {})
self.notifier = Notifier(app)
self._scheduler = self._build_scheduler()
self._history_path = app.root / "data" / "update_history.jsonl"
self._task: asyncio.Task | None = None
def _build_scheduler(self) -> Scheduler:
scheduler_cfg = self.app.config.get().get("scheduler", {})
update_time = self.config.get("update_time", "03:00")
time_value = scheduler_cfg.get("auto_update_time", update_time)
return Scheduler(
ScheduleConfig(
time=time_value,
postpone_on_activity=bool(scheduler_cfg.get("postpone_on_activity", True)),
max_downtime=int(scheduler_cfg.get("max_downtime", 120)),
retry_failed=bool(scheduler_cfg.get("retry_failed", True)),
retry_interval=int(scheduler_cfg.get("retry_interval", 3600)),
)
)
def _refresh_scheduler(self) -> None:
scheduler_cfg = self.app.config.get().get("scheduler", {})
update_time = self.config.get("update_time", "03:00")
self._scheduler.config.time = scheduler_cfg.get("auto_update_time", update_time)
self._scheduler.config.postpone_on_activity = bool(
scheduler_cfg.get("postpone_on_activity", True)
)
self._scheduler.config.max_downtime = int(scheduler_cfg.get("max_downtime", 120))
self._scheduler.config.retry_failed = bool(scheduler_cfg.get("retry_failed", True))
self._scheduler.config.retry_interval = int(scheduler_cfg.get("retry_interval", 3600))
async def check_updates(self) -> List[str]:
return await self.app.updater.check_updates()
async def apply_updates(self) -> str:
start = time.monotonic()
commits = await self.app.updater.get_commits_ahead()
diff_stats = await self.app.updater.get_diff_stats()
try:
allowed = self.config.get("allowed_sources", [])
if allowed:
remote = await self.app.updater.get_remote_url()
if not any(remote.startswith(src) for src in allowed):
raise RuntimeError("Remote not allowed")
if self.config.get("backup_before_update", True):
self.app.backups.create("core")
if self.config.get("verify_commits", False):
commits = await self.app.updater.get_commits_ahead()
allowed = self.app.config.get().get("security", {}).get("allowed_signers", [])
for line in commits:
commit = line.split(" ", 1)[0]
if not await self.app.updater.verify_commit(commit, allowed_signers=allowed):
raise RuntimeError("Commit verification failed")
output = await self.app.updater.pull_updates()
except Exception as exc:
self.record_event(
action="core_update",
status="failed",
meta={"error": str(exc)},
)
raise
duration = time.monotonic() - start
self.record_event(
action="core_update",
status="success",
meta={
"commits": len(commits),
"duration": duration,
"lines_added": diff_stats.get("added", 0),
"lines_deleted": diff_stats.get("deleted", 0),
},
)
self._log_update(output)
return output
async def rollback(self, ref: str = "HEAD~1") -> str:
output = await self.app.updater.rollback(ref)
self.record_event(action="core_rollback", status="success", meta={"ref": ref})
self._log_update(f"rollback {ref}")
return output
async def notify_updates(self, commits: List[str]) -> None:
if not commits:
return
buckets = parse_conventional(commits)
summary = ["OverUB Update Available", format_changelog(buckets, inline=True)]
await self.notifier.notify("\n".join(summary), event="update_available")
async def _run_loop(self) -> None:
while True:
self.config = self.app.config.get().get("updates", {})
self._refresh_scheduler()
interval = int(self.config.get("check_interval", 3600))
auto_update = bool(self.config.get("auto_update", False))
try:
commits = await self.check_updates()
if commits:
if self.config.get("notify", True):
await self.notify_updates(commits)
if auto_update and self._scheduler.should_run(self.app.last_activity):
output = await self.apply_updates()
self._scheduler.mark_run()
await self.notifier.notify(output or "Updated", event="update_completed")
await self._auto_update_plugins()
except Exception:
logger.exception("Update check failed")
self._scheduler.mark_failed()
self.record_event(action="core_update", status="failed", meta={})
await self.notifier.notify("Update failed", event="update_failed")
await asyncio.sleep(interval)
def start(self) -> None:
if self._task and not self._task.done():
return
self._task = asyncio.create_task(self._run_loop())
def _log_update(self, output: str) -> None:
path = self.app.root / "data" / "update_history.log"
path.parent.mkdir(parents=True, exist_ok=True)
stamp = datetime.utcnow().isoformat()
with path.open("a", encoding="utf-8") as handle:
handle.write(f"{stamp} {output}\n")
def stats(self) -> dict[str, object]:
history = self._read_history()
core = [item for item in history if item.get("action") == "core_update"]
core_success = [item for item in core if item.get("status") == "success"]
modules = [item for item in history if item.get("action") == "module_update"]
plugins = [item for item in history if item.get("action") == "plugin_update"]
last = max((item.get("timestamp", "") for item in history), default="")
avg_duration = 0.0
if core_success:
avg_duration = sum(item.get("duration", 0.0) for item in core_success) / len(core_success)
total_lines = sum(item.get("lines_added", 0) + item.get("lines_deleted", 0) for item in core)
return {
"core_updates": len(core),
"core_success": len(core_success),
"module_updates": len(modules),
"plugin_updates": len(plugins),
"last_update": last,
"avg_update_time": round(avg_duration, 2),
"total_lines_changed": total_lines,
}
def dashboard(self) -> str:
stats = self.stats()
core_total = stats["core_updates"]
core_success = stats["core_success"]
success_rate = 0
if core_total:
success_rate = int((core_success / core_total) * 100)
lines = [
"Update Statistics",
f"Core Updates: {core_total} ({success_rate}% success)",
f"Module Updates: {stats['module_updates']}",
f"Plugin Updates: {stats['plugin_updates']}",
f"Last Update: {stats['last_update'] or 'never'}",
f"Average Update Time: {stats['avg_update_time']}s",
f"Total Downloaded: {stats['total_lines_changed']} lines",
]
return "\n".join(lines)
async def _auto_update_plugins(self) -> None:
plugin_cfg = self.app.config.get().get("plugins", {})
if not plugin_cfg.get("auto_update", False):
return
for name in self.app.plugins.list_installed():
cfg = self.app.config.get_plugin_config(name)
if cfg.get("auto_update", True) is False:
continue
try:
await self.app.plugins.update(name)
self.record_event(action="plugin_update", status="success", meta={"name": name})
except Exception:
logger.exception("Plugin update failed: %s", name)
self.record_event(action="plugin_update", status="failed", meta={"name": name})
async def handle_webhook(self, payload: dict, signature: str = "") -> None:
secret = self.config.get("gitea", {}).get("webhook_secret", "")
if secret and signature:
if not verify_signature(secret, json.dumps(payload).encode("utf-8"), signature):
raise RuntimeError("Invalid webhook signature")
info = parse_webhook(payload)
if info.get("event") in {"push", "release", "tag"}:
await self.apply_updates()
async def stop(self) -> None:
if self._task:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
def record_event(self, action: str, status: str, meta: dict | None = None) -> None:
entry = {
"timestamp": datetime.utcnow().isoformat(),
"action": action,
"status": status,
}
if meta:
entry.update(meta)
self._history_path.parent.mkdir(parents=True, exist_ok=True)
with self._history_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry) + "\n")
def _read_history(self) -> list[dict]:
if not self._history_path.exists():
return []
entries = []
for line in self._history_path.read_text(encoding="utf-8").splitlines():
if not line.strip():
continue
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
return entries