from __future__ import annotations import asyncio import contextlib import json import time from datetime import datetime from typing import List from core.changelog import format_changelog, parse_conventional from core.logger import get_logger from core.notifications import Notifier from core.scheduler import ScheduleConfig, Scheduler from core.webhook import parse_webhook, verify_signature logger = get_logger("core.update_service") class UpdateService: def __init__(self, app: "OverUB") -> None: self.app = app self.config = app.config.get().get("updates", {}) self.notifier = Notifier(app) self._scheduler = self._build_scheduler() self._history_path = app.root / "data" / "update_history.jsonl" self._task: asyncio.Task | None = None def _build_scheduler(self) -> Scheduler: scheduler_cfg = self.app.config.get().get("scheduler", {}) update_time = self.config.get("update_time", "03:00") time_value = scheduler_cfg.get("auto_update_time", update_time) return Scheduler( ScheduleConfig( time=time_value, postpone_on_activity=bool(scheduler_cfg.get("postpone_on_activity", True)), max_downtime=int(scheduler_cfg.get("max_downtime", 120)), retry_failed=bool(scheduler_cfg.get("retry_failed", True)), retry_interval=int(scheduler_cfg.get("retry_interval", 3600)), ) ) def _refresh_scheduler(self) -> None: scheduler_cfg = self.app.config.get().get("scheduler", {}) update_time = self.config.get("update_time", "03:00") self._scheduler.config.time = scheduler_cfg.get("auto_update_time", update_time) self._scheduler.config.postpone_on_activity = bool( scheduler_cfg.get("postpone_on_activity", True) ) self._scheduler.config.max_downtime = int(scheduler_cfg.get("max_downtime", 120)) self._scheduler.config.retry_failed = bool(scheduler_cfg.get("retry_failed", True)) self._scheduler.config.retry_interval = int(scheduler_cfg.get("retry_interval", 3600)) async def check_updates(self) -> List[str]: return await self.app.updater.check_updates() async def apply_updates(self) -> str: start = time.monotonic() commits = await self.app.updater.get_commits_ahead() diff_stats = await self.app.updater.get_diff_stats() try: allowed = self.config.get("allowed_sources", []) if allowed: remote = await self.app.updater.get_remote_url() if not any(remote.startswith(src) for src in allowed): raise RuntimeError("Remote not allowed") if self.config.get("backup_before_update", True): self.app.backups.create("core") if self.config.get("verify_commits", False): commits = await self.app.updater.get_commits_ahead() allowed = self.app.config.get().get("security", {}).get("allowed_signers", []) for line in commits: commit = line.split(" ", 1)[0] if not await self.app.updater.verify_commit(commit, allowed_signers=allowed): raise RuntimeError("Commit verification failed") output = await self.app.updater.pull_updates() except Exception as exc: self.record_event( action="core_update", status="failed", meta={"error": str(exc)}, ) raise duration = time.monotonic() - start self.record_event( action="core_update", status="success", meta={ "commits": len(commits), "duration": duration, "lines_added": diff_stats.get("added", 0), "lines_deleted": diff_stats.get("deleted", 0), }, ) self._log_update(output) return output async def rollback(self, ref: str = "HEAD~1") -> str: output = await self.app.updater.rollback(ref) self.record_event(action="core_rollback", status="success", meta={"ref": ref}) self._log_update(f"rollback {ref}") return output async def notify_updates(self, commits: List[str]) -> None: if not commits: return buckets = parse_conventional(commits) summary = ["OverUB Update Available", format_changelog(buckets, inline=True)] await self.notifier.notify("\n".join(summary), event="update_available") async def _run_loop(self) -> None: while True: self.config = self.app.config.get().get("updates", {}) self._refresh_scheduler() interval = int(self.config.get("check_interval", 3600)) auto_update = bool(self.config.get("auto_update", False)) try: commits = await self.check_updates() if commits: if self.config.get("notify", True): await self.notify_updates(commits) if auto_update and self._scheduler.should_run(self.app.last_activity): output = await self.apply_updates() self._scheduler.mark_run() await self.notifier.notify(output or "Updated", event="update_completed") await self._auto_update_plugins() except Exception: logger.exception("Update check failed") self._scheduler.mark_failed() self.record_event(action="core_update", status="failed", meta={}) await self.notifier.notify("Update failed", event="update_failed") await asyncio.sleep(interval) def start(self) -> None: if self._task and not self._task.done(): return self._task = asyncio.create_task(self._run_loop()) def _log_update(self, output: str) -> None: path = self.app.root / "data" / "update_history.log" path.parent.mkdir(parents=True, exist_ok=True) stamp = datetime.utcnow().isoformat() with path.open("a", encoding="utf-8") as handle: handle.write(f"{stamp} {output}\n") def stats(self) -> dict[str, object]: history = self._read_history() core = [item for item in history if item.get("action") == "core_update"] core_success = [item for item in core if item.get("status") == "success"] modules = [item for item in history if item.get("action") == "module_update"] plugins = [item for item in history if item.get("action") == "plugin_update"] last = max((item.get("timestamp", "") for item in history), default="") avg_duration = 0.0 if core_success: avg_duration = sum(item.get("duration", 0.0) for item in core_success) / len(core_success) total_lines = sum(item.get("lines_added", 0) + item.get("lines_deleted", 0) for item in core) return { "core_updates": len(core), "core_success": len(core_success), "module_updates": len(modules), "plugin_updates": len(plugins), "last_update": last, "avg_update_time": round(avg_duration, 2), "total_lines_changed": total_lines, } def dashboard(self) -> str: stats = self.stats() core_total = stats["core_updates"] core_success = stats["core_success"] success_rate = 0 if core_total: success_rate = int((core_success / core_total) * 100) lines = [ "Update Statistics", f"Core Updates: {core_total} ({success_rate}% success)", f"Module Updates: {stats['module_updates']}", f"Plugin Updates: {stats['plugin_updates']}", f"Last Update: {stats['last_update'] or 'never'}", f"Average Update Time: {stats['avg_update_time']}s", f"Total Downloaded: {stats['total_lines_changed']} lines", ] return "\n".join(lines) async def _auto_update_plugins(self) -> None: plugin_cfg = self.app.config.get().get("plugins", {}) if not plugin_cfg.get("auto_update", False): return for name in self.app.plugins.list_installed(): cfg = self.app.config.get_plugin_config(name) if cfg.get("auto_update", True) is False: continue try: await self.app.plugins.update(name) self.record_event(action="plugin_update", status="success", meta={"name": name}) except Exception: logger.exception("Plugin update failed: %s", name) self.record_event(action="plugin_update", status="failed", meta={"name": name}) async def handle_webhook(self, payload: dict, signature: str = "") -> None: secret = self.config.get("gitea", {}).get("webhook_secret", "") if secret and signature: if not verify_signature(secret, json.dumps(payload).encode("utf-8"), signature): raise RuntimeError("Invalid webhook signature") info = parse_webhook(payload) if info.get("event") in {"push", "release", "tag"}: await self.apply_updates() async def stop(self) -> None: if self._task: self._task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._task def record_event(self, action: str, status: str, meta: dict | None = None) -> None: entry = { "timestamp": datetime.utcnow().isoformat(), "action": action, "status": status, } if meta: entry.update(meta) self._history_path.parent.mkdir(parents=True, exist_ok=True) with self._history_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(entry) + "\n") def _read_history(self) -> list[dict]: if not self._history_path.exists(): return [] entries = [] for line in self._history_path.read_text(encoding="utf-8").splitlines(): if not line.strip(): continue try: entries.append(json.loads(line)) except json.JSONDecodeError: continue return entries