"""Git-backed update management: fetching, changelogs, releases, versions."""

from __future__ import annotations

import asyncio
import shutil
import subprocess
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

from core.changelog import format_changelog, parse_conventional
from core.logger import get_logger

logger = get_logger("core.updater")

_GIT_SUFFIX = ".git"


def _non_blank_lines(output: str) -> List[str]:
    """Return the lines of *output* that contain non-whitespace characters."""
    return [line for line in output.splitlines() if line.strip()]


def _render_changelog(log_output: str) -> str:
    """Format ``git log --oneline`` output as a conventional-commit changelog."""
    return format_changelog(parse_conventional(_non_blank_lines(log_output)))


def _strip_git_suffix(value: str) -> str:
    """Return *value* without trailing slashes or a literal ``.git`` suffix.

    ``str.rstrip(".git")`` must not be used for this: it strips any trailing
    run of the characters ``.``, ``g``, ``i`` and ``t``, so a repository
    named ``config.git`` would become ``conf``.
    """
    value = value.rstrip("/")
    if value.endswith(_GIT_SUFFIX):
        value = value[: -len(_GIT_SUFFIX)]
    return value


def _owner_repo_from_path(path: str) -> Optional[tuple[str, str]]:
    """Return the last two ``/``-separated components of *path*, if present."""
    parts = _strip_git_suffix(path).split("/")
    if len(parts) >= 2 and parts[-2] and parts[-1]:
        return parts[-2], parts[-1]
    return None


@dataclass
class VersionInfo:
    """Snapshot of the repository state built by ``get_version_info``."""

    core: str  # most recent tag reachable from HEAD, or "unknown"
    commit: str  # full hash of HEAD
    short_commit: str  # first seven characters of ``commit``
    branch: str  # current branch name
    remote: str  # URL of the configured remote
    channel: str  # currently always the same value as ``branch``
    dirty: bool  # True when ``git status --porcelain`` reports changes
    date: datetime  # naive UTC time at which the snapshot was taken


class UpdateManager:
    """Run git (and optional Gitea) operations for a local repository.

    All git commands are executed with ``repo_path`` as the working
    directory, in the event loop's default executor so that the loop itself
    is never blocked by a subprocess.
    """

    def __init__(self, repo_path: Path, remote: str, branch: str) -> None:
        """Store the repository location and the remote/branch to track.

        Args:
            repo_path: Working directory in which git commands are run.
            remote: Name of the git remote (for example ``origin``).
            branch: Name of the branch on ``remote`` to compare against.
        """
        self.repo_path = repo_path
        self.remote = remote
        self.branch = branch
        # Optional release client; must expose ``releases(owner, repo)``.
        # It is not created here and has to be assigned by the caller.
        self.gitea = None

    # ------------------------------------------------------------------
    # Basic git operations
    # ------------------------------------------------------------------

    async def check_updates(self) -> List[str]:
        """Fetch from the remote and return the commits HEAD is missing."""
        await self.fetch_updates()
        return await self.get_commits_ahead()

    async def fetch_updates(self) -> str:
        """Run ``git fetch`` for the configured remote and return its stdout."""
        return await self._run(["git", "fetch", self.remote])

    async def pull_updates(self) -> str:
        """Run ``git pull`` for the configured remote and branch."""
        return await self._run(["git", "pull", self.remote, self.branch])

    async def checkout_version(self, ref: str) -> str:
        """Check out *ref* (branch, tag or commit) and return git's stdout."""
        return await self._run(["git", "checkout", ref])

    async def rollback(self, commit: str) -> str:
        """Hard-reset the working tree to *commit*.

        This runs ``git reset --hard`` and therefore discards uncommitted
        changes to tracked files.
        """
        return await self._run(["git", "reset", "--hard", commit])

    async def get_current_commit(self) -> str:
        """Return the full hash of HEAD."""
        return (await self._run(["git", "rev-parse", "HEAD"])).strip()

    async def get_current_branch(self) -> str:
        """Return the abbreviated name of the ref HEAD points to."""
        output = await self._run(["git", "rev-parse", "--abbrev-ref", "HEAD"])
        return output.strip()

    async def get_remote_url(self) -> str:
        """Return the URL configured for the remote."""
        output = await self._run(["git", "remote", "get-url", self.remote])
        return output.strip()

    async def list_tags(self) -> List[str]:
        """Return all tag names reported by ``git tag``."""
        return _non_blank_lines(await self._run(["git", "tag"]))

    async def get_commits_ahead(self) -> List[str]:
        """Return one-line summaries of remote commits not yet in HEAD.

        Uses the locally known state of ``<remote>/<branch>``; call
        ``fetch_updates`` first (or use ``check_updates``) to refresh it.
        """
        log = await self._run(
            ["git", "log", f"HEAD..{self.remote}/{self.branch}", "--oneline"]
        )
        return _non_blank_lines(log)

    # ------------------------------------------------------------------
    # Changelogs
    # ------------------------------------------------------------------

    async def get_changelog(self, ref: str = "HEAD") -> str:
        """Return a changelog for *ref*.

        When a Gitea client is configured and *ref* is not ``HEAD``, the
        body (or note) of the release tagged *ref* is returned if it exists
        and is non-empty. Otherwise the changelog is generated from the
        one-line git log reachable from *ref*.
        """
        if self.gitea and ref != "HEAD":
            owner_repo = await self._get_owner_repo()
            if owner_repo:
                owner, repo = owner_repo
                release = await self.get_release(owner, repo, ref)
                body = release.get("body") or release.get("note")
                if body:
                    return body
        return _render_changelog(
            await self._run(["git", "log", ref, "--oneline"])
        )

    async def get_changelog_since(self, ref: str) -> str:
        """Return a changelog of commits in HEAD that are not in *ref*."""
        return _render_changelog(
            await self._run(["git", "log", f"{ref}..HEAD", "--oneline"])
        )

    async def get_changelog_between(self, start: str, end: str) -> str:
        """Return a changelog of commits in *end* that are not in *start*."""
        return _render_changelog(
            await self._run(["git", "log", f"{start}..{end}", "--oneline"])
        )

    async def search_commits(self, keyword: str) -> str:
        """Return a changelog of commits whose message matches *keyword*.

        Returns the literal string ``"No matching commits"`` when git
        reports no match.
        """
        output = await self._run(
            ["git", "log", "--oneline", "--grep", keyword]
        )
        if not _non_blank_lines(output):
            return "No matching commits"
        return _render_changelog(output)

    async def get_unreleased_changelog(self) -> str:
        """Return a changelog of commits made after the most recent tag.

        Falls back to the full changelog of HEAD when no tag can be found.
        """
        last_tag = await self._latest_tag()
        if not last_tag:
            return await self.get_changelog("HEAD")
        return await self.get_changelog_since(last_tag)

    async def get_diff_stats(self) -> dict:
        """Return added/deleted line totals between HEAD and the remote branch.

        Returns:
            A dict with integer ``"added"`` and ``"deleted"`` keys.
        """
        output = await self._run(
            ["git", "diff", "--numstat", f"HEAD..{self.remote}/{self.branch}"]
        )
        added = 0
        deleted = 0
        for line in output.splitlines():
            parts = line.split("\t")
            if len(parts) < 2:
                continue
            add, remove = parts[0], parts[1]
            # Non-numeric counts (git prints "-" for binary files) are skipped.
            if add.isdigit():
                added += int(add)
            if remove.isdigit():
                deleted += int(remove)
        return {"added": added, "deleted": deleted}

    async def verify_commit(
        self, commit_hash: str, allowed_signers: Optional[list[str]] = None
    ) -> bool:
        """Return True when *commit_hash* carries a signature git accepts.

        Args:
            commit_hash: Commit to verify with ``git verify-commit``.
            allowed_signers: Optional list of accepted signing-key
                fingerprints. ``None`` or an empty list means any valid
                signature is accepted.
        """
        try:
            await self._run(["git", "verify-commit", commit_hash])
        except RuntimeError:
            return False
        if allowed_signers:
            signer = await self._get_signer(commit_hash)
            return signer in allowed_signers
        return True

    # ------------------------------------------------------------------
    # Releases
    # ------------------------------------------------------------------

    async def get_releases(self, owner: str, repo: str) -> list[dict]:
        """Return the releases of ``owner/repo`` from the Gitea client.

        Raises:
            RuntimeError: If no Gitea client has been configured.
        """
        if not self.gitea:
            raise RuntimeError("Gitea client not configured")
        # NOTE(review): the client is called synchronously; if it performs
        # network I/O this blocks the event loop -- confirm against the client.
        return self.gitea.releases(owner, repo)

    async def get_latest_release(self, owner: str, repo: str) -> dict:
        """Return the first release in the client's list, or ``{}`` if none."""
        releases = await self.get_releases(owner, repo)
        return releases[0] if releases else {}

    async def get_release(self, owner: str, repo: str, tag: str) -> dict:
        """Return the release whose ``tag_name`` equals *tag*, or ``{}``."""
        releases = await self.get_releases(owner, repo)
        for release in releases:
            if release.get("tag_name") == tag:
                return release
        return {}

    async def download_release(self, owner: str, repo: str, tag: str) -> dict:
        """Look up the release tagged *tag* and locate its first asset URL.

        Returns:
            ``{"release": <release>, "asset_url": <url>}`` when the first
            asset has a ``browser_download_url``; otherwise the bare release
            dict. Callers must handle both shapes.

        Raises:
            RuntimeError: If no release with that tag exists.
        """
        release = await self.get_release(owner, repo, tag)
        if not release:
            raise RuntimeError("Release not found")
        assets = release.get("assets", [])
        if not assets:
            return release
        url = assets[0].get("browser_download_url")
        if not url:
            return release
        return {"release": release, "asset_url": url}

    async def download_release_asset(self, url: str, dest: Path) -> Path:
        """Download *url* to *dest* in the default executor; return *dest*."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_download, url, dest)

    def _sync_download(self, url: str, dest: Path) -> Path:
        """Blocking download of *url* to *dest*, creating parent directories.

        The payload is streamed to a sibling ``.part`` file and renamed into
        place only after it has been received completely, so a failed
        download never leaves a truncated *dest* behind and large assets are
        not held in memory.
        """
        from urllib import request

        dest.parent.mkdir(parents=True, exist_ok=True)
        partial = dest.with_name(dest.name + ".part")
        try:
            # NOTE(review): urlopen accepts any scheme it has a handler for
            # (including file://); restrict the URL if it can be untrusted.
            with request.urlopen(url) as response, partial.open("wb") as out:
                shutil.copyfileobj(response, out)
            partial.replace(dest)
        finally:
            # Only still present when the download or the rename failed.
            if partial.exists():
                partial.unlink()
        return dest

    # ------------------------------------------------------------------
    # Version information
    # ------------------------------------------------------------------

    async def get_version_info(self) -> VersionInfo:
        """Collect commit, branch, remote, tag and dirty state of the repo."""
        commit = await self.get_current_commit()
        branch = await self.get_current_branch()
        remote = await self.get_remote_url()
        core_version = await self._get_core_version()
        status = await self._run(["git", "status", "--porcelain"])
        # Same naive-UTC value ``datetime.utcnow()`` produced, without relying
        # on that deprecated call.
        date = datetime.now(timezone.utc).replace(tzinfo=None)
        return VersionInfo(
            core=core_version,
            commit=commit,
            short_commit=commit[:7],
            branch=branch,
            remote=remote,
            channel=branch,
            dirty=bool(status.strip()),
            date=date,
        )

    async def _latest_tag(self) -> str:
        """Return the most recent tag reachable from HEAD, or ``""`` if none."""
        try:
            output = await self._run(
                ["git", "describe", "--tags", "--abbrev=0"]
            )
        except RuntimeError:
            return ""
        return output.strip()

    async def _get_core_version(self) -> str:
        """Return the most recent tag, or ``"unknown"`` when there is none."""
        return (await self._latest_tag()) or "unknown"

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    async def _run(self, cmd: List[str]) -> str:
        """Run *cmd* in the default executor and return its stdout.

        Raises:
            RuntimeError: If the command exits with a non-zero status.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_run, cmd)

    def _sync_run(self, cmd: List[str]) -> str:
        """Blocking counterpart of ``_run``; executes *cmd* in ``repo_path``."""
        logger.debug("Running command: %s", " ".join(cmd))
        result = subprocess.run(
            cmd, cwd=self.repo_path, capture_output=True, text=True
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr.strip() or "Command failed")
        return result.stdout

    async def _get_signer(self, commit_hash: str) -> str:
        """Return the signing-key fingerprint (``%GF``) of *commit_hash*."""
        output = await self._run(
            ["git", "log", "--format=%GF", "-n", "1", commit_hash]
        )
        return output.strip()

    async def _get_owner_repo(self) -> Optional[tuple[str, str]]:
        """Derive ``(owner, repo)`` from the remote URL.

        Supports ``http(s)://host/owner/repo[.git]`` and scp-style
        ``git@host:owner/repo[.git]`` URLs. Returns ``None`` when the URL
        cannot be read or does not match either form.
        """
        try:
            remote = await self.get_remote_url()
        except Exception as exc:
            # Best effort: callers fall back to plain git data on None.
            logger.debug(
                "Could not read URL of remote %s: %s", self.remote, exc
            )
            return None
        if remote.startswith("http"):
            return _owner_repo_from_path(remote)
        if remote.startswith("git@") and ":" in remote:
            return _owner_repo_from_path(remote.split(":", 1)[1])
        return None