Files
plain-ub-overfork/app/plugins/dev_tools.py
thedragonsinn 32db859d7e Initial Commit.
2023-09-25 18:28:01 +05:30

143 lines
4.9 KiB
Python

import asyncio
import importlib
import sys
import traceback
from io import StringIO
from pyrogram.enums import ParseMode
from app.core import Message
from app import Config, bot, DB # isort:skip
from app.utils import shell, aiohttp_tools as aio # isort:skip
# Run shell commands
async def run_cmd(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("executing...")
try:
proc_stdout: str = await asyncio.Task(
shell.run_shell_cmd(cmd), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled...`")
output: str = f"~$`{cmd}`\n\n`{proc_stdout}`"
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell with Live Output
async def live_shell(bot: bot, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("`getting live output....`")
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
sleep_for: int = 1
output: str = ""
try:
async for stdout in sub_process.get_output():
if output != stdout:
if len(stdout) <= 4096:
await reply.edit(
f"`{stdout}`",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
output = stdout
if sleep_for >= 6:
sleep_for = 1
await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id)
sleep_for += 1
return await reply.edit(
f"~$`{cmd}\n\n``{sub_process.full_std}`",
name="shell.txt",
disable_web_page_preview=True,
)
except asyncio.exceptions.CancelledError:
sub_process.cancel()
return await reply.edit(f"`Cancelled....`")
# Run Python code
async def executor(bot: bot, message: Message) -> Message | None:
code: str = message.flt_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply: Message = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "\n ".join(code.splitlines())
try:
# Create and initialise the function
exec(f"async def _exec(bot, message):\n {formatted_code}")
func_out = await asyncio.Task(
locals()["_exec"](bot, message), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled....`")
except BaseException:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
if func_out is not None:
output = "\n\n".join([output, str(func_out)]).strip()
if "-s" not in message.flags:
output = f"> `{code}`\n\n>> `{output}`"
else:
return await reply.delete()
await reply.edit(
output,
name="exec.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
async def loader(bot: bot, message: Message) -> Message | None:
if (
not message.replied
or not message.replied.document
or not message.replied.document.file_name.endswith(".py")
):
return await message.reply("reply to a plugin.")
reply: Message = await message.reply("Loading....")
file_name: str = message.replied.document.file_name.rstrip(".py")
reload = sys.modules.pop(f"app.temp.{file_name}", None)
status: str = "Reloaded" if reload else "Loaded"
await message.replied.download("app/temp/")
try:
importlib.import_module(f"app.temp.{file_name}")
except BaseException:
return await reply.edit(str(traceback.format_exc()))
await reply.edit(f"{status} {file_name}.py.")
@bot.add_cmd(cmd="c")
async def cancel_task(bot: bot, message: Message) -> Message | None:
task_id: str | None = message.replied_task_id
if not task_id:
return await message.reply(
text="Reply To a Command or Bot's Response Message.", del_in=8
)
all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks()
tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id]
if not tasks:
return await message.reply(
text="Task not in Currently Running Tasks.", del_in=8
)
response: str = ""
for task in tasks:
status: bool = task.cancel()
response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n"
await message.reply(response, del_in=5)
if Config.DEV_MODE:
Config.CMD_DICT["sh"] = run_cmd
Config.CMD_DICT["shell"] = live_shell
Config.CMD_DICT["exec"] = executor
Config.CMD_DICT["load"] = loader