235 lines
8.6 KiB
Python
235 lines
8.6 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import List, Optional
|
|
|
|
from core.changelog import format_changelog, parse_conventional
|
|
from core.logger import get_logger
|
|
|
|
|
|
# Module-level logger obtained from core.logger under the name "core.updater";
# UpdateManager._sync_run uses it to log every command at debug level.
logger = get_logger("core.updater")
|
|
|
|
|
|
@dataclass
class VersionInfo:
    """Snapshot of the checkout's version state.

    Instances are built by ``UpdateManager.get_version_info``; the comments on
    each field describe how that method fills it in.
    """

    # Version label: the most recent tag reported by ``git describe``, or
    # "unknown" when that command fails.
    core: str
    # Full hash of HEAD.
    commit: str
    # First seven characters of ``commit``.
    short_commit: str
    # Name of the currently checked-out branch.
    branch: str
    # URL of the configured remote.
    remote: str
    # Release channel; get_version_info sets it to the same value as ``branch``.
    channel: str
    # True when ``git status --porcelain`` printed anything.
    dirty: bool
    # Moment the snapshot was taken (naive datetime expressed in UTC).
    date: datetime
|
|
|
|
|
|
class UpdateManager:
    """Run git commands (and optional Gitea release lookups) for one checkout.

    Every git command is executed with ``repo_path`` as its working directory
    in the event loop's default executor, so the coroutines below do not block
    the loop while git runs.  A command that exits non-zero raises
    ``RuntimeError`` carrying the command's stderr (or "Command failed").
    """

    def __init__(self, repo_path: Path, remote: str, branch: str) -> None:
        """Remember the checkout location and the remote/branch to track.

        Args:
            repo_path: Directory used as the working directory for git.
            remote: Name of the git remote (passed to ``git fetch``/``pull``).
            branch: Branch name on that remote to compare against and pull.
        """
        self.repo_path = repo_path
        self.remote = remote
        self.branch = branch
        # Optional release client.  It stays None unless something assigns an
        # object exposing ``releases(owner, repo)``; the release helpers raise
        # RuntimeError while it is unset.
        self.gitea = None

    # ------------------------------------------------------------------
    # Basic git operations
    # ------------------------------------------------------------------

    async def check_updates(self) -> List[str]:
        """Fetch the remote, then return the commits HEAD is missing.

        Returns:
            One ``git log --oneline`` line per commit in
            ``HEAD..<remote>/<branch>``; empty when up to date.
        """
        await self.fetch_updates()
        return await self.get_commits_ahead()

    async def fetch_updates(self) -> str:
        """Run ``git fetch <remote>`` and return its stdout."""
        return await self._run(["git", "fetch", self.remote])

    async def pull_updates(self) -> str:
        """Run ``git pull <remote> <branch>`` and return its stdout."""
        return await self._run(["git", "pull", self.remote, self.branch])

    async def checkout_version(self, ref: str) -> str:
        """Run ``git checkout <ref>`` and return its stdout."""
        return await self._run(["git", "checkout", ref])

    async def rollback(self, commit: str) -> str:
        """Run ``git reset --hard <commit>`` and return its stdout.

        This discards uncommitted changes in the working tree.
        """
        return await self._run(["git", "reset", "--hard", commit])

    async def get_current_commit(self) -> str:
        """Return the full hash of HEAD."""
        return (await self._run(["git", "rev-parse", "HEAD"])).strip()

    async def get_current_branch(self) -> str:
        """Return the abbreviated ref name of HEAD."""
        output = await self._run(["git", "rev-parse", "--abbrev-ref", "HEAD"])
        return output.strip()

    async def get_remote_url(self) -> str:
        """Return the URL configured for ``self.remote``."""
        output = await self._run(["git", "remote", "get-url", self.remote])
        return output.strip()

    async def list_tags(self) -> List[str]:
        """Return every tag name printed by ``git tag`` (blank lines dropped)."""
        output = await self._run(["git", "tag"])
        return [tag for tag in output.splitlines() if tag.strip()]

    async def get_commits_ahead(self) -> List[str]:
        """Return the ``--oneline`` log of ``HEAD..<remote>/<branch>``.

        Unlike :meth:`check_updates` this does not fetch first, so the result
        reflects whatever remote-tracking state is already on disk.
        """
        return await self._log_lines(
            f"HEAD..{self.remote}/{self.branch}", "--oneline"
        )

    # ------------------------------------------------------------------
    # Changelogs
    # ------------------------------------------------------------------

    async def get_changelog(self, ref: str = "HEAD") -> str:
        """Return a changelog for ``ref``.

        When a Gitea client is configured and ``ref`` is not ``"HEAD"``, the
        release whose ``tag_name`` equals ``ref`` is looked up and its
        ``body`` (or ``note``) is returned if non-empty.  Otherwise the
        changelog is generated from ``git log <ref> --oneline``.
        """
        if self.gitea and ref != "HEAD":
            owner_repo = await self._get_owner_repo()
            if owner_repo:
                owner, repo = owner_repo
                release = await self.get_release(owner, repo, ref)
                body = release.get("body") or release.get("note")
                if body:
                    return body
        return await self._changelog_from_log(ref, "--oneline")

    async def get_changelog_since(self, ref: str) -> str:
        """Return a changelog built from the commits in ``<ref>..HEAD``."""
        return await self._changelog_from_log(f"{ref}..HEAD", "--oneline")

    async def get_changelog_between(self, start: str, end: str) -> str:
        """Return a changelog built from the commits in ``<start>..<end>``."""
        return await self._changelog_from_log(f"{start}..{end}", "--oneline")

    async def search_commits(self, keyword: str) -> str:
        """Return a changelog of commits whose message matches ``keyword``.

        Returns the literal string ``"No matching commits"`` when
        ``git log --grep`` finds nothing.
        """
        commits = await self._log_lines("--oneline", "--grep", keyword)
        if not commits:
            return "No matching commits"
        return format_changelog(parse_conventional(commits))

    async def get_unreleased_changelog(self) -> str:
        """Return a changelog of the commits made after the most recent tag.

        Falls back to the full ``HEAD`` changelog when ``git describe`` fails
        (for example because no tag exists) or prints nothing.
        """
        try:
            described = await self._run(
                ["git", "describe", "--tags", "--abbrev=0"]
            )
        except RuntimeError:
            described = ""
        last_tag = described.strip()
        if not last_tag:
            return await self.get_changelog("HEAD")
        return await self.get_changelog_since(last_tag)

    async def get_diff_stats(self) -> dict:
        """Sum added/deleted line counts between HEAD and the remote branch.

        Returns:
            ``{"added": int, "deleted": int}`` accumulated over the output of
            ``git diff --numstat HEAD..<remote>/<branch>``.
        """
        output = await self._run(
            ["git", "diff", "--numstat", f"HEAD..{self.remote}/{self.branch}"]
        )
        added = 0
        deleted = 0
        for line in output.splitlines():
            columns = line.split("\t")
            if len(columns) < 2:
                continue
            # Non-numeric columns (e.g. "-") are skipped rather than counted.
            if columns[0].isdigit():
                added += int(columns[0])
            if columns[1].isdigit():
                deleted += int(columns[1])
        return {"added": added, "deleted": deleted}

    # ------------------------------------------------------------------
    # Signature verification
    # ------------------------------------------------------------------

    async def verify_commit(
        self, commit_hash: str, allowed_signers: Optional[list[str]] = None
    ) -> bool:
        """Check the signature on ``commit_hash``.

        Args:
            commit_hash: Commit to verify with ``git verify-commit``.
            allowed_signers: Optional list of accepted values for the
                commit's ``%GF`` log field.  ``None`` or an empty list means
                any commit that passes ``git verify-commit`` is accepted.

        Returns:
            ``False`` when ``git verify-commit`` fails or the signer is not in
            ``allowed_signers``; ``True`` otherwise.
        """
        try:
            await self._run(["git", "verify-commit", commit_hash])
        except RuntimeError:
            return False
        if allowed_signers:
            signer = await self._get_signer(commit_hash)
            return signer in allowed_signers
        return True

    # ------------------------------------------------------------------
    # Releases
    # ------------------------------------------------------------------

    async def get_releases(self, owner: str, repo: str) -> list[dict]:
        """Return the releases reported by the Gitea client.

        Raises:
            RuntimeError: If no Gitea client has been configured.
        """
        if not self.gitea:
            raise RuntimeError("Gitea client not configured")
        releases = self.gitea.releases(owner, repo)
        # Accept both synchronous and coroutine-based clients: without this,
        # an async client would hand callers an un-awaited coroutine object.
        if hasattr(releases, "__await__"):
            releases = await releases
        return releases

    async def get_latest_release(self, owner: str, repo: str) -> dict:
        """Return the first release from the client, or ``{}`` if none."""
        releases = await self.get_releases(owner, repo)
        return releases[0] if releases else {}

    async def get_release(self, owner: str, repo: str, tag: str) -> dict:
        """Return the release whose ``tag_name`` equals ``tag``, else ``{}``."""
        releases = await self.get_releases(owner, repo)
        for release in releases:
            if release.get("tag_name") == tag:
                return release
        return {}

    async def download_release(self, owner: str, repo: str, tag: str) -> dict:
        """Resolve the download URL of the first asset of a release.

        Returns:
            ``{"release": <release>, "asset_url": <url>}`` when the release
            has a first asset with a ``browser_download_url``; otherwise the
            bare release dict.  Nothing is downloaded here.

        Raises:
            RuntimeError: If no release with that tag exists.
        """
        release = await self.get_release(owner, repo, tag)
        if not release:
            raise RuntimeError("Release not found")
        assets = release.get("assets", [])
        if not assets:
            return release
        url = assets[0].get("browser_download_url")
        if not url:
            return release
        return {"release": release, "asset_url": url}

    async def download_release_asset(self, url: str, dest: Path) -> Path:
        """Download ``url`` to ``dest`` in the default executor; return ``dest``."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_download, url, dest)

    def _sync_download(self, url: str, dest: Path) -> Path:
        """Blocking download of ``url`` into ``dest`` (parents are created).

        The payload is streamed to a sibling ``.part`` file and then moved
        into place, so a large asset is never held in memory and a failed
        transfer never leaves a truncated ``dest`` behind.
        """
        import shutil
        from urllib import request

        dest.parent.mkdir(parents=True, exist_ok=True)
        partial = dest.with_name(dest.name + ".part")
        try:
            with request.urlopen(url) as response, partial.open("wb") as sink:
                shutil.copyfileobj(response, sink)
            partial.replace(dest)
        except BaseException:
            # Best-effort cleanup of the partial file; the original error is
            # what the caller needs to see.
            try:
                partial.unlink()
            except OSError:
                pass
            raise
        return dest

    # ------------------------------------------------------------------
    # Version info
    # ------------------------------------------------------------------

    async def get_version_info(self) -> VersionInfo:
        """Collect commit, branch, remote, tag and dirty state into a VersionInfo.

        ``channel`` is set to the current branch and ``date`` to the moment of
        the call (naive UTC), not to the commit's own timestamp.
        """
        from datetime import timezone

        commit = await self.get_current_commit()
        branch = await self.get_current_branch()
        remote = await self.get_remote_url()
        core_version = await self._get_core_version()
        status = await self._run(["git", "status", "--porcelain"])
        # Same naive-UTC value datetime.utcnow() produced, without relying on
        # that deprecated constructor.
        now = datetime.now(timezone.utc).replace(tzinfo=None)
        return VersionInfo(
            core=core_version,
            commit=commit,
            short_commit=commit[:7],
            branch=branch,
            remote=remote,
            channel=branch,
            dirty=bool(status.strip()),
            date=now,
        )

    async def _get_core_version(self) -> str:
        """Return the most recent tag, or ``"unknown"`` if git describe fails."""
        try:
            described = await self._run(
                ["git", "describe", "--tags", "--abbrev=0"]
            )
        except RuntimeError:
            return "unknown"
        return described.strip()

    # ------------------------------------------------------------------
    # Internals
    # ------------------------------------------------------------------

    async def _run(self, cmd: List[str]) -> str:
        """Run ``cmd`` via :meth:`_sync_run` in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._sync_run, cmd)

    def _sync_run(self, cmd: List[str]) -> str:
        """Run ``cmd`` inside ``repo_path`` and return its stdout as text.

        Raises:
            RuntimeError: If the command exits with a non-zero status; the
                message is the stripped stderr, or "Command failed".
        """
        logger.debug("Running command: %s", " ".join(cmd))
        result = subprocess.run(
            cmd, cwd=self.repo_path, capture_output=True, text=True
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr.strip() or "Command failed")
        return result.stdout

    async def _log_lines(self, *log_args: str) -> List[str]:
        """Run ``git log`` with ``log_args`` and return its non-blank lines."""
        output = await self._run(["git", "log", *log_args])
        return [line for line in output.splitlines() if line.strip()]

    async def _changelog_from_log(self, *log_args: str) -> str:
        """Format the commits selected by ``git log <log_args>`` as a changelog."""
        commits = await self._log_lines(*log_args)
        return format_changelog(parse_conventional(commits))

    async def _get_signer(self, commit_hash: str) -> str:
        """Return the ``%GF`` log field of ``commit_hash``, stripped."""
        output = await self._run(
            ["git", "log", "--format=%GF", "-n", "1", commit_hash]
        )
        return output.strip()

    async def _get_owner_repo(self) -> Optional[tuple[str, str]]:
        """Derive ``(owner, repo)`` from the remote URL.

        Handles URLs starting with ``http`` and scp-style ``git@host:path``
        remotes.  Returns ``None`` when the URL cannot be read or has an
        unrecognised shape.
        """
        try:
            remote = await self.get_remote_url()
        except Exception:
            # Deliberately best-effort: any failure to read the remote just
            # means callers fall back to plain git data.
            return None

        if remote.startswith("http"):
            path = remote
        elif remote.startswith("git@") and ":" in remote:
            path = remote.split(":", 1)[1]
        else:
            return None

        path = path.rstrip("/")
        # Remove the literal ".git" suffix only.  str.rstrip(".git") would
        # strip any trailing '.', 'g', 'i' or 't' characters and mangle names
        # such as "toolkit.git" into "toolk".
        if path.endswith(".git"):
            path = path[: -len(".git")]

        parts = path.split("/")
        if len(parts) >= 2:
            return parts[-2], parts[-1]
        return None
|