313 lines
10 KiB
Python
313 lines
10 KiB
Python
import shlex
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple
|
|
|
|
from core.error_handler import capture_errors
|
|
|
|
|
|
# Type of a command handler: a callable taking any arguments whose result is
# awaitable and resolves to None (e.g. an ``async def`` function).
CommandHandler = Callable[..., Awaitable[None]]
|
|
|
|
|
|
@dataclass
class Command:
    """Description of one registered command.

    Only ``name`` and ``handler`` are required; every other field has a
    default. Mutable defaults are created per instance through
    ``default_factory``.
    """

    # Identity: the primary lookup key and the coroutine function to run.
    name: str
    handler: CommandHandler

    # Help metadata; description, aliases, usage and example are rendered by
    # the registry's help formatting.
    description: str = ""
    aliases: List[str] = field(default_factory=list)
    category: str = ""
    usage: str = ""
    example: str = ""

    # Per-user cooldown in seconds; values <= 0 disable the cooldown check.
    cooldown: int = 0

    # NOTE(review): permission, chat_types, owner and owner_type are not
    # consumed anywhere in this module; presumably a dispatcher enforces
    # them. Confirm against the caller.
    permission: str = "user"
    chat_types: List[str] = field(default_factory=list)
    owner: str = ""
    owner_type: str = ""

    # Optional command-specific prefix; None means no prefix of its own.
    prefix: Optional[str] = None

    # Declared argument/flag specifications (forward references, since the
    # spec classes are defined after this one).
    arguments: List["ArgumentSpec"] = field(default_factory=list)
    flags: List["FlagSpec"] = field(default_factory=list)

    # Nested commands keyed by their name.
    subcommands: Dict[str, "Command"] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
class ArgumentSpec:
    """Declaration of a positional argument a command accepts.

    NOTE(review): nothing in this module reads these fields; how they are
    applied to parsed tokens is decided by the consumer. Confirm there.
    """

    # Argument name.
    name: str
    # Callable that takes the raw string token; defaults to ``str``.
    arg_type: Callable[[str], Any] = str
    # Whether the argument is marked as required.
    required: bool = False
    # Default value for the argument.
    default: Any = None
    # Whether the argument is marked as variadic.
    variadic: bool = False
|
|
|
|
|
|
@dataclass
class FlagSpec:
    """Declaration of a named flag a command accepts.

    NOTE(review): nothing in this module reads these fields; how they are
    matched against parsed flags is decided by the consumer. Confirm there.
    """

    # Flag name.
    name: str
    # Callable that takes the raw string value; defaults to ``str``.
    flag_type: Callable[[str], Any] = str
    # Default value for the flag.
    default: Any = None
    # Alternative names for the flag; a fresh list per instance.
    aliases: List[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class ParsedCommand:
    """Result of parsing one line of command text. All fields are required."""

    # First token after the prefix.
    name: str
    # Positional tokens, in input order.
    args: List[str]
    # Flag name -> value (a string, or True for a flag given without a value).
    flags: Dict[str, Any]
    # The input text with the prefix removed and surrounding whitespace stripped.
    raw: str
    # The prefix that matched the input text.
    prefix: str
|
|
|
|
|
|
class CommandRegistry:
    """Store commands by name and alias and turn message text into commands.

    Instance state (all created in ``__init__``):
        prefix: the default prefix given at construction.
        _commands: maps every command name and alias to its command object.
        _cooldowns: maps ``(command name, user id)`` to the wall-clock time
            of the last use that was let through.
        _prefixes: every prefix ``parse`` recognises (the default prefix plus
            any command-specific prefixes that were registered).
    """

    def __init__(self, prefix: str = ".") -> None:
        """Create an empty registry whose default prefix is *prefix*."""
        self.prefix = prefix
        self._commands: Dict[str, Command] = {}
        self._cooldowns: Dict[Tuple[str, int], float] = {}
        self._prefixes: List[str] = [prefix]

    def register(self, command: Command) -> None:
        """Register *command* under its name and every alias.

        An existing entry with the same name or alias is replaced. A
        command-specific prefix is added to the set of recognised prefixes.
        """
        self._commands[command.name] = command
        for alias in command.aliases:
            self._commands[alias] = command
        if command.prefix and command.prefix not in self._prefixes:
            self._prefixes.append(command.prefix)

    def get(self, name: str) -> Optional[Command]:
        """Return the command registered under *name* (or alias), else None."""
        return self._commands.get(name)

    def resolve(self, name: str, args: List[str]) -> Tuple[Optional[Command], List[str]]:
        """Look up *name* and descend one level into a matching subcommand.

        Returns:
            ``(subcommand, args[1:])`` when the first argument names a
            subcommand of the found command, otherwise ``(command, args)``;
            ``command`` is None when *name* is not registered.
        """
        command = self._commands.get(name)
        if command and args and command.subcommands:
            sub = command.subcommands.get(args[0])
            if sub:
                return sub, args[1:]
        return command, args

    def register_subcommand(self, parent: str, command: Command) -> None:
        """Attach *command* as a subcommand of the command named *parent*.

        Nothing happens when *parent* is not registered.
        """
        base = self._commands.get(parent)
        if base:
            base.subcommands[command.name] = command

    def list(self) -> List[Command]:
        """Return each registered command once, in first-registration order.

        Alias entries are collapsed by command name; the first command seen
        for a given name is the one kept.
        """
        unique: Dict[str, Command] = {}
        for cmd in self._commands.values():
            unique.setdefault(cmd.name, cmd)
        return [*unique.values()]

    def parse(self, text: str) -> Optional[ParsedCommand]:
        """Parse *text* into a ``ParsedCommand``.

        Returns:
            None when *text* starts with no known prefix, when nothing
            follows the prefix, or when tokenising yields no tokens.
        """
        prefix = self._match_prefix(text)
        if not prefix:
            return None
        raw = text[len(prefix):].strip()
        if not raw:
            return None
        parts = self._split_tokens(raw)
        if not parts:
            return None
        args, flags = self._parse_flags(parts[1:])
        return ParsedCommand(name=parts[0], args=args, flags=flags, raw=raw, prefix=prefix)

    def cooldown_remaining(self, command: Command, user_id: int) -> int:
        """Return the whole seconds *user_id* must still wait for *command*.

        This is not a pure query: when the result is 0 for a command with a
        positive cooldown, the current time is recorded as the user's last
        use, starting a new cooldown window.
        """
        if command.cooldown <= 0:
            return 0
        key = (command.name, user_id)
        last = self._cooldowns.get(key, 0.0)
        now = time.time()
        remaining = command.cooldown - (now - last)
        if remaining > 0:
            # Truncate and add one so a partial second is never reported as 0.
            return int(remaining) + 1
        self._cooldowns[key] = now
        return 0

    def help_text(self, name: Optional[str] = None) -> str:
        """Return help for one command, or a sorted overview of all commands.

        Each command is shown with its own prefix when it has one, otherwise
        with the registry default prefix.
        """
        if name:
            command = self._commands.get(name)
            if not command:
                return "Command not found"
            return self._format_command(command)
        lines = ["Available commands:"]
        for command in sorted(self.list(), key=lambda cmd: cmd.name):
            lines.append(
                f"{self._display_prefix(command)}{command.name} - {command.description}"
            )
        return "\n".join(lines)

    def _format_command(self, command: Command) -> str:
        """Render the detailed, multi-line help for a single command."""
        default_usage = f"{self._display_prefix(command)}{command.name}"
        parts = [
            f"Name: {command.name}",
            f"Description: {command.description}",
            f"Usage: {command.usage or default_usage}",
        ]
        if command.aliases:
            parts.append(f"Aliases: {', '.join(command.aliases)}")
        if command.example:
            parts.append(f"Example: {command.example}")
        return "\n".join(parts)

    def _display_prefix(self, command: Command) -> str:
        """Return the prefix to show for *command* in help output."""
        return command.prefix or self.prefix

    def _match_prefix(self, text: str) -> Optional[str]:
        """Return the longest known prefix *text* starts with, else None."""
        # Longest first, so a longer prefix wins over a shorter one it contains.
        for prefix in sorted(self._prefixes, key=len, reverse=True):
            if text.startswith(prefix):
                return prefix
        return None

    @staticmethod
    def _split_tokens(raw: str) -> List[str]:
        """Split *raw* with shell-like quoting, tolerating malformed quotes."""
        try:
            return shlex.split(raw)
        except ValueError:
            # shlex rejects unbalanced quotes / dangling escapes; user-typed
            # text should not crash the parser, so split on whitespace instead.
            return raw.split()

    @staticmethod
    def _is_negative_number(token: str) -> bool:
        """Return True for tokens such as ``-5``, ``-2.5`` or ``-.5``."""
        # Require a digit or dot right after the minus so that words float()
        # accepts (e.g. "-inf") are still treated as short-flag clusters.
        if len(token) < 2 or token[0] != "-":
            return False
        if not (token[1].isdigit() or token[1] == "."):
            return False
        try:
            float(token)
        except ValueError:
            return False
        return True

    def _parse_flags(self, tokens: List[str]) -> Tuple[List[str], Dict[str, Any]]:
        """Separate *tokens* into positional arguments and flags.

        Recognised forms:
            ``--key=value``  -> ``flags[key] = value``
            ``--key value``  -> ``flags[key] = value`` when the next token is
                                not itself a flag (negative numbers count as
                                values)
            ``--key``        -> ``flags[key] = True``
            ``-abc``         -> ``flags['a'] = flags['b'] = flags['c'] = True``
            ``--``           -> everything after it is positional
        Negative numbers and a lone ``-`` are positional arguments.

        Returns:
            ``(args, flags)``.
        """
        args: List[str] = []
        flags: Dict[str, Any] = {}
        total = len(tokens)
        idx = 0
        while idx < total:
            token = tokens[idx]
            idx += 1
            if token == "--":
                # Conventional end-of-options marker.
                args.extend(tokens[idx:])
                break
            if token.startswith("--"):
                key, has_value, value = token[2:].partition("=")
                if has_value:
                    flags[key] = value
                    continue
                if idx < total:
                    candidate = tokens[idx]
                    if not candidate.startswith("-") or self._is_negative_number(candidate):
                        flags[key] = candidate
                        idx += 1
                        continue
                flags[key] = True
                continue
            if token.startswith("-") and len(token) > 1 and not self._is_negative_number(token):
                for flag in token[1:]:
                    flags[flag] = True
                continue
            args.append(token)
        return args, flags
|
|
|
|
|
|
class CommandBuilder:
    """Decorator factory that builds ``Command`` objects and registers them.

    A builder carries default ``owner``, ``owner_type`` and ``prefix`` values
    that are applied to commands which do not supply their own.
    """

    def __init__(
        self,
        registry: CommandRegistry,
        owner: str = "",
        owner_type: str = "",
        prefix: Optional[str] = None,
    ) -> None:
        """Remember the target registry and the builder-level defaults."""
        self.registry = registry
        self.owner = owner
        self.owner_type = owner_type
        self.prefix = prefix

    def with_owner(
        self,
        owner: str,
        owner_type: str = "",
        prefix: Optional[str] = None,
    ) -> "CommandBuilder":
        """Return a new builder on the same registry with the given defaults.

        The new builder takes its owner, owner type and prefix only from the
        arguments; nothing is inherited from this builder except the registry.
        """
        return CommandBuilder(
            self.registry,
            owner=owner,
            owner_type=owner_type,
            prefix=prefix,
        )

    def command(
        self,
        name: str,
        description: str = "",
        aliases: Optional[List[str]] = None,
        category: str = "",
        usage: str = "",
        example: str = "",
        cooldown: int = 0,
        permission: str = "user",
        chat_types: Optional[List[str]] = None,
        owner: Optional[str] = None,
        owner_type: Optional[str] = None,
        prefix: Optional[str] = None,
        arguments: Optional[List[ArgumentSpec]] = None,
        flags: Optional[List[FlagSpec]] = None,
    ) -> Callable[[CommandHandler], CommandHandler]:
        """Return a decorator that registers the decorated handler as a command.

        The registry receives the handler wrapped by ``capture_errors``; the
        decorator itself hands back the original, unwrapped handler. Falsy
        ``owner``, ``owner_type`` and ``prefix`` values fall back to the
        builder's own.
        """
        # Resolved once per ``command(...)`` call, so every use of the
        # returned decorator shares these two lists.
        aliases = aliases or []
        chat_types = chat_types or []

        def decorator(handler: CommandHandler) -> CommandHandler:
            spec = {
                "name": name,
                "handler": capture_errors(handler),
                "description": description,
                "aliases": aliases,
                "category": category,
                "usage": usage,
                "example": example,
                "cooldown": cooldown,
                "permission": permission,
                "chat_types": chat_types,
                # Builder defaults are read at decoration time.
                "owner": owner or self.owner,
                "owner_type": owner_type or self.owner_type,
                "prefix": prefix or self.prefix,
                "arguments": arguments or [],
                "flags": flags or [],
            }
            self.registry.register(Command(**spec))
            return handler

        return decorator
|
|
|
|
|
|
def command(name: str, **meta: Any) -> Callable[[CommandHandler], CommandHandler]:
    """Attach command metadata to a handler for later registration.

    The metadata is stored on the handler as ``_command_meta``. Anything
    already stored there is kept and merged, so this decorator may be placed
    above or below other decorators that write to ``_command_meta``.

    Args:
        name: Command name, stored under the ``"name"`` key.
        **meta: Extra metadata. A value given here overrides a same-named
            entry already on the handler, except ``aliases``, which are
            combined (the ones given here first).

    Returns:
        A decorator that annotates the handler and returns it unchanged.
    """

    def decorator(handler: CommandHandler) -> CommandHandler:
        # Decorators run bottom-up, so metadata written by decorators listed
        # below this one is already on the handler; start from a copy of it
        # instead of overwriting it.
        merged = dict(getattr(handler, "_command_meta", None) or {})
        earlier_aliases = merged.get("aliases") or []
        merged.update(meta)
        merged["name"] = name
        if earlier_aliases:
            merged["aliases"] = [*(meta.get("aliases") or []), *earlier_aliases]
        setattr(handler, "_command_meta", merged)
        return handler

    return decorator
|
|
|
|
|
|
def alias(*names: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that adds alias names to a handler's metadata.

    The names are appended to the ``"aliases"`` entry of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.

    Args:
        *names: Alias names to add, in order.

    Returns:
        A decorator that annotates the handler and returns it unchanged.
    """

    def decorator(handler: CommandHandler) -> CommandHandler:
        meta = getattr(handler, "_command_meta", {})
        # Build a fresh list rather than extending in place: the stored value
        # may be None or a tuple, and a list supplied by the caller must not
        # be mutated behind their back.
        meta["aliases"] = [*(meta.get("aliases") or ()), *names]
        setattr(handler, "_command_meta", meta)
        return handler

    return decorator
|
|
|
|
|
|
def category(name: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records *name* as the handler's category.

    The value is written to the ``"category"`` key of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["category"] = name
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def usage(text: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records *text* as the handler's usage string.

    The value is written to the ``"usage"`` key of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["usage"] = text
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def example(text: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records *text* as the handler's example.

    The value is written to the ``"example"`` key of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["example"] = text
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def cooldown(seconds: int) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records *seconds* as the handler's cooldown.

    The value is written to the ``"cooldown"`` key of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["cooldown"] = seconds
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def permission(level: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records *level* as the handler's permission.

    The value is written to the ``"permission"`` key of the handler's
    ``_command_meta`` dict, which is created when the handler has none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["permission"] = level
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def chat_type(*types: str) -> Callable[[CommandHandler], CommandHandler]:
    """Build a decorator that records the given chat types on a handler.

    A new list holding *types* replaces the ``"chat_types"`` key of the
    handler's ``_command_meta`` dict, which is created when the handler has
    none yet.
    """

    def tag(handler: CommandHandler) -> CommandHandler:
        try:
            info = handler._command_meta
        except AttributeError:
            info = {}
        info["chat_types"] = [*types]
        handler._command_meta = info
        return handler

    return tag
|
|
|
|
|
|
def register_decorated(builder: CommandBuilder, handler: CommandHandler) -> None:
    """Register *handler* through *builder* using its stored metadata.

    The metadata is read from the handler's ``_command_meta`` attribute and
    passed to ``builder.command`` as keyword arguments. Handlers with missing
    or empty metadata are skipped.
    """
    meta = getattr(handler, "_command_meta", None)
    if meta:
        attach = builder.command(**meta)
        attach(handler)
|