Files
overub/modules/utility/__init__.py
2025-12-21 17:12:32 +01:00

466 lines
19 KiB
Python

import asyncio
from core.commands import ArgumentSpec
from core.module import Module
class UtilityModule(Module):
name = "utility"
version = "0.1.0"
description = "Utility commands"
async def on_load(self) -> None:
builder = self.commands
await self.app.database.execute(
"CREATE TABLE IF NOT EXISTS utility_notes (name TEXT PRIMARY KEY, value TEXT)"
)
async def on_callback(evt):
event = evt.payload.get("event")
if not event or not hasattr(event, "data"):
return
data = event.data.decode("utf-8")
if not data.startswith("overub_cfg:"):
return
_, action, name = data.split(":", 2)
cfg = self.app.config.get_plugin_config(name)
if action == "toggle":
cfg["enabled"] = not cfg.get("enabled", True)
self.app.config.set_plugin_config(name, cfg)
self.app.config.save()
await event.answer("Toggled")
if action == "auto":
cfg["auto_update"] = not cfg.get("auto_update", True)
self.app.config.set_plugin_config(name, cfg)
self.app.config.save()
await event.answer("Auto-update toggled")
self.app.events.on("on_callback_query", on_callback)
@builder.command(
name="calc",
description="Simple calculator",
category="utility",
usage=".calc <expression>",
example=".calc 2+2",
)
async def calc_cmd(event, args):
expression = " ".join(args)
if not expression:
await event.reply("Usage: .calc <expression>")
return
try:
result = eval(expression, {"__builtins__": {}}, {})
except Exception:
await event.reply("Invalid expression")
return
await event.reply(str(result))
@builder.command(
name="help",
description="Show command help",
category="utility",
usage=".help [command]",
example=".help calc",
)
async def help_cmd(event, args):
name = args[0] if args else None
await event.reply(self.app.commands.help_text(name))
@builder.command(
name="note",
description="Manage notes",
category="utility",
usage=".note <add|get|list|del> [name] [value]",
)
async def note_cmd(event, args):
if not args:
await event.reply("Usage: .note <add|get|list|del> [name] [value]")
return
action = args[0]
if action == "list":
rows = await self.app.database.fetchall("SELECT name FROM utility_notes")
names = [row["name"] for row in rows]
await event.reply(", ".join(names) if names else "No notes")
return
if len(args) < 2:
await event.reply("Note name required")
return
name = args[1]
if action == "add":
value = " ".join(args[2:])
await self.app.database.execute(
"INSERT OR REPLACE INTO utility_notes (name, value) VALUES (?, ?)",
(name, value),
)
await event.reply("Saved")
return
if action == "get":
row = await self.app.database.fetchone(
"SELECT value FROM utility_notes WHERE name=?",
(name,),
)
await event.reply(row["value"] if row else "Not found")
return
if action == "del":
await self.app.database.execute(
"DELETE FROM utility_notes WHERE name=?",
(name,),
)
await event.reply("Deleted")
return
await event.reply("Unknown action")
@builder.command(
name="remind",
description="Set a reminder in seconds",
category="utility",
usage=".remind <seconds> <text>",
arguments=[
ArgumentSpec("seconds", int, True),
ArgumentSpec("text", str, True, variadic=True),
],
)
async def remind_cmd(event, args):
if len(args) < 2:
await event.reply("Usage: .remind <seconds> <text>")
return
try:
seconds = int(args[0])
except ValueError:
await event.reply("Invalid seconds")
return
text = " ".join(args[1:])
await event.reply(f"Reminder set for {seconds}s")
await asyncio.sleep(seconds)
await event.reply(f"Reminder: {text}")
@builder.command(
name="convert",
description="Stub unit conversion",
category="utility",
usage=".convert <value> <unit>",
arguments=[
ArgumentSpec("value", float, True),
ArgumentSpec("unit", str, True),
],
)
async def convert_cmd(event, args):
value = float(args[0]) if args else 0.0
unit = args[1].lower() if len(args) > 1 else ""
conversions = {
"km-mi": value * 0.621371,
"mi-km": value / 0.621371,
"c-f": (value * 9 / 5) + 32,
"f-c": (value - 32) * 5 / 9,
}
if unit not in conversions:
await event.reply("Units: km-mi, mi-km, c-f, f-c")
return
await event.reply(str(round(conversions[unit], 4)))
@builder.command(
name="weather",
description="Stub weather",
category="utility",
usage=".weather <city>",
arguments=[ArgumentSpec("city", str, True, variadic=True)],
)
async def weather_cmd(event, args):
city = " ".join(args)
geo_url = f"https://geocoding-api.open-meteo.com/v1/search?name={city}&count=1"
geo = await self.app.http.get_json(geo_url)
if not geo or not geo.get("results"):
await event.reply("Location not found")
return
loc = geo["results"][0]
lat = loc["latitude"]
lon = loc["longitude"]
weather_url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current_weather=true"
data = await self.app.http.get_json(weather_url)
if not data or "current_weather" not in data:
await event.reply("Weather unavailable")
return
current = data["current_weather"]
await event.reply(
f"{loc['name']}: {current['temperature']}°C, wind {current['windspeed']} km/h"
)
@builder.command(
name="crypto",
description="Stub crypto prices",
category="utility",
usage=".crypto <symbol>",
arguments=[ArgumentSpec("symbol", str, True)],
)
async def crypto_cmd(event, args):
symbol = args[0].lower() if args else ""
map_ids = {
"btc": "bitcoin",
"eth": "ethereum",
"bnb": "binancecoin",
"sol": "solana",
}
coin_id = map_ids.get(symbol)
if not coin_id:
await event.reply("Symbols: btc, eth, bnb, sol")
return
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd"
data = await self.app.http.get_json(url)
if not data or coin_id not in data:
await event.reply("Price unavailable")
return
price = data[coin_id]["usd"]
await event.reply(f"{symbol.upper()}: ${price}")
@builder.command(
name="plugin",
description="Manage plugins",
category="utility",
usage=".plugin <list|search|install|uninstall|update|rollback|info|config|configui|remote|fetch|reload|enable|disable> [name]",
example=".plugin list",
)
async def plugin_cmd(event, args):
if not args:
await event.reply("Usage: .plugin <list|search|install|uninstall|update|rollback|info|config|configui|remote|fetch|reload|enable|disable> [name]")
return
action = args[0]
name = args[1] if len(args) > 1 else None
if action == "list":
installed = self.app.plugins.list_installed()
loaded = set(self.app.plugins.list())
if installed:
summary = []
for plugin in installed:
state = "loaded" if plugin in loaded else "unloaded"
summary.append(f"{plugin} ({state})")
await event.reply("Installed plugins: " + ", ".join(summary))
else:
await event.reply("No plugins installed")
return
if action == "search":
query = " ".join(args[1:])
if not query:
await event.reply("Query required")
return
try:
results = await self.app.plugins.search(query)
except Exception as exc:
await event.reply(f"Search failed: {exc}")
return
await event.reply(results or "No results")
return
if action == "install":
repo = " ".join(args[1:])
if not repo:
await event.reply("Repo required")
return
try:
installed = await self.app.plugins.install(repo)
except Exception as exc:
self.app.update_service.record_event(
action="plugin_install",
status="failed",
meta={"repo": repo},
)
await event.reply(f"Install failed: {exc}")
return
self.app.update_service.record_event(
action="plugin_install",
status="success",
meta={"name": installed},
)
await event.reply(f"Installed {installed}")
return
if action == "uninstall":
if not name:
await event.reply("Plugin name required")
return
await self.app.plugins.uninstall(name)
self.app.update_service.record_event(
action="plugin_uninstall",
status="success",
meta={"name": name},
)
await event.reply(f"Uninstalled {name}")
return
if action == "update":
target = name or "all"
if target == "check":
updates = []
for item in self.app.plugins.list_installed():
try:
await self.app.plugins.fetch(item)
updates.append(item)
except Exception:
continue
await event.reply("Checked updates for: " + ", ".join(updates) if updates else "No plugins checked")
return
if target == "exclude":
if len(args) < 3:
await event.reply("Plugin name required")
return
cfg = self.app.config.get_plugin_config(args[2])
cfg["auto_update"] = False
self.app.config.set_plugin_config(args[2], cfg)
self.app.config.save()
await event.reply(f"Excluded {args[2]} from auto-updates")
return
if target == "all":
updated = []
if not self.app.plugins.plugin_path.exists():
await event.reply("No plugins directory")
return
for item in self.app.plugins.plugin_path.iterdir():
if item.is_dir():
try:
await self.app.plugins.update(item.name)
updated.append(item.name)
except Exception as exc:
self.app.update_service.record_event(
action="plugin_update",
status="failed",
meta={"name": item.name},
)
await event.reply(f"Update failed for {item.name}: {exc}")
return
for plugin_name in updated:
self.app.update_service.record_event(
action="plugin_update",
status="success",
meta={"name": plugin_name},
)
await event.reply("Updated: " + ", ".join(updated) if updated else "No plugins updated")
return
try:
await self.app.plugins.update(target)
except Exception as exc:
self.app.update_service.record_event(
action="plugin_update",
status="failed",
meta={"name": target},
)
await event.reply(f"Update failed: {exc}")
return
self.app.update_service.record_event(
action="plugin_update",
status="success",
meta={"name": target},
)
await event.reply(f"Updated {target}")
return
if action == "rollback":
if not name:
await event.reply("Plugin name required")
return
ref = args[2] if len(args) > 2 else "HEAD~1"
try:
await self.app.plugins.rollback(name, ref)
except Exception as exc:
self.app.update_service.record_event(
action="plugin_rollback",
status="failed",
meta={"name": name, "ref": ref},
)
await event.reply(f"Rollback failed: {exc}")
return
self.app.update_service.record_event(
action="plugin_rollback",
status="success",
meta={"name": name, "ref": ref},
)
await event.reply(f"Rolled back {name}")
return
if action == "info":
if not name:
await event.reply("Plugin name required")
return
info = await self.app.plugins.info(name)
await event.reply(str(info))
return
if action == "config":
if not name:
await event.reply("Plugin name required")
return
if len(args) == 2:
from core.config_ui import render_plugin_config
cfg = self.app.config.get_plugin_config(name)
await event.reply(render_plugin_config(name, cfg))
return
if len(args) >= 4:
key = args[2]
value = " ".join(args[3:])
cfg = self.app.config.get_plugin_config(name)
cfg.setdefault("settings", {})[key] = value
self.app.config.set_plugin_config(name, cfg)
self.app.config.save()
await event.reply("Config updated")
return
await event.reply("Usage: .plugin config <name> [key value]")
return
if action == "configui":
if not name:
await event.reply("Plugin name required")
return
try:
from telethon import Button
except Exception:
await event.reply("Buttons unavailable")
return
cfg = self.app.config.get_plugin_config(name)
status = "enabled" if cfg.get("enabled", True) else "disabled"
auto = "auto" if cfg.get("auto_update", True) else "manual"
buttons = [
[Button.inline(f"Toggle ({status})", data=f"overub_cfg:toggle:{name}")],
[Button.inline(f"Auto-update ({auto})", data=f"overub_cfg:auto:{name}")],
]
await event.reply(f"Config for {name}", buttons=buttons)
return
if action == "remote":
if not name:
await event.reply("Plugin name required")
return
try:
remote = await self.app.plugins.remote(name)
except Exception as exc:
await event.reply(f"Remote failed: {exc}")
return
await event.reply(remote)
return
if action == "fetch":
if not name:
await event.reply("Plugin name required")
return
try:
output = await self.app.plugins.fetch(name)
except Exception as exc:
await event.reply(f"Fetch failed: {exc}")
return
await event.reply(output or "Fetched")
return
if not name:
await event.reply("Plugin name required")
return
if action == "reload":
await self.app.plugins.reload(name)
await event.reply(f"Reloaded {name}")
return
if action == "enable":
await self.app.plugins.enable(name)
cfg = self.app.config.get_plugin_config(name)
cfg["enabled"] = True
self.app.config.set_plugin_config(name, cfg)
self.app.config.save()
await event.reply(f"Enabled {name}")
return
if action == "disable":
await self.app.plugins.disable(name)
cfg = self.app.config.get_plugin_config(name)
cfg["enabled"] = False
self.app.config.set_plugin_config(name, cfg)
self.app.config.save()
await event.reply(f"Disabled {name}")
return
await event.reply("Unknown action")