143 lines
4.9 KiB
Python
143 lines
4.9 KiB
Python
import asyncio
|
|
import importlib
|
|
import sys
|
|
import traceback
|
|
from io import StringIO
|
|
|
|
from pyrogram.enums import ParseMode
|
|
|
|
from app.core import Message
|
|
|
|
from app import Config, bot, DB # isort:skip
|
|
from app.utils import shell, aiohttp_tools as aio # isort:skip
|
|
|
|
|
|
# Run shell commands
|
|
async def run_cmd(bot: bot, message: Message) -> Message | None:
|
|
cmd: str = message.input.strip()
|
|
reply: Message = await message.reply("executing...")
|
|
try:
|
|
proc_stdout: str = await asyncio.Task(
|
|
shell.run_shell_cmd(cmd), name=reply.task_id
|
|
)
|
|
except asyncio.exceptions.CancelledError:
|
|
return await reply.edit("`Cancelled...`")
|
|
output: str = f"~$`{cmd}`\n\n`{proc_stdout}`"
|
|
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
|
|
|
|
|
|
# Shell with Live Output
|
|
async def live_shell(bot: bot, message: Message) -> Message | None:
|
|
cmd: str = message.input.strip()
|
|
reply: Message = await message.reply("`getting live output....`")
|
|
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
|
|
sleep_for: int = 1
|
|
output: str = ""
|
|
try:
|
|
async for stdout in sub_process.get_output():
|
|
if output != stdout:
|
|
if len(stdout) <= 4096:
|
|
await reply.edit(
|
|
f"`{stdout}`",
|
|
disable_web_page_preview=True,
|
|
parse_mode=ParseMode.MARKDOWN,
|
|
)
|
|
output = stdout
|
|
if sleep_for >= 6:
|
|
sleep_for = 1
|
|
await asyncio.Task(asyncio.sleep(sleep_for), name=reply.task_id)
|
|
sleep_for += 1
|
|
return await reply.edit(
|
|
f"~$`{cmd}\n\n``{sub_process.full_std}`",
|
|
name="shell.txt",
|
|
disable_web_page_preview=True,
|
|
)
|
|
except asyncio.exceptions.CancelledError:
|
|
sub_process.cancel()
|
|
return await reply.edit(f"`Cancelled....`")
|
|
|
|
|
|
# Run Python code
|
|
|
|
|
|
async def executor(bot: bot, message: Message) -> Message | None:
|
|
code: str = message.flt_input.strip()
|
|
if not code:
|
|
return await message.reply("exec Jo mama?")
|
|
reply: Message = await message.reply("executing")
|
|
sys.stdout = codeOut = StringIO()
|
|
sys.stderr = codeErr = StringIO()
|
|
# Indent code as per proper python syntax
|
|
formatted_code = "\n ".join(code.splitlines())
|
|
try:
|
|
# Create and initialise the function
|
|
exec(f"async def _exec(bot, message):\n {formatted_code}")
|
|
func_out = await asyncio.Task(
|
|
locals()["_exec"](bot, message), name=reply.task_id
|
|
)
|
|
except asyncio.exceptions.CancelledError:
|
|
return await reply.edit("`Cancelled....`")
|
|
except BaseException:
|
|
func_out = str(traceback.format_exc())
|
|
sys.stdout = sys.__stdout__
|
|
sys.stderr = sys.__stderr__
|
|
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
|
|
if func_out is not None:
|
|
output = "\n\n".join([output, str(func_out)]).strip()
|
|
if "-s" not in message.flags:
|
|
output = f"> `{code}`\n\n>> `{output}`"
|
|
else:
|
|
return await reply.delete()
|
|
await reply.edit(
|
|
output,
|
|
name="exec.txt",
|
|
disable_web_page_preview=True,
|
|
parse_mode=ParseMode.MARKDOWN,
|
|
)
|
|
|
|
|
|
async def loader(bot: bot, message: Message) -> Message | None:
|
|
if (
|
|
not message.replied
|
|
or not message.replied.document
|
|
or not message.replied.document.file_name.endswith(".py")
|
|
):
|
|
return await message.reply("reply to a plugin.")
|
|
reply: Message = await message.reply("Loading....")
|
|
file_name: str = message.replied.document.file_name.rstrip(".py")
|
|
reload = sys.modules.pop(f"app.temp.{file_name}", None)
|
|
status: str = "Reloaded" if reload else "Loaded"
|
|
await message.replied.download("app/temp/")
|
|
try:
|
|
importlib.import_module(f"app.temp.{file_name}")
|
|
except BaseException:
|
|
return await reply.edit(str(traceback.format_exc()))
|
|
await reply.edit(f"{status} {file_name}.py.")
|
|
|
|
|
|
@bot.add_cmd(cmd="c")
|
|
async def cancel_task(bot: bot, message: Message) -> Message | None:
|
|
task_id: str | None = message.replied_task_id
|
|
if not task_id:
|
|
return await message.reply(
|
|
text="Reply To a Command or Bot's Response Message.", del_in=8
|
|
)
|
|
all_tasks: set[asyncio.all_tasks] = asyncio.all_tasks()
|
|
tasks: list[asyncio.Task] | None = [x for x in all_tasks if x.get_name() == task_id]
|
|
if not tasks:
|
|
return await message.reply(
|
|
text="Task not in Currently Running Tasks.", del_in=8
|
|
)
|
|
response: str = ""
|
|
for task in tasks:
|
|
status: bool = task.cancel()
|
|
response += f"Task: __{task.get_name()}__\nCancelled: __{status}__\n"
|
|
await message.reply(response, del_in=5)
|
|
|
|
|
|
if Config.DEV_MODE:
|
|
Config.CMD_DICT["sh"] = run_cmd
|
|
Config.CMD_DICT["shell"] = live_shell
|
|
Config.CMD_DICT["exec"] = executor
|
|
Config.CMD_DICT["load"] = loader
|