466 lines
19 KiB
Python
466 lines
19 KiB
Python
import asyncio
|
|
|
|
from core.commands import ArgumentSpec
|
|
from core.module import Module
|
|
|
|
|
|
class UtilityModule(Module):
    """Module that registers the utility commands (calc, help, note, remind, ...)."""

    name = "utility"
    version = "0.1.0"
    description = "Utility commands"

    async def on_load(self) -> None:
        """Create the notes table and register this module's handlers."""
        builder = self.commands
        # Backing table for the ``.note`` command: one row per note name.
        notes_schema = "CREATE TABLE IF NOT EXISTS utility_notes (name TEXT PRIMARY KEY, value TEXT)"
        await self.app.database.execute(notes_schema)
|
|
|
|
async def on_callback(evt):
    """Handle presses of the inline buttons built by ``.plugin configui``.

    The callback data is expected to look like
    ``overub_cfg:<action>:<plugin name>``. ``toggle`` flips the plugin's
    ``enabled`` flag and ``auto`` flips its ``auto_update`` flag; both
    default to ``True`` when absent. The updated config is saved and the
    callback is answered.

    Callbacks that carry no event, undecodable data, a different prefix,
    a missing plugin name or an unknown action are ignored silently, so
    buttons that belong to other features never raise from this handler.
    """
    event = evt.payload.get("event")
    if not event or not hasattr(event, "data"):
        return

    raw = event.data
    if isinstance(raw, (bytes, bytearray)):
        try:
            data = raw.decode("utf-8")
        except UnicodeDecodeError:
            # Arbitrary bytes from an unrelated button: not ours.
            return
    elif isinstance(raw, str):
        data = raw
    else:
        return

    if not data.startswith("overub_cfg:"):
        return

    # maxsplit=2 keeps any ':' inside the plugin name intact.
    parts = data.split(":", 2)
    if len(parts) != 3 or not parts[2]:
        return
    _, action, name = parts

    # action -> (config flag to flip, text shown to the user)
    toggles = {
        "toggle": ("enabled", "Toggled"),
        "auto": ("auto_update", "Auto-update toggled"),
    }
    if action not in toggles:
        return
    flag, answer = toggles[action]

    cfg = self.app.config.get_plugin_config(name)
    cfg[flag] = not cfg.get(flag, True)
    self.app.config.set_plugin_config(name, cfg)
    self.app.config.save()
    await event.answer(answer)
|
|
|
|
# Subscribe on_callback to the "on_callback_query" event.
self.app.events.on("on_callback_query", on_callback)
|
|
|
|
@builder.command(
    name="calc",
    description="Simple calculator",
    category="utility",
    usage=".calc <expression>",
    example=".calc 2+2",
)
async def calc_cmd(event, args):
    """Evaluate the expression formed by joining ``args`` and reply with it.

    Replies with the usage string when the joined expression is empty and
    with "Invalid expression" when evaluation raises.
    """
    expression = " ".join(args)
    if not expression:
        await event.reply("Usage: .calc <expression>")
        return

    # NOTE(review): this runs eval() on chat input. Emptying __builtins__
    # is not a sandbox (attribute access on literals still works) and an
    # expression such as 9**9**9 blocks the event loop. Consider an
    # ast-based arithmetic evaluator if this is reachable by other users.
    try:
        result = eval(expression, {"__builtins__": {}}, {})
    except Exception:
        outcome = "Invalid expression"
    else:
        outcome = str(result)
    await event.reply(outcome)
|
|
|
|
@builder.command(
    name="help",
    description="Show command help",
    category="utility",
    usage=".help [command]",
    example=".help calc",
)
async def help_cmd(event, args):
    """Reply with the command registry's help text.

    The first argument, if any, is passed as the command name; otherwise
    ``None`` is passed.
    """
    topic = None
    if args:
        topic = args[0]
    help_text = self.app.commands.help_text(topic)
    await event.reply(help_text)
|
|
|
|
@builder.command(
    name="note",
    description="Manage notes",
    category="utility",
    usage=".note <add|get|list|del> [name] [value]",
)
async def note_cmd(event, args):
    """Add, fetch, list or delete notes stored in ``utility_notes``.

    ``args[0]`` is the action. ``list`` needs nothing else; every other
    action needs a note name in ``args[1]``. For ``add`` the remaining
    arguments are joined with spaces to form the value, and an existing
    note with the same name is replaced.
    """
    if not args:
        await event.reply("Usage: .note <add|get|list|del> [name] [value]")
        return

    db = self.app.database
    action = args[0]

    if action == "list":
        rows = await db.fetchall("SELECT name FROM utility_notes")
        names = [row["name"] for row in rows]
        if names:
            listing = ", ".join(names)
        else:
            listing = "No notes"
        await event.reply(listing)
        return

    if len(args) < 2:
        await event.reply("Note name required")
        return
    name = args[1]

    # Each branch only picks the reply text; the reply is sent once below.
    if action == "add":
        value = " ".join(args[2:])
        await db.execute(
            "INSERT OR REPLACE INTO utility_notes (name, value) VALUES (?, ?)",
            (name, value),
        )
        answer = "Saved"
    elif action == "get":
        row = await db.fetchone(
            "SELECT value FROM utility_notes WHERE name=?",
            (name,),
        )
        answer = row["value"] if row else "Not found"
    elif action == "del":
        await db.execute(
            "DELETE FROM utility_notes WHERE name=?",
            (name,),
        )
        answer = "Deleted"
    else:
        answer = "Unknown action"
    await event.reply(answer)
|
|
|
|
@builder.command(
    name="remind",
    description="Set a reminder in seconds",
    category="utility",
    usage=".remind <seconds> <text>",
    arguments=[
        ArgumentSpec("seconds", int, True),
        ArgumentSpec("text", str, True, variadic=True),
    ],
)
async def remind_cmd(event, args):
    """Confirm the reminder, wait ``seconds``, then reply with the text.

    ``args[0]`` is the delay in whole seconds and the remaining arguments
    are joined with spaces to form the reminder text. Replies with the
    usage string when fewer than two arguments are given, and with
    "Invalid seconds" when the delay is not an integer or is negative.
    """
    if len(args) < 2:
        await event.reply("Usage: .remind <seconds> <text>")
        return
    try:
        seconds = int(args[0])
    except (TypeError, ValueError):
        await event.reply("Invalid seconds")
        return
    if seconds < 0:
        # asyncio.sleep() returns at once for a negative delay, which
        # would fire the reminder immediately instead of rejecting it.
        await event.reply("Invalid seconds")
        return

    text = " ".join(args[1:])
    await event.reply(f"Reminder set for {seconds}s")
    # NOTE(review): the handler stays suspended here until the delay
    # elapses and nothing is persisted, so a pending reminder does not
    # survive a restart.
    await asyncio.sleep(seconds)
    await event.reply(f"Reminder: {text}")
|
|
|
|
@builder.command(
    name="convert",
    description="Stub unit conversion",
    category="utility",
    usage=".convert <value> <unit>",
    arguments=[
        ArgumentSpec("value", float, True),
        ArgumentSpec("unit", str, True),
    ],
)
async def convert_cmd(event, args):
    """Convert a number between km/mi or Celsius/Fahrenheit.

    ``args[0]`` is the value (0.0 when absent) and ``args[1]`` the
    conversion key, matched case-insensitively: ``km-mi``, ``mi-km``,
    ``c-f`` or ``f-c``. Replies with the result rounded to 4 decimals,
    with the usage string when the value is not a number, or with the list
    of supported keys when the key is missing or unknown.
    """
    try:
        value = float(args[0]) if args else 0.0
    except (TypeError, ValueError):
        # Reply instead of letting the conversion error escape the handler.
        await event.reply("Usage: .convert <value> <unit>")
        return
    unit = args[1].lower() if len(args) > 1 else ""

    # Callables so that only the requested conversion is computed.
    converters = {
        "km-mi": lambda v: v * 0.621371,
        "mi-km": lambda v: v / 0.621371,
        "c-f": lambda v: (v * 9 / 5) + 32,
        "f-c": lambda v: (v - 32) * 5 / 9,
    }
    convert = converters.get(unit)
    if convert is None:
        await event.reply("Units: km-mi, mi-km, c-f, f-c")
        return
    await event.reply(str(round(convert(value), 4)))
|
|
|
|
@builder.command(
    name="weather",
    description="Stub weather",
    category="utility",
    usage=".weather <city>",
    arguments=[ArgumentSpec("city", str, True, variadic=True)],
)
async def weather_cmd(event, args):
    """Reply with the current temperature and wind speed for a city.

    The arguments are joined with spaces to form the city name. The name
    is resolved to coordinates through the Open-Meteo geocoding endpoint
    (first result only), then the current weather is requested for those
    coordinates. Replies with the usage string for an empty city,
    "Location not found" when geocoding yields no results, and
    "Weather unavailable" when the forecast response has no
    ``current_weather`` entry.
    """
    from urllib.parse import quote

    city = " ".join(args).strip()
    if not city:
        await event.reply("Usage: .weather <city>")
        return

    # Percent-encode the name so spaces, '&', '#' and non-ASCII
    # characters cannot break or alter the query string.
    geo_url = (
        "https://geocoding-api.open-meteo.com/v1/search"
        f"?name={quote(city, safe='')}&count=1"
    )
    geo = await self.app.http.get_json(geo_url)
    if not geo or not geo.get("results"):
        await event.reply("Location not found")
        return

    loc = geo["results"][0]
    lat = loc["latitude"]
    lon = loc["longitude"]
    # The parameter is "&current_weather=true"; the source text had it
    # mangled into "¤t_weather", which drops the current-weather request.
    weather_url = (
        "https://api.open-meteo.com/v1/forecast"
        f"?latitude={lat}&longitude={lon}&current_weather=true"
    )
    data = await self.app.http.get_json(weather_url)
    if not data or "current_weather" not in data:
        await event.reply("Weather unavailable")
        return

    current = data["current_weather"]
    await event.reply(
        f"{loc['name']}: {current['temperature']}°C, wind {current['windspeed']} km/h"
    )
|
|
|
|
@builder.command(
    name="crypto",
    description="Stub crypto prices",
    category="utility",
    usage=".crypto <symbol>",
    arguments=[ArgumentSpec("symbol", str, True)],
)
async def crypto_cmd(event, args):
    """Reply with the USD price of a supported coin.

    ``args[0]`` is the ticker symbol, matched case-insensitively against
    btc, eth, bnb and sol. Replies with the list of symbols when it is
    missing or unknown, and with "Price unavailable" when the response
    is empty or has no entry for the coin.
    """
    symbol = ""
    if args:
        symbol = args[0].lower()

    # Ticker symbol -> coin id used in the CoinGecko request.
    coin_ids = {
        "btc": "bitcoin",
        "eth": "ethereum",
        "bnb": "binancecoin",
        "sol": "solana",
    }
    coin_id = coin_ids.get(symbol)
    if not coin_id:
        await event.reply("Symbols: btc, eth, bnb, sol")
        return

    endpoint = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd"
    payload = await self.app.http.get_json(endpoint)
    if payload and coin_id in payload:
        price = payload[coin_id]["usd"]
        await event.reply(f"{symbol.upper()}: ${price}")
    else:
        await event.reply("Price unavailable")
|
|
|
|
@builder.command(
    name="plugin",
    description="Manage plugins",
    category="utility",
    usage=".plugin <list|search|install|uninstall|update|rollback|info|config|configui|remote|fetch|reload|enable|disable> [name]",
    example=".plugin list",
)
async def plugin_cmd(event, args):
    """Dispatch ``.plugin <action> [name] [...]`` to the plugin manager.

    ``args[0]`` selects the action and ``args[1]``, when present, is the
    plugin name. For ``search`` and ``install`` everything after the
    action is joined into the query / repo instead. ``update`` without a
    name updates every plugin directory; ``update check`` and
    ``update exclude <name>`` are sub-actions. Outcomes of install,
    uninstall, update and rollback are recorded through
    ``self.app.update_service.record_event``. Every handled path ends
    with a reply to ``event``.
    """
    if not args:
        await event.reply("Usage: .plugin <list|search|install|uninstall|update|rollback|info|config|configui|remote|fetch|reload|enable|disable> [name]")
        return
    action = args[0]
    name = args[1] if len(args) > 1 else None

    if action == "list":
        installed = self.app.plugins.list_installed()
        loaded = set(self.app.plugins.list())
        if installed:
            summary = []
            for plugin in installed:
                state = "loaded" if plugin in loaded else "unloaded"
                summary.append(f"{plugin} ({state})")
            await event.reply("Installed plugins: " + ", ".join(summary))
        else:
            await event.reply("No plugins installed")
        return

    if action == "search":
        query = " ".join(args[1:])
        if not query:
            await event.reply("Query required")
            return
        try:
            results = await self.app.plugins.search(query)
        except Exception as exc:
            await event.reply(f"Search failed: {exc}")
            return
        await event.reply(results or "No results")
        return

    if action == "install":
        repo = " ".join(args[1:])
        if not repo:
            await event.reply("Repo required")
            return
        try:
            installed = await self.app.plugins.install(repo)
        except Exception as exc:
            self.app.update_service.record_event(
                action="plugin_install",
                status="failed",
                meta={"repo": repo},
            )
            await event.reply(f"Install failed: {exc}")
            return
        self.app.update_service.record_event(
            action="plugin_install",
            status="success",
            meta={"name": installed},
        )
        await event.reply(f"Installed {installed}")
        return

    if action == "uninstall":
        if not name:
            await event.reply("Plugin name required")
            return
        # Mirror install/update/rollback: a failure is recorded and
        # reported instead of escaping the handler unlogged.
        try:
            await self.app.plugins.uninstall(name)
        except Exception as exc:
            self.app.update_service.record_event(
                action="plugin_uninstall",
                status="failed",
                meta={"name": name},
            )
            await event.reply(f"Uninstall failed: {exc}")
            return
        self.app.update_service.record_event(
            action="plugin_uninstall",
            status="success",
            meta={"name": name},
        )
        await event.reply(f"Uninstalled {name}")
        return

    if action == "update":
        # No name means "all"; "check" and "exclude" are sub-actions.
        target = name or "all"
        if target == "check":
            updates = []
            for item in self.app.plugins.list_installed():
                try:
                    await self.app.plugins.fetch(item)
                    updates.append(item)
                except Exception:
                    # Best effort: plugins that fail to fetch are
                    # simply left out of the report.
                    continue
            await event.reply(("Checked updates for: " + ", ".join(updates)) if updates else "No plugins checked")
            return
        if target == "exclude":
            if len(args) < 3:
                await event.reply("Plugin name required")
                return
            cfg = self.app.config.get_plugin_config(args[2])
            cfg["auto_update"] = False
            self.app.config.set_plugin_config(args[2], cfg)
            self.app.config.save()
            await event.reply(f"Excluded {args[2]} from auto-updates")
            return
        if target == "all":
            updated = []
            if not self.app.plugins.plugin_path.exists():
                await event.reply("No plugins directory")
                return
            for item in self.app.plugins.plugin_path.iterdir():
                if not item.is_dir():
                    continue
                try:
                    await self.app.plugins.update(item.name)
                except Exception as exc:
                    self.app.update_service.record_event(
                        action="plugin_update",
                        status="failed",
                        meta={"name": item.name},
                    )
                    await event.reply(f"Update failed for {item.name}: {exc}")
                    return
                # Record each success immediately so that a failure on a
                # later plugin cannot drop the events for plugins that
                # were already updated.
                self.app.update_service.record_event(
                    action="plugin_update",
                    status="success",
                    meta={"name": item.name},
                )
                updated.append(item.name)
            await event.reply(("Updated: " + ", ".join(updated)) if updated else "No plugins updated")
            return
        try:
            await self.app.plugins.update(target)
        except Exception as exc:
            self.app.update_service.record_event(
                action="plugin_update",
                status="failed",
                meta={"name": target},
            )
            await event.reply(f"Update failed: {exc}")
            return
        self.app.update_service.record_event(
            action="plugin_update",
            status="success",
            meta={"name": target},
        )
        await event.reply(f"Updated {target}")
        return

    if action == "rollback":
        if not name:
            await event.reply("Plugin name required")
            return
        # Optional third argument is the ref to roll back to.
        ref = args[2] if len(args) > 2 else "HEAD~1"
        try:
            await self.app.plugins.rollback(name, ref)
        except Exception as exc:
            self.app.update_service.record_event(
                action="plugin_rollback",
                status="failed",
                meta={"name": name, "ref": ref},
            )
            await event.reply(f"Rollback failed: {exc}")
            return
        self.app.update_service.record_event(
            action="plugin_rollback",
            status="success",
            meta={"name": name, "ref": ref},
        )
        await event.reply(f"Rolled back {name}")
        return

    if action == "info":
        if not name:
            await event.reply("Plugin name required")
            return
        info = await self.app.plugins.info(name)
        await event.reply(str(info))
        return

    if action == "config":
        if not name:
            await event.reply("Plugin name required")
            return
        if len(args) == 2:
            # ".plugin config <name>": show the current config.
            from core.config_ui import render_plugin_config

            cfg = self.app.config.get_plugin_config(name)
            await event.reply(render_plugin_config(name, cfg))
            return
        if len(args) >= 4:
            # ".plugin config <name> <key> <value...>": store a setting.
            key = args[2]
            value = " ".join(args[3:])
            cfg = self.app.config.get_plugin_config(name)
            cfg.setdefault("settings", {})[key] = value
            self.app.config.set_plugin_config(name, cfg)
            self.app.config.save()
            await event.reply("Config updated")
            return
        # Exactly three arguments: a key without a value.
        await event.reply("Usage: .plugin config <name> [key value]")
        return

    if action == "configui":
        if not name:
            await event.reply("Plugin name required")
            return
        try:
            from telethon import Button
        except Exception:
            await event.reply("Buttons unavailable")
            return
        cfg = self.app.config.get_plugin_config(name)
        status = "enabled" if cfg.get("enabled", True) else "disabled"
        auto = "auto" if cfg.get("auto_update", True) else "manual"
        # Button data uses the "overub_cfg:<action>:<name>" format.
        buttons = [
            [Button.inline(f"Toggle ({status})", data=f"overub_cfg:toggle:{name}")],
            [Button.inline(f"Auto-update ({auto})", data=f"overub_cfg:auto:{name}")],
        ]
        await event.reply(f"Config for {name}", buttons=buttons)
        return

    if action == "remote":
        if not name:
            await event.reply("Plugin name required")
            return
        try:
            remote = await self.app.plugins.remote(name)
        except Exception as exc:
            await event.reply(f"Remote failed: {exc}")
            return
        await event.reply(remote)
        return

    if action == "fetch":
        if not name:
            await event.reply("Plugin name required")
            return
        try:
            output = await self.app.plugins.fetch(name)
        except Exception as exc:
            await event.reply(f"Fetch failed: {exc}")
            return
        await event.reply(output or "Fetched")
        return

    # Any action that reaches this point is answered with "Plugin name
    # required" when no name was given, including unknown actions.
    if not name:
        await event.reply("Plugin name required")
        return

    if action == "reload":
        await self.app.plugins.reload(name)
        await event.reply(f"Reloaded {name}")
        return

    if action == "enable":
        await self.app.plugins.enable(name)
        cfg = self.app.config.get_plugin_config(name)
        cfg["enabled"] = True
        self.app.config.set_plugin_config(name, cfg)
        self.app.config.save()
        await event.reply(f"Enabled {name}")
        return

    if action == "disable":
        await self.app.plugins.disable(name)
        cfg = self.app.config.get_plugin_config(name)
        cfg["enabled"] = False
        self.app.config.set_plugin_config(name, cfg)
        self.app.config.save()
        await event.reply(f"Disabled {name}")
        return

    await event.reply("Unknown action")
|