56 lines
1.8 KiB
Python
56 lines
1.8 KiB
Python
import tarfile
from datetime import datetime, timezone
from pathlib import Path
from typing import List

from core.logger import get_logger
|
|
|
|
|
|
# Module-level logger; "core.backup" is the name handed to the project's
# get_logger helper.
logger = get_logger("core.backup")
|
|
|
|
|
|
class BackupManager:
    """Create, list, and delete ``.tar.gz`` backups kept under ``<root>/backups``.

    Every backup scope owns one sub-directory of the backup root (named after
    the scope) and a fixed set of source paths, relative to ``root``, that are
    written into its archives.
    """

    # scope name -> paths (relative to ``root``) archived for that scope.
    # The scope name doubles as the destination sub-directory and as the
    # archive file-name prefix.
    _SCOPE_SOURCES = {
        "core": ("core", "config", "requirements.txt", "README.md", "__main__.py"),
        "modules": ("modules",),
        "plugins": ("plugins/external",),
    }

    def __init__(self, root: Path) -> None:
        """Remember ``root`` and make sure ``<root>/backups`` exists.

        Args:
            root: Project directory that holds the sources to back up.
        """
        self.root = root
        self.backup_root = root / "backups"
        self.backup_root.mkdir(parents=True, exist_ok=True)

    def create(self, scope: str) -> Path:
        """Archive the sources of ``scope`` into a new gzip-compressed tarball.

        The archive is written to ``<backup_root>/<scope>/<scope>_<timestamp>.tar.gz``
        where the timestamp is the current UTC time at one-second resolution,
        so two backups of the same scope within the same second share a path.
        Source paths that do not exist under ``root`` are skipped.

        Args:
            scope: One of ``"core"``, ``"modules"`` or ``"plugins"``.

        Returns:
            Path of the archive that was written.

        Raises:
            ValueError: If ``scope`` is not a known backup scope.
        """
        sources = self._SCOPE_SOURCES.get(scope)
        if sources is None:
            raise ValueError("Unknown backup scope")

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted string is unchanged.
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H%M%S")

        dest_dir = self.backup_root / scope
        dest_dir.mkdir(parents=True, exist_ok=True)
        archive_path = dest_dir / f"{scope}_{timestamp}.tar.gz"

        try:
            with tarfile.open(archive_path, "w:gz") as tar:
                for src in sources:
                    src_path = self.root / src
                    if src_path.exists():
                        tar.add(src_path, arcname=src)
        except BaseException:
            # A half-written archive would later be reported by list() as if
            # it were a usable backup, so remove it before propagating.
            try:
                archive_path.unlink()
            except OSError:
                pass  # best-effort cleanup; the original error is re-raised
            raise

        logger.info("Backup created: %s", archive_path)
        return archive_path

    # NOTE: the method name shadows the ``list`` builtin inside the class
    # body; it is kept because callers depend on it.
    def list(self, scope: str) -> List[str]:
        """Return the sorted file names stored in the directory of ``scope``.

        Args:
            scope: Name of the backup sub-directory to inspect.

        Returns:
            Sorted names of the regular files in that directory, or an empty
            list when the directory does not exist.

        Raises:
            ValueError: If ``scope`` is not a single plain path component.
        """
        scope_dir = self.backup_root / self._require_component(scope, "scope")
        if not scope_dir.is_dir():
            return []
        return sorted(item.name for item in scope_dir.iterdir() if item.is_file())

    def delete(self, scope: str, name: str) -> None:
        """Remove the backup file ``name`` from the directory of ``scope``.

        A file that is already absent is silently ignored.

        Args:
            scope: Name of the backup sub-directory.
            name: File name of the archive inside that sub-directory.

        Raises:
            ValueError: If ``scope`` or ``name`` is not a single plain path
                component (for example contains ``..`` or a path separator).
        """
        scope_dir = self.backup_root / self._require_component(scope, "scope")
        target = scope_dir / self._require_component(name, "name")
        try:
            target.unlink()
        except FileNotFoundError:
            pass  # already gone; nothing to delete

    @staticmethod
    def _require_component(value: str, what: str) -> str:
        """Return ``value`` if it is one plain path component, else raise ValueError."""
        # Path(value).name differs from value as soon as value contains a
        # separator (or a drive on Windows); "", "." and ".." are rejected
        # explicitly because they would resolve to the parent tree itself.
        if value in ("", ".", "..") or Path(value).name != value:
            raise ValueError(f"Invalid backup {what}: {value!r}")
        return value