Add cmd info in plugins.

This commit is contained in:
thedragonsinn
2023-12-15 15:03:42 +05:30
parent 9f2670a8dc
commit a3d3d51850
25 changed files with 239 additions and 66 deletions

View File

@@ -1,12 +1,18 @@
import json
import os
from typing import Callable
from git import Repo
class _Config:
class CMD:
def __init__(self, func, path, doc):
self.func = func
self.path = path
self.doc = doc or "Not Documented."
def __init__(self):
self.CMD_DICT: dict[str, dict[str, Callable, str]] = {}
self.CMD_DICT: dict[str, _Config.CMD] = {}
self.CMD_TRIGGER: str = os.environ.get("CMD_TRIGGER", ".")

View File

@@ -53,9 +53,13 @@ class BOT(Client):
def wrapper():
if isinstance(cmd, list):
for _cmd in cmd:
Config.CMD_DICT[_cmd] = {"func": func, "path": path}
Config.CMD_DICT[_cmd] = Config.CMD(
func=func, path=path, doc=func.__doc__
)
else:
Config.CMD_DICT[cmd] = {"func": func, "path": path}
Config.CMD_DICT[cmd] = Config.CMD(
func=func, path=path, doc=func.__doc__
)
wrapper()
return func
@@ -90,7 +94,7 @@ class BOT(Client):
await idle()
await aiohttp_tools.init_task()
LOGGER.info("DB Closed.")
DB._client.close()
DB.close()
async def edit_restart_msg(self) -> None:
restart_msg = int(os.environ.get("RESTART_MSG", 0))
@@ -136,7 +140,7 @@ class BOT(Client):
await aiohttp_tools.init_task()
await super().stop(block=False)
LOGGER.info("Closing DB....")
DB._client.close()
DB.close()
if hard:
os.remove("logs/app_logs.txt")
os.execl("/bin/bash", "/bin/bash", "run")

View File

@@ -11,7 +11,7 @@ from app.core import Message, filters
@bot.on_edited_message(filters.owner_filter | filters.sudo_filter, group=1)
async def cmd_dispatcher(bot: BOT, message: Message) -> None:
message = Message.parse_message(message)
func = Config.CMD_DICT[message.cmd]["func"]
func = Config.CMD_DICT[message.cmd].func
coro = func(bot, message)
x = await run_coro(coro, message)
if not x and message.is_from_owner:

View File

@@ -19,10 +19,32 @@ class DataBase:
def __getattr__(self, attr) -> AgnosticCollection:
try:
return self.__dict__[attr]
collection: AgnosticCollection = self.__dict__[attr]
return collection
except KeyError:
self.__dict__[attr] = self.db[attr]
return self.__dict__[attr]
collection: AgnosticCollection = self.__dict__[attr]
return collection
@staticmethod
async def add_data(
collection: AgnosticCollection, id: int | str, data: dict
) -> None:
found = await collection.find_one({"_id": id})
if not found:
await collection.insert_one({"_id": id, **data})
else:
await collection.update_one({"_id": id}, {"$set": data})
@staticmethod
async def delete_data(collection: AgnosticCollection, id: int | str) -> bool | None:
found = await collection.find_one({"_id": id})
if found:
await collection.delete_one({"_id": id})
return True
def close(self):
self._client.close()
DB: DataBase = DataBase()

View File

@@ -6,7 +6,6 @@ from pyrogram import filters
from pyrogram.types import Chat, User
from app import BOT, DB, Config, Message, bot
from app.utils.db_utils import add_data, delete_data
from app.utils.helpers import get_name
FED_LIST: AgnosticCollection = DB.FED_LIST
@@ -35,8 +34,14 @@ class _User(User):
@bot.add_cmd(cmd="addf")
async def add_fed(bot: BOT, message: Message):
"""
CMD: ADDF
INFO: Add a Fed Chat to DB.
USAGE:
.addf | .addf NAME
"""
data = dict(name=message.input or message.chat.title, type=str(message.chat.type))
await add_data(collection=FED_LIST, id=message.chat.id, data=data)
await DB.add_data(collection=FED_LIST, id=message.chat.id, data=data)
await message.reply(
f"<b>{data['name']}</b> added to FED LIST.", del_in=5, block=False
)
@@ -47,6 +52,13 @@ async def add_fed(bot: BOT, message: Message):
@bot.add_cmd(cmd="delf")
async def remove_fed(bot: BOT, message: Message):
"""
CMD: DELF
INFO: Delete a Fed from DB.
FLAGS: -all to delete all feds.
USAGE:
.delf | .delf id | .delf -all
"""
if "-all" in message.flags:
await FED_LIST.drop()
await message.reply("FED LIST cleared.")
@@ -58,7 +70,7 @@ async def remove_fed(bot: BOT, message: Message):
chat = chat.id
elif chat.lstrip("-").isdigit():
chat = int(chat)
deleted: bool | None = await delete_data(collection=FED_LIST, id=chat)
deleted: bool | None = await DB.delete_data(collection=FED_LIST, id=chat)
if deleted:
await message.reply(
f"<b>{name}</b><code>{chat}</code> removed from FED LIST.",
@@ -108,7 +120,7 @@ async def fed_ban(bot: BOT, message: Message):
failed.append(fed["name"])
elif "Would you like to update this reason" in response.text:
await response.click("Update reason")
await asyncio.sleep(0.8)
await asyncio.sleep(1)
if not total:
await progress.edit("You Don't have any feds connected!")
return
@@ -171,6 +183,12 @@ async def un_fban(bot: BOT, message: Message):
@bot.add_cmd(cmd="listf")
async def fed_list(bot: BOT, message: Message):
"""
CMD: LISTF
INFO: View Connected Feds.
FLAGS: -id to list Fed Chat IDs.
USAGE: .listf | .listf -id
"""
output: str = ""
total = 0
async for fed in FED_LIST.find():
@@ -182,4 +200,4 @@ async def fed_list(bot: BOT, message: Message):
await message.reply("You don't have any Feds Connected.")
return
output: str = f"List of <b>{total}</b> Connected Feds:\n\n{output}"
await message.reply(output, del_in=30, block=False)
await message.reply(output, del_in=30, block=True)

View File

@@ -35,6 +35,15 @@ def get_privileges(
@bot.add_cmd(cmd=["promote", "demote"])
async def promote_or_demote(bot: BOT, message: Message) -> None:
"""
CMD: PROMOTE | DEMOTE
INFO: Add/Remove an Admin.
FLAGS:
PROMOTE: -full for full rights, -anon for anon admin
USAGE:
PROMOTE: .promote [ -anon | -full ] [ UID | REPLY | @ ] Title[Optional]
DEMOTE: .demote [ UID | REPLY | @ ]
"""
response: Message = await message.reply(
f"Trying to {message.cmd.capitalize()}....."
)

View File

@@ -11,6 +11,13 @@ from app.utils import shell, aiohttp_tools as aio # isort:skip
async def executor(bot: BOT, message: Message) -> Message | None:
"""
CMD: EXEC
INFO: Run Python Code.
FLAGS: -s to only show output.
USAGE:
.exec [-s] return 1
"""
code: str = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
@@ -50,4 +57,6 @@ async def executor(bot: BOT, message: Message) -> Message | None:
if Config.DEV_MODE:
Config.CMD_DICT["exec"] = {"func": executor, "path": inspect.stack()[0][1]}
Config.CMD_DICT["exec"] = Config.CMD(
func=executor, path=inspect.stack()[0][1], doc=executor.__doc__
)

View File

@@ -26,4 +26,6 @@ async def loader(bot: BOT, message: Message) -> Message | None:
if Config.DEV_MODE:
Config.CMD_DICT["load"] = {"func": loader, "path": inspect.stack()[0][1]}
Config.CMD_DICT["load"] = Config.CMD(
func=loader, path=inspect.stack()[0][1], doc=loader.__doc__
)

View File

@@ -52,5 +52,9 @@ async def live_shell(bot: BOT, message: Message) -> Message | None:
if Config.DEV_MODE:
Config.CMD_DICT["shell"] = {"func": live_shell, "path": inspect.stack()[0][1]}
Config.CMD_DICT["sh"] = {"func": run_cmd, "path": inspect.stack()[0][1]}
Config.CMD_DICT["shell"] = Config.CMD(
func=live_shell, path=inspect.stack()[0][1], doc=live_shell.__doc__
)
Config.CMD_DICT["sh"] = Config.CMD(
func=run_cmd, path=inspect.stack()[0][1], doc=run_cmd.__doc__
)

View File

@@ -6,24 +6,13 @@ from app import BOT, Message, bot
from app.utils import aiohttp_tools
from app.utils.helpers import post_to_telegraph as post_tgh
# Your Alldbrid App token
KEY = os.environ.get("DEBRID_TOKEN")
TEMPLATE = """
<b>Name</b>: <i>{name}</i>
Status: <i>{status}</i>
ID: {id}
Size: {size}
{uptobox}"""
# Get response from api and return json or the error
async def get_json(endpoint: str, query: dict):
if not KEY:
async def get_json(endpoint: str, query: dict, key=os.environ.get("DEBRID_TOKEN")):
if not key:
return "API key not found."
api = "https://api.alldebrid.com/v4" + endpoint
params = {"agent": "bot", "apikey": KEY, **query}
params = {"agent": "bot", "apikey": key, **query}
async with aiohttp_tools.SESSION.get(url=api, params=params) as ses:
try:
json = await ses.json()
@@ -120,9 +109,11 @@ async def torrents(bot: BOT, message: Message):
+ " ]"
)
ret_str_list.append(
ret_val := TEMPLATE.format(
name=name, status=status, id=id, size=size, uptobox=uptobox
)
f"\n<b>Name</b>: <i>{name}</i>"
f"\nStatus: <i>{status}</i>"
f"\nID: {id}"
f"\nSize: {size}"
f"\n{uptobox}"
)
ret_str = "<br>".join(ret_str_list)

View File

@@ -11,6 +11,12 @@ from app.utils.media_helper import get_tg_media_details
@bot.add_cmd(cmd="download")
async def down_load(bot: BOT, message: Message):
"""
CMD: DOWNLOAD
INFO: Download Files/TG Media to Bot server.
USAGE:
.download URL | Reply to Media
"""
response = await message.reply("Checking Input...")
if (not message.replied or not message.replied.media) and not message.input:
await response.edit(
@@ -63,6 +69,8 @@ async def telegram_download(
media_obj.name,
media_obj.full_path)
await message.download(
file_name=media_obj.full_path, progress=progress, progress_args=progress_args,
file_name=media_obj.full_path,
progress=progress,
progress_args=progress_args,
)
return media_obj

View File

@@ -80,13 +80,20 @@ def file_check(file: str):
@bot.add_cmd(cmd="upload")
async def upload(bot: BOT, message: Message):
"""
CMD: UPLOAD
INFO: Upload Media/Local Files/Plugins to TG.
FLAGS: -d to upload as doc.
USAGE:
.upload [-d] URL | Path to File | CMD
"""
input = message.flt_input
if not input:
await message.reply("give a file url | path to upload.")
return
response = await message.reply("checking input...")
if input in Config.CMD_DICT:
await message.reply_document(document=Config.CMD_DICT[input]["path"])
await message.reply_document(document=Config.CMD_DICT[input].path)
await response.delete()
return
elif input.startswith("http") and not file_check(input):

View File

@@ -9,6 +9,13 @@ async def init_task():
@bot.add_cmd(cmd="addscmd")
async def add_scmd(bot: BOT, message: Message):
"""
CMD: ADDSCMD
INFO: Add Sudo Commands.
FLAGS: -all to instantly add all Commands.
USAGE:
.addscmd ping | .addscmd -all
"""
if "-all" in message.flags:
cmds = [{"_id": cmd} for cmd in Config.CMD_DICT.keys()]
Config.SUDO_CMD_LIST = list(Config.CMD_DICT.keys())
@@ -30,6 +37,13 @@ async def add_scmd(bot: BOT, message: Message):
@bot.add_cmd(cmd="delscmd")
async def del_scmd(bot: BOT, message: Message):
"""
CMD: DELSCMD
INFO: Remove Sudo Commands.
FLAGS: -all to instantly remove all Commands.
USAGE:
.delscmd ping | .delscmd -all
"""
if "-all" in message.flags:
Config.SUDO_CMD_LIST = []
await DB.SUDO_CMD_LIST.drop()

View File

@@ -2,7 +2,6 @@ from pyrogram.types import User
from app import BOT, DB, Config, Message, bot
from app.plugins.admin.fbans import _User
from app.utils.db_utils import add_data, delete_data
from app.utils.helpers import extract_user_data, get_name
@@ -15,17 +14,31 @@ async def init_task():
@bot.add_cmd(cmd="sudo")
async def sudo(bot: BOT, message: Message):
"""
CMD: SUDO
INFO: Enable/Disable sudo..
FLAGS: -c to check sudo status.
USAGE:
.sudo | .sudo -c
"""
if "-c" in message.flags:
await message.reply(text=f"Sudo is enabled: <b>{Config.SUDO}</b> .", del_in=8)
return
value = not Config.SUDO
Config.SUDO = value
await add_data(collection=DB.SUDO, id="sudo_switch", data={"value": value})
await DB.add_data(collection=DB.SUDO, id="sudo_switch", data={"value": value})
await message.reply(text=f"Sudo is enabled: <b>{value}</b>!", del_in=8)
@bot.add_cmd(cmd="addsudo")
async def add_sudo(bot: BOT, message: Message) -> Message | None:
"""
CMD: ADDSUDO
INFO: Add Sudo User.
FLAGS: -temp to temporarily add until bot restarts.
USAGE:
.addsudo [-temp] [ UID | @ | Reply to Message ]
"""
response = await message.reply("Extracting User info...")
user, _ = await message.extract_user_n_reason()
if isinstance(user, str):
@@ -39,7 +52,7 @@ async def add_sudo(bot: BOT, message: Message) -> Message | None:
response_str = f"{user.mention} added to Sudo List."
Config.SUDO_USERS.append(user.id)
if "-temp" not in message.flags:
await add_data(
await DB.add_data(
collection=DB.SUDO_USERS, id=user.id, data=extract_user_data(user)
)
else:
@@ -50,6 +63,13 @@ async def add_sudo(bot: BOT, message: Message) -> Message | None:
@bot.add_cmd(cmd="delsudo")
async def remove_sudo(bot: BOT, message: Message) -> Message | None:
"""
CMD: DELSUDO
INFO: Add Remove User.
FLAGS: -temp to temporarily remove until bot restarts.
USAGE:
.delsudo [-temp] [ UID | @ | Reply to Message ]
"""
response = await message.reply("Extracting User info...")
user, _ = await message.extract_user_n_reason()
if isinstance(user, str):
@@ -64,7 +84,7 @@ async def remove_sudo(bot: BOT, message: Message) -> Message | None:
Config.SUDO_USERS.remove(user.id)
response_str = f"{user.mention} removed from Sudo List."
if "-temp" not in message.flags:
await delete_data(collection=DB.SUDO_USERS, id=user.id)
await DB.delete_data(collection=DB.SUDO_USERS, id=user.id)
else:
response_str += "\n<b>Temporary</b>: True"
await response.edit(text=response_str, del_in=5)
@@ -73,6 +93,13 @@ async def remove_sudo(bot: BOT, message: Message) -> Message | None:
@bot.add_cmd(cmd="vsudo")
async def sudo_list(bot: BOT, message: Message):
"""
CMD: VSUDO
INFO: View Sudo Users.
FLAGS: -id to get UIDs
USAGE:
.vsudo | .vsudo -id
"""
output: str = ""
total = 0
async for user in DB.SUDO_USERS.find():

View File

@@ -5,6 +5,11 @@ from app import BOT, Message, bot
@bot.add_cmd(cmd="c")
async def cancel_task(bot: BOT, message: Message) -> Message | None:
"""
CMD: CANCEL
INFO: Cancel a running command by replying to a message.
USAGE: .c
"""
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(

View File

@@ -5,6 +5,13 @@ from app.plugins.tools.get_message import parse_link
@bot.add_cmd(cmd="del")
async def delete_message(bot: BOT, message: Message) -> None:
"""
CMD: DEL
INFO: Delete the replied message.
FLAGS: -r to remotely delete a text using its link.
USAGE:
.del | .del -r t.me/......
"""
if "-r" in message.flags:
chat_id, message_id = parse_link(message.flt_input)
await bot.delete_messages(chat_id=chat_id, message_ids=message_id, revoke=True)

View File

@@ -13,6 +13,12 @@ def parse_link(link: str) -> tuple[int | str, int]:
@bot.add_cmd(cmd="gm")
async def get_message(bot: BOT, message: Message):
"""
CMD: Get Message
INFO: Get a Message Json/Attr by providing link.
USAGE:
.gm t.me/.... | .gm t.me/... text [Returns message text]
"""
if not message.input:
await message.reply("Give a Message link.")
attr = None

View File

@@ -21,6 +21,13 @@ EMOJIS = ("☕", "🤡", "🙂", "🤔", "🔪", "😂", "💀")
@bot.add_cmd(cmd="kang")
async def kang_sticker(bot: BOT, message: Message):
"""
CMD: KANG
INFO: Save a sticker/image/gif/video to your sticker pack.
FLAGS: -f to fastforward video tp fit 3 sec duration.
USAGE: .kang | .kang -f
"""
response = await message.reply("Checking input")
media_coro = get_sticker_media_coro(message)
kwargs: dict = await media_coro

View File

@@ -4,6 +4,13 @@ from app.plugins.tools.get_message import parse_link
@bot.add_cmd(cmd="reply")
async def reply(bot: BOT, message: Message) -> None:
"""
CMD: REPLY
INFO: Reply to a Message.
FLAGS: -r to reply remotely using message link.
USAGE:
.reply HI | .reply -r t.me/... HI
"""
if "-r" in message.flags:
input: list[str] = message.flt_input.split(" ", maxsplit=1)
if len(input) < 2:

View File

@@ -5,11 +5,16 @@ from app import BOT, Config, Message, bot
@bot.add_cmd(cmd="ci")
async def cmd_info(bot: BOT, message=Message):
"""
CMD: CI (CMD INFO)
INFO: Get Github File URL of a Command.
USAGE: .ci ci
"""
cmd = message.flt_input
if not cmd or cmd not in Config.CMD_DICT.keys():
await message.reply("Give a valid cmd.", del_in=5)
return
cmd_path = Config.CMD_DICT[cmd]["path"]
cmd_path = Config.CMD_DICT[cmd].path
plugin_path = os.path.relpath(cmd_path, os.curdir)
repo = Config.REPO.remotes.origin.url
branch = Config.REPO.active_branch

View File

@@ -3,9 +3,26 @@ from app import BOT, Config, Message, bot
@bot.add_cmd(cmd="help")
async def cmd_list(bot: BOT, message: Message) -> None:
commands: str = "\n".join(
[f"<code>{message.trigger}{cmd}</code>" for cmd in Config.CMD_DICT.keys()]
)
await message.reply(
f"<b>Available Commands:</b>\n\n{commands}", del_in=30, block=True
)
"""
CMD: HELP
INFO: Check info/about available cmds.
USAGE:
.help help | .help
"""
cmd = message.input.strip()
if not cmd:
commands: str = " ".join(
[
f"<code>{message.trigger}{cmd}</code>"
for cmd in sorted(Config.CMD_DICT.keys())
]
)
await message.reply(
text=f"<b>Available Commands:</b>\n\n{commands}", del_in=30, block=True
)
elif cmd not in Config.CMD_DICT.keys():
await message.reply(
f"Invalid <b>{cmd}</b>, check {message.trigger}help", del_in=5
)
else:
await message.reply(f"<pre language=js>Doc:{Config.CMD_DICT[cmd].doc}</pre>", del_in=30)

View File

@@ -7,6 +7,13 @@ from app import BOT, Message, bot
@bot.add_cmd(cmd="restart")
async def restart(bot: BOT, message: Message, u_resp: Message | None = None) -> None:
"""
CMD: RESTART
INFO: Restart the Bot.
FLAGS: -h for hard restart and clearing logs
Usage:
.restart | .restart -h
"""
reply: Message = u_resp or await message.reply("restarting....")
if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
os.environ["RESTART_MSG"] = str(reply.id)

View File

@@ -38,6 +38,13 @@ async def pull_commits(repo: Repo) -> None | bool:
@bot.add_cmd(cmd="update")
async def updater(bot: BOT, message: Message) -> None | Message:
"""
CMD: UPDATE
INFO: Pull / Check for updates.
FLAGS: -pull to pull updates
USAGE:
.update | .update -pull
"""
reply: Message = await message.reply("Checking for Updates....")
repo: Repo = Config.REPO
commits: str = await get_commits(repo)

View File

@@ -1,16 +0,0 @@
from motor.core import AgnosticCollection
async def add_data(collection: AgnosticCollection, id: int | str, data: dict) -> None:
found = await collection.find_one({"_id": id})
if not found:
await collection.insert_one({"_id": id, **data})
else:
await collection.update_one({"_id": id}, {"$set": data})
async def delete_data(collection: AgnosticCollection, id: int | str) -> bool | None:
found = await collection.find_one({"_id": id})
if found:
await collection.delete_one({"_id": id})
return True

View File

@@ -10,14 +10,14 @@ TELEGRAPH: None | Telegraph = None
PROGRESS_DICT = {}
"""
async def init_task():
global TELEGRAPH
TELEGRAPH = Telegraph()
await TELEGRAPH.create_account(
short_name="Plain-UB", author_name="Plain-UB", author_url=Config.UPSTREAM_REPO
)
"""
async def post_to_telegraph(title: str, text: str):
telegraph = await TELEGRAPH.create_page(