Files
overub/core/updater.py
2025-12-21 17:12:32 +01:00

235 lines
8.6 KiB
Python

from __future__ import annotations
import asyncio
import subprocess
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from core.changelog import format_changelog, parse_conventional
from core.logger import get_logger
logger = get_logger("core.updater")
@dataclass
class VersionInfo:
    """Snapshot of the running checkout, as assembled by ``UpdateManager.get_version_info``."""

    # Nearest tag reported by ``git describe --tags --abbrev=0``, or "unknown"
    # when that command fails.
    core: str
    # Full hash of HEAD.
    commit: str
    # First seven characters of ``commit``.
    short_commit: str
    # Output of ``git rev-parse --abbrev-ref HEAD``.
    branch: str
    # URL of the configured remote (``git remote get-url``).
    remote: str
    # Release channel; currently populated with the same value as ``branch``.
    channel: str
    # True when ``git status --porcelain`` printed anything.
    dirty: bool
    # Time the snapshot was taken (naive UTC) -- not the commit date.
    date: datetime
class UpdateManager:
    """Git-backed update helper for the checkout at ``repo_path``.

    Every git operation is executed as a ``git`` subprocess with ``repo_path``
    as its working directory, dispatched to the event loop's default executor.
    A command that exits non-zero raises ``RuntimeError`` carrying git's
    stderr (or "Command failed" when stderr is empty).

    Attributes:
        repo_path: Working directory used for every git command.
        remote: Name of the git remote to fetch from / pull from.
        branch: Branch on ``remote`` that updates are compared against.
        gitea: Optional release client, ``None`` until a caller assigns one.
            Only its ``releases(owner, repo)`` method is used by this class.
    """

    def __init__(self, repo_path: Path, remote: str, branch: str) -> None:
        self.repo_path = repo_path
        self.remote = remote
        self.branch = branch
        self.gitea = None

    # ------------------------------------------------------------------
    # Basic git operations
    # ------------------------------------------------------------------

    async def check_updates(self) -> List[str]:
        """Fetch from the remote, then return the one-line log of incoming commits."""
        await self.fetch_updates()
        return await self.get_commits_ahead()

    async def fetch_updates(self) -> str:
        """Run ``git fetch <remote>`` and return its stdout."""
        return await self._run(["git", "fetch", self.remote])

    async def pull_updates(self) -> str:
        """Run ``git pull <remote> <branch>`` and return its stdout."""
        return await self._run(["git", "pull", self.remote, self.branch])

    async def checkout_version(self, ref: str) -> str:
        """Run ``git checkout <ref>`` and return its stdout."""
        return await self._run(["git", "checkout", ref])

    async def rollback(self, commit: str) -> str:
        """Run ``git reset --hard <commit>``; uncommitted changes are discarded by git."""
        return await self._run(["git", "reset", "--hard", commit])

    async def get_current_commit(self) -> str:
        """Return the full hash of HEAD."""
        return (await self._run(["git", "rev-parse", "HEAD"])).strip()

    async def get_current_branch(self) -> str:
        """Return the abbreviated ref name of HEAD."""
        return (await self._run(["git", "rev-parse", "--abbrev-ref", "HEAD"])).strip()

    async def get_remote_url(self) -> str:
        """Return the URL configured for ``self.remote``."""
        return (await self._run(["git", "remote", "get-url", self.remote])).strip()

    async def list_tags(self) -> List[str]:
        """Return every tag printed by ``git tag``, blank lines removed."""
        return self._nonblank_lines(await self._run(["git", "tag"]))

    async def get_commits_ahead(self) -> List[str]:
        """Return ``--oneline`` entries reachable from the remote branch but not HEAD.

        No fetch is performed here; the result reflects the last fetched state.
        """
        log = await self._run(
            ["git", "log", f"HEAD..{self.remote}/{self.branch}", "--oneline"]
        )
        return self._nonblank_lines(log)

    # ------------------------------------------------------------------
    # Changelogs
    # ------------------------------------------------------------------

    async def get_changelog(self, ref: str = "HEAD") -> str:
        """Return a changelog for ``ref``.

        When a Gitea client is configured and ``ref`` is not "HEAD", the body
        (or ``note``) of the release whose tag equals ``ref`` is preferred.
        Otherwise the changelog is built from ``git log <ref> --oneline``.
        """
        if self.gitea and ref != "HEAD":
            owner_repo = await self._get_owner_repo()
            if owner_repo:
                owner, repo = owner_repo
                release = await self.get_release(owner, repo, ref)
                body = release.get("body") or release.get("note")
                if body:
                    return body
        return self._render_changelog(await self._run(["git", "log", ref, "--oneline"]))

    async def get_changelog_since(self, ref: str) -> str:
        """Return a changelog of the commits in ``<ref>..HEAD``."""
        output = await self._run(["git", "log", f"{ref}..HEAD", "--oneline"])
        return self._render_changelog(output)

    async def get_changelog_between(self, start: str, end: str) -> str:
        """Return a changelog of the commits in ``<start>..<end>``."""
        output = await self._run(["git", "log", f"{start}..{end}", "--oneline"])
        return self._render_changelog(output)

    async def search_commits(self, keyword: str) -> str:
        """Return a changelog of commits whose message matches ``keyword``.

        Returns the literal string "No matching commits" when git finds none.
        """
        output = await self._run(["git", "log", "--oneline", "--grep", keyword])
        if not self._nonblank_lines(output):
            return "No matching commits"
        return self._render_changelog(output)

    async def get_unreleased_changelog(self) -> str:
        """Return a changelog of everything after the most recent tag.

        Falls back to the full ``HEAD`` changelog when no tag can be found.
        """
        last_tag = await self._latest_tag()
        if not last_tag:
            return await self.get_changelog("HEAD")
        return await self.get_changelog_since(last_tag)

    async def get_diff_stats(self) -> dict:
        """Return ``{"added": int, "deleted": int}`` for HEAD vs. the remote branch.

        Counts come from ``git diff --numstat``; entries whose counts are not
        plain digits are skipped rather than counted.
        """
        output = await self._run(
            ["git", "diff", "--numstat", f"HEAD..{self.remote}/{self.branch}"]
        )
        added = 0
        deleted = 0
        for line in output.splitlines():
            columns = line.split("\t")
            if len(columns) < 2:
                continue
            plus, minus = columns[0], columns[1]
            if plus.isdigit():
                added += int(plus)
            if minus.isdigit():
                deleted += int(minus)
        return {"added": added, "deleted": deleted}

    async def verify_commit(
        self, commit_hash: str, allowed_signers: Optional[list[str]] = None
    ) -> bool:
        """Return True when ``git verify-commit`` accepts ``commit_hash``.

        If ``allowed_signers`` is non-empty, the value git prints for ``%GF``
        must additionally be one of its entries. ``None`` and an empty list
        both mean "any valid signature".
        """
        try:
            await self._run(["git", "verify-commit", commit_hash])
        except RuntimeError:
            return False
        if allowed_signers:
            return (await self._get_signer(commit_hash)) in allowed_signers
        return True

    # ------------------------------------------------------------------
    # Releases
    # ------------------------------------------------------------------

    async def get_releases(self, owner: str, repo: str) -> list[dict]:
        """Return ``self.gitea.releases(owner, repo)``.

        Raises:
            RuntimeError: If no Gitea client has been assigned.
        """
        if not self.gitea:
            raise RuntimeError("Gitea client not configured")
        # NOTE(review): this is a direct, un-awaited call on the event-loop
        # thread. If the client performs blocking HTTP it will stall the loop
        # -- confirm against the client implementation.
        return self.gitea.releases(owner, repo)

    async def get_latest_release(self, owner: str, repo: str) -> dict:
        """Return the first release in the client's list, or ``{}`` when there is none.

        Presumably the client lists newest first -- verify against the client.
        """
        releases = await self.get_releases(owner, repo)
        return releases[0] if releases else {}

    async def get_release(self, owner: str, repo: str, tag: str) -> dict:
        """Return the release whose ``tag_name`` equals ``tag``, or ``{}``."""
        releases = await self.get_releases(owner, repo)
        return next((rel for rel in releases if rel.get("tag_name") == tag), {})

    async def download_release(self, owner: str, repo: str, tag: str) -> dict:
        """Look up the release for ``tag`` and locate its first asset.

        Despite the name, nothing is downloaded here. The return shape depends
        on what was found:

        * ``{"release": <release>, "asset_url": <url>}`` when the first asset
          has a ``browser_download_url``;
        * the bare release dict otherwise.

        Raises:
            RuntimeError: If there is no release for ``tag``.
        """
        release = await self.get_release(owner, repo, tag)
        if not release:
            raise RuntimeError("Release not found")
        assets = release.get("assets", [])
        if not assets:
            return release
        url = assets[0].get("browser_download_url")
        if not url:
            return release
        return {"release": release, "asset_url": url}

    async def download_release_asset(
        self, url: str, dest: Path, timeout: float = 60.0
    ) -> Path:
        """Download ``url`` to ``dest`` in the default executor and return ``dest``.

        ``timeout`` is the per-operation socket timeout in seconds.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_download, url, dest, timeout)

    def _sync_download(self, url: str, dest: Path, timeout: float = 60.0) -> Path:
        """Blocking download of ``url`` into ``dest`` (parent directories are created).

        The payload is streamed to ``<dest>.part`` and renamed into place only
        after the transfer completes, so a failed download neither leaves a
        truncated ``dest`` nor holds the whole asset in memory.
        """
        import shutil
        from urllib import request

        dest.parent.mkdir(parents=True, exist_ok=True)
        partial = dest.with_name(dest.name + ".part")
        try:
            # NOTE: urlopen also honours non-HTTP schemes such as file://;
            # callers should only pass URLs obtained from a trusted release.
            with request.urlopen(url, timeout=timeout) as response:
                with partial.open("wb") as handle:
                    shutil.copyfileobj(response, handle)
            partial.replace(dest)
        except Exception:
            # Best-effort cleanup; the original error is what matters.
            try:
                partial.unlink()
            except OSError:
                pass
            raise
        return dest

    # ------------------------------------------------------------------
    # Version information
    # ------------------------------------------------------------------

    async def get_version_info(self) -> VersionInfo:
        """Collect commit, branch, remote, tag and dirty state into a ``VersionInfo``."""
        from datetime import timezone

        commit = await self.get_current_commit()
        branch = await self.get_current_branch()
        remote = await self.get_remote_url()
        core_version = await self._get_core_version()
        status = await self._run(["git", "status", "--porcelain"])
        return VersionInfo(
            core=core_version,
            commit=commit,
            short_commit=commit[:7],
            branch=branch,
            remote=remote,
            channel=branch,
            dirty=bool(status.strip()),
            # Naive UTC "now", same value the deprecated datetime.utcnow() gave.
            date=datetime.now(timezone.utc).replace(tzinfo=None),
        )

    async def _get_core_version(self) -> str:
        """Return the nearest tag name, or "unknown" when ``git describe`` fails."""
        tag = await self._latest_tag()
        return "unknown" if tag is None else tag

    async def _latest_tag(self) -> Optional[str]:
        """Return ``git describe --tags --abbrev=0`` output, or None if it fails."""
        try:
            return (await self._run(["git", "describe", "--tags", "--abbrev=0"])).strip()
        except RuntimeError:
            return None

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    @staticmethod
    def _nonblank_lines(text: str) -> List[str]:
        """Split ``text`` into lines, dropping those that are empty or whitespace."""
        return [line for line in text.splitlines() if line.strip()]

    def _render_changelog(self, log_output: str) -> str:
        """Bucket ``--oneline`` log output with parse_conventional and format it."""
        return format_changelog(parse_conventional(self._nonblank_lines(log_output)))

    async def _run(self, cmd: List[str]) -> str:
        """Run ``cmd`` via ``_sync_run`` in the default executor and return stdout."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_run, cmd)

    def _sync_run(self, cmd: List[str]) -> str:
        """Run ``cmd`` in ``repo_path`` and return its stdout.

        Raises:
            RuntimeError: If the command exits non-zero; the message is the
                stripped stderr, or "Command failed" when stderr is empty.
        """
        logger.debug("Running command: %s", " ".join(cmd))
        result = subprocess.run(
            cmd,
            cwd=self.repo_path,
            capture_output=True,
            text=True,
            # Decode explicitly instead of with the locale codec so non-ASCII
            # commit messages cannot raise UnicodeDecodeError.
            encoding="utf-8",
            errors="replace",
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr.strip() or "Command failed")
        return result.stdout

    async def _get_signer(self, commit_hash: str) -> str:
        """Return what ``git log --format=%GF -n 1`` prints for ``commit_hash``."""
        output = await self._run(["git", "log", "--format=%GF", "-n", "1", commit_hash])
        return output.strip()

    async def _get_owner_repo(self) -> Optional[tuple[str, str]]:
        """Return ``(owner, repo)`` parsed from the remote URL, or None.

        Deliberately best-effort: any failure to read the remote URL yields
        None so callers can fall back to plain git output.
        """
        try:
            remote = await self.get_remote_url()
        except Exception:
            return None
        return self._parse_owner_repo(remote)

    @staticmethod
    def _parse_owner_repo(remote: str) -> Optional[tuple[str, str]]:
        """Extract ``(owner, repo)`` from an ``http(s)://`` or ``git@host:`` remote URL.

        Returns None for any other URL form, or when either component would
        be empty.
        """
        cleaned = remote.strip().rstrip("/")
        # Drop the suffix explicitly: str.rstrip(".git") strips a *character
        # set* and would mangle names such as "testing" or "widget".
        if cleaned.endswith(".git"):
            cleaned = cleaned[: -len(".git")]
        if cleaned.startswith("http"):
            path = cleaned
        elif cleaned.startswith("git@") and ":" in cleaned:
            path = cleaned.split(":", 1)[1]
        else:
            return None
        parts = path.split("/")
        if len(parts) < 2 or not parts[-2] or not parts[-1]:
            return None
        return parts[-2], parts[-1]