from core.module import Module


class DeveloperModule(Module):
    """Module that registers the admin-only "developer" command set.

    All commands are registered in :meth:`on_load` as closures over ``self``
    so that they can reach ``self.app`` (updater, update service, versions,
    config, migrations).
    """

    name = "developer"
    version = "0.1.0"
    description = "Developer commands"

    async def on_load(self) -> None:
        """Register every developer command on ``self.commands``.

        Each handler receives ``(event, args)`` and answers through
        ``event.reply``; ``args`` is treated as a sequence of string tokens.
        """
        builder = self.commands

        @builder.command(
            name="eval",
            description="Stub eval command",
            category="developer",
            usage=".eval ",
            permission="admin",
        )
        async def eval_cmd(event, args):
            """Evaluate a Python expression and reply with ``str(result)``."""
            expr = " ".join(args)
            if not expr:
                await event.reply("Expression required")
                return
            scope = {"app": self.app}
            # NOTE(security): emptying ``__builtins__`` is NOT a sandbox, and
            # ``app`` is exposed in full. This is arbitrary code execution for
            # whoever passes the ``admin`` permission check; keep that gate
            # strict.
            try:
                result = eval(expr, {"__builtins__": {}}, scope)
            except Exception as exc:
                await event.reply(f"Eval error: {exc}")
                return
            await event.reply(str(result))

        @builder.command(
            name="exec",
            description="Stub exec command",
            category="developer",
            usage=".exec ",
            permission="admin",
        )
        async def exec_cmd(event, args):
            """Execute Python statements and reply "Executed" on success."""
            code = " ".join(args)
            if not code:
                await event.reply("Code required")
                return
            scope = {"app": self.app}
            # NOTE(security): same caveat as ``eval`` above - the empty
            # ``__builtins__`` mapping does not contain the executed code.
            try:
                exec(code, {"__builtins__": {}}, scope)
            except Exception as exc:
                await event.reply(f"Exec error: {exc}")
                return
            await event.reply("Executed")

        @builder.command(
            name="format",
            description="Format JSON",
            category="developer",
            usage=".format ",
            permission="admin",
        )
        async def format_cmd(event, args):
            """Pretty-print the JSON given in the arguments (2-space, sorted keys)."""
            import json

            text = " ".join(args)
            try:
                data = json.loads(text)
            except Exception:
                await event.reply("Invalid JSON")
                return
            await event.reply(json.dumps(data, indent=2, sort_keys=True))

        @builder.command(
            name="regex",
            description="Test regex pattern",
            category="developer",
            usage=".regex ",
            permission="admin",
        )
        async def regex_cmd(event, args):
            """Reply with the number of ``re.findall`` matches.

            ``args[0]`` is the pattern; the remaining tokens, joined by single
            spaces, are the text to search.
            """
            import re

            if len(args) < 2:
                await event.reply("Usage: .regex ")
                return
            pattern = args[0]
            text = " ".join(args[1:])
            # The pattern is user input; a malformed one must produce a reply
            # instead of an unhandled exception.
            try:
                matches = re.findall(pattern, text)
            except re.error as exc:
                await event.reply(f"Invalid regex: {exc}")
                return
            await event.reply(f"Matches: {len(matches)}")

        @builder.command(
            name="uuid",
            description="Generate UUID4",
            category="developer",
            usage=".uuid",
            permission="admin",
        )
        async def uuid_cmd(event, args):
            """Reply with a freshly generated UUID4 string."""
            import uuid

            await event.reply(str(uuid.uuid4()))

        @builder.command(
            name="update",
            description="Core update commands",
            category="developer",
            usage=".update ",
            permission="admin",
        )
        async def update_cmd(event, args):
            """Dispatch ``.update`` sub-actions to the updater / update service.

            ``args[0]`` selects the action; unrecognised actions get
            "Unknown action".
            """
            if not args:
                await event.reply("Usage: .update ")
                return
            action = args[0]
            manager = self.app.updater

            if action == "check":
                commits = await manager.check_updates()
                await event.reply("\n".join(commits) if commits else "No updates")
                return

            if action == "now":
                output = await self.app.update_service.apply_updates()
                await event.reply(output or "Updated")
                return

            if action == "info":
                info = await manager.get_version_info()
                await event.reply(str(info))
                return

            if action == "remote":
                remote = await manager.get_remote_url()
                await event.reply(remote)
                return

            if action == "fetch":
                output = await manager.fetch_updates()
                await event.reply(output or "Fetched")
                return

            if action == "release":
                if len(args) < 3:
                    await event.reply("Usage: .update release ")
                    return
                # Expect "owner/repo"; a missing slash or an empty half is a
                # usage error rather than an unpacking ValueError.
                owner, slash, repo = args[1].partition("/")
                if not (slash and owner and repo):
                    await event.reply("Usage: .update release ")
                    return
                tag = args[2]
                info = await manager.download_release(owner, repo, tag)
                asset_url = info.get("asset_url")
                if not asset_url:
                    await event.reply("No assets found")
                    return
                dest = self.app.root / "data" / "releases" / f"{repo}-{tag}.asset"
                path = await manager.download_release_asset(asset_url, dest)
                await event.reply(f"Downloaded {path.name}")
                return

            if action == "rollback":
                if len(args) < 2:
                    await event.reply("Commit required")
                    return
                output = await self.app.update_service.rollback(args[1])
                await event.reply(output)
                return

            if action == "channel":
                if len(args) < 2:
                    await event.reply("Branch required")
                    return
                manager.branch = args[1]
                await event.reply(f"Channel set to {manager.branch}")
                return

            if action == "changelog":
                ref = args[1] if len(args) > 1 else "HEAD"
                output = await manager.get_changelog(ref)
                await event.reply(output)
                return

            if action == "history":
                path = self.app.root / "data" / "update_history.jsonl"
                if not path.exists():
                    await event.reply("No history")
                    return
                # Only the last 3500 characters of the history file are sent.
                await event.reply(path.read_text(encoding="utf-8")[-3500:])
                return

            if action == "stats":
                await event.reply(self.app.update_service.dashboard())
                return

            if action in {"emergency", "force", "critical"}:
                # Same call as "now", but failures are reported to the user
                # instead of propagating.
                try:
                    output = await self.app.update_service.apply_updates()
                except Exception as exc:
                    await event.reply(f"Update failed: {exc}")
                    return
                await event.reply(output or "Updated")
                return

            await event.reply("Unknown action")

        @builder.command(
            name="perf",
            description="Show performance stats",
            category="developer",
            usage=".perf",
            permission="admin",
        )
        async def perf_cmd(event, args):
            """Reply with ``str()`` of ``core.monitor.get_system_stats()``."""
            from core.monitor import get_system_stats

            stats = get_system_stats()
            await event.reply(str(stats))

        @builder.command(
            name="version",
            description="Show version info",
            category="developer",
            usage=".version [core|modules|plugins]",
            permission="admin",
        )
        async def version_cmd(event, args):
            """Reply with the version report, or only its modules/plugins part."""
            report = await self.app.versions.get_report()
            if args and args[0] == "modules":
                await event.reply(str(report.modules))
                return
            if args and args[0] == "plugins":
                await event.reply(str(report.plugins))
                return
            # Any other argument (including "core") shows the whole report.
            await event.reply(str(report))

        @builder.command(
            name="changelog",
            description="Show changelog",
            category="developer",
            usage=".changelog [ref|full|search|since|between|unreleased]",
            permission="admin",
        )
        async def changelog_cmd(event, args):
            """Show changelog output from the updater.

            With no arguments or ``full`` the changelog for ``HEAD`` is shown;
            an unrecognised first argument is passed to ``get_changelog`` as
            the ref.
            """
            updater = self.app.updater

            if not args or args[0] == "full":
                output = await updater.get_changelog("HEAD")
                await event.reply(output)
                return

            action = args[0]

            if action == "search":
                if len(args) < 2:
                    await event.reply("Usage: .changelog search ")
                    return
                output = await updater.search_commits(" ".join(args[1:]))
                await event.reply(output)
                return

            if action == "since":
                if len(args) < 2:
                    await event.reply("Usage: .changelog since ")
                    return
                output = await updater.get_changelog_since(args[1])
                await event.reply(output)
                return

            if action == "between":
                if len(args) < 3:
                    await event.reply("Usage: .changelog between ")
                    return
                output = await updater.get_changelog_between(args[1], args[2])
                await event.reply(output)
                return

            if action == "unreleased":
                output = await updater.get_unreleased_changelog()
                await event.reply(output)
                return

            output = await updater.get_changelog(action)
            await event.reply(output)

        @builder.command(
            name="config",
            description="Reload configuration",
            category="developer",
            usage=".config reload",
            permission="admin",
        )
        async def config_cmd(event, args):
            """Reload the application config; ``reload`` is the only action."""
            if not args or args[0] != "reload":
                await event.reply("Usage: .config reload")
                return
            self.app.config.reload()
            await event.reply("Config reloaded")

        @builder.command(
            name="logs",
            description="Show recent logs",
            category="developer",
            usage=".logs [module]",
            permission="admin",
        )
        async def logs_cmd(event, args):
            """Reply with the last 3500 characters of a log file.

            Without arguments the file is ``data/logs/overub.log``; with a
            module name it is ``data/logs/modules/<name>.log`` where dots in
            the name are replaced by underscores.
            """
            import pathlib

            log_dir = pathlib.Path("data/logs")
            if args:
                module_name = args[0].replace(".", "_")
                # A name containing a path separator could leave the modules
                # log directory (an absolute segment makes pathlib discard
                # the prefix entirely), so treat it as a missing log.
                if "/" in module_name or "\\" in module_name:
                    await event.reply("Log not found")
                    return
                log_file = log_dir / "modules" / f"{module_name}.log"
            else:
                log_file = log_dir / "overub.log"
            if not log_file.exists():
                await event.reply("Log not found")
                return
            text = log_file.read_text(encoding="utf-8")
            await event.reply(text[-3500:])

        @builder.command(
            name="migrate",
            description="Manage migrations",
            category="developer",
            usage=".migrate [name|steps]",
            permission="admin",
        )
        async def migrate_cmd(event, args):
            """Dispatch ``list``/``validate``/``apply``/``rollback`` to ``app.migrations``.

            For ``rollback`` an all-digit second argument is a step count
            (default 1); anything else is passed as the migration name.
            """
            if not args:
                await event.reply("Usage: .migrate [name|steps]")
                return
            action = args[0]
            migrations = self.app.migrations

            if action == "list":
                items = migrations.list_migrations()
                await event.reply("\n".join(items) if items else "No migrations")
                return

            if action == "validate":
                errors = migrations.validate()
                await event.reply("\n".join(errors) if errors else "Migrations OK")
                return

            if action == "apply":
                await migrations.apply(self.app)
                await event.reply("Migrations applied")
                return

            if action == "rollback":
                name = None
                steps = 1
                if len(args) > 1:
                    if args[1].isdigit():
                        steps = int(args[1])
                    else:
                        name = args[1]
                rolled = await migrations.rollback(self.app, name=name, steps=steps)
                if rolled:
                    message = "Rolled back: " + ", ".join(rolled)
                else:
                    message = "Nothing to roll back"
                await event.reply(message)
                return

            await event.reply("Unknown action")