diff --git a/app/config.py b/app/config.py
index c5e7cc7..2241e7c 100644
--- a/app/config.py
+++ b/app/config.py
@@ -1,5 +1,5 @@
import asyncio
-import os
+from os import environ, path
from typing import Callable, Coroutine
from git import Repo
@@ -8,12 +8,13 @@ from app.utils import Str
class Cmd(Str):
- def __init__(self, cmd: str, func: Callable, path: str, sudo: bool):
+ def __init__(self, cmd: str, func: Callable, cmd_path: str, sudo: bool):
self.cmd: str = cmd
- self.func: Callable = func
- self.path: str = path
- self.dirname: str = os.path.basename(os.path.dirname(path))
+ self.cmd_path: str = cmd_path
+ self.dirname: str = path.basename(path.dirname(cmd_path))
self.doc: str = func.__doc__ or "Not Documented."
+ self.func: Callable = func
+ self.loaded = False
self.sudo: bool = sudo
@@ -24,27 +25,31 @@ class Config:
CMD_DICT: dict[str, Cmd] = {}
- CMD_TRIGGER: str = os.environ.get("CMD_TRIGGER", ".")
+ CMD_TRIGGER: str = environ.get("CMD_TRIGGER", ".")
- DEV_MODE: int = int(os.environ.get("DEV_MODE", 0))
+ DEV_MODE: int = int(environ.get("DEV_MODE", 0))
DISABLED_SUPERUSERS: list[int] = []
FBAN_LOG_CHANNEL: int = int(
- os.environ.get("FBAN_LOG_CHANNEL", os.environ.get("LOG_CHAT"))
+ environ.get("FBAN_LOG_CHANNEL", environ.get("LOG_CHAT"))
)
- GEMINI_API_KEY: str = os.environ.get("GEMINI_API_KEY")
+ FBAN_SUDO_ID: int = int(environ.get("FBAN_SUDO_ID", 0))
+
+ FBAN_SUDO_TRIGGER: str = environ.get("FBAN_SUDO_TRIGGER")
+
+ GEMINI_API_KEY: str = environ.get("GEMINI_API_KEY")
INIT_TASKS: list[Coroutine] = []
- LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
+ LOG_CHAT: int = int(environ.get("LOG_CHAT"))
- MESSAGE_LOGGER_CHAT: int = int(os.environ.get("MESSAGE_LOGGER_CHAT", LOG_CHAT))
+ MESSAGE_LOGGER_CHAT: int = int(environ.get("MESSAGE_LOGGER_CHAT", LOG_CHAT))
MESSAGE_LOGGER_TASK: asyncio.Task | None = None
- OWNER_ID: int = int(os.environ.get("OWNER_ID"))
+ OWNER_ID: int = int(environ.get("OWNER_ID"))
PM_GUARD: bool = False
@@ -54,9 +59,7 @@ class Config:
SUDO: bool = False
- SUDO_TRIGGER: str = os.environ.get("SUDO_TRIGGER", "!")
-
- SUDO_CMD_LIST: list[str] = []
+ SUDO_TRIGGER: str = environ.get("SUDO_TRIGGER", "!")
SUDO_USERS: list[int] = []
@@ -64,6 +67,6 @@ class Config:
TAG_LOGGER: bool = False
- UPSTREAM_REPO: str = os.environ.get(
+ UPSTREAM_REPO: str = environ.get(
"UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub"
)
diff --git a/app/core/conversation.py b/app/core/conversation.py
index 88359cb..171030a 100644
--- a/app/core/conversation.py
+++ b/app/core/conversation.py
@@ -33,12 +33,6 @@ class Conversation(Str):
self.timeout: int = timeout
self.set_future()
- def _check_duplicates(self):
- if not self.check_for_duplicates:
- return
- if self.chat_id in Conversation.CONVO_DICT.keys():
- raise self.DuplicateConvo(self.chat_id)
-
async def __aenter__(self) -> Self:
"""
Convert Username to ID if chat_id is username.
@@ -47,7 +41,8 @@ class Conversation(Str):
"""
if isinstance(self.chat_id, str):
self.chat_id = (await self._client.get_chat(self.chat_id)).id
- self._check_duplicates()
+ if self.check_for_duplicates and self.chat_id in Conversation.CONVO_DICT.keys():
+ raise self.DuplicateConvo(self.chat_id)
Conversation.CONVO_DICT[self.chat_id].append(self)
return self
@@ -62,7 +57,7 @@ class Conversation(Str):
@classmethod
async def get_resp(cls, client, *args, **kwargs) -> Message | None:
"""
- Bound Method to Gracefully handle TimeOut.
+ Bound Method to Gracefully handle Timeout.
but only returns first Message.
"""
try:
@@ -72,13 +67,13 @@ class Conversation(Str):
except TimeoutError:
return
- """Methods"""
-
def set_future(self, *args, **kwargs):
future = asyncio.Future()
future.add_done_callback(self.set_future)
self.response_future = future
+ """Methods"""
+
async def get_response(self, timeout: int = 0) -> Message | None:
"""Returns Latest Message for Specified Filters."""
try:
diff --git a/app/core/decorators/add_cmd.py b/app/core/decorators/add_cmd.py
index 43e9c18..07960c6 100644
--- a/app/core/decorators/add_cmd.py
+++ b/app/core/decorators/add_cmd.py
@@ -16,11 +16,11 @@ class AddCmd:
if isinstance(cmd, list):
for _cmd in cmd:
Config.CMD_DICT[_cmd] = Config.CMD(
- cmd=_cmd, func=func, path=path, sudo=allow_sudo
+ cmd=_cmd, func=func, cmd_path=path, sudo=allow_sudo
)
else:
Config.CMD_DICT[cmd] = Config.CMD(
- cmd=cmd, func=func, path=path, sudo=allow_sudo
+ cmd=cmd, func=func, cmd_path=path, sudo=allow_sudo
)
wrapper()
diff --git a/app/core/handlers/filters.py b/app/core/handlers/filters.py
index 953a3a1..d3b1b56 100644
--- a/app/core/handlers/filters.py
+++ b/app/core/handlers/filters.py
@@ -17,9 +17,9 @@ def cmd_check(message: Message, trigger: str, sudo: bool = False) -> bool:
if not cmd_obj:
return False
if sudo:
- in_sudo = cmd in Config.SUDO_CMD_LIST
- has_access = Config.CMD_DICT[cmd].sudo
- return in_sudo and has_access
+ in_loaded = cmd_obj.loaded
+ has_access = cmd_obj.sudo
+ return in_loaded and has_access
return True
diff --git a/app/core/types/message.py b/app/core/types/message.py
index 97c0576..6e2084a 100644
--- a/app/core/types/message.py
+++ b/app/core/types/message.py
@@ -30,7 +30,7 @@ class Message(Msg):
return [i for i in self.text_list if i.startswith("-")]
@cached_property
- def flt_input(self) -> str:
+ def filtered_input(self) -> str:
split_lines = self.input.split(sep="\n", maxsplit=1)
split_lines[0] = " ".join(
[word for word in split_lines[0].split(" ") if word not in self.flags]
@@ -113,8 +113,8 @@ class Message(Msg):
async def extract_user_n_reason(self) -> tuple[User | str | Exception, str | None]:
if self.replied:
- return self.replied.from_user, self.flt_input
- input_text_list = self.flt_input.split(maxsplit=1)
+ return self.replied.from_user, self.filtered_input
+ input_text_list = self.filtered_input.split(maxsplit=1)
if not input_text_list:
return (
"Unable to Extract User info.\nReply to a user or input @ | id.",
diff --git a/app/plugins/admin/fbans.py b/app/plugins/admin/fbans.py
index 0365cfd..720d37b 100644
--- a/app/plugins/admin/fbans.py
+++ b/app/plugins/admin/fbans.py
@@ -7,7 +7,7 @@ from pyrogram.types import Chat, User
from app import BOT, Config, CustomDB, Message, bot
from app.utils.helpers import get_name
-DB = CustomDB("FED_LIST")
+FED_DB = CustomDB("FED_LIST")
BASIC_FILTER = filters.user([609517172, 2059887769]) & ~filters.service
@@ -34,7 +34,7 @@ async def add_fed(bot: BOT, message: Message):
.addf | .addf NAME
"""
data = dict(name=message.input or message.chat.title, type=str(message.chat.type))
- await DB.add_data({"_id": message.chat.id, **data})
+ await FED_DB.add_data({"_id": message.chat.id, **data})
text = f"#FBANS\n{data['name']}: {message.chat.id} added to FED LIST."
await message.reply(
text=text,
@@ -54,7 +54,7 @@ async def remove_fed(bot: BOT, message: Message):
.delf | .delf id | .delf -all
"""
if "-all" in message.flags:
- await DB.drop()
+ await FED_DB.drop()
await message.reply("FED LIST cleared.")
return
chat: int | str | Chat = message.input or message.chat
@@ -64,144 +64,16 @@ async def remove_fed(bot: BOT, message: Message):
chat = chat.id
elif chat.lstrip("-").isdigit():
chat = int(chat)
- deleted: bool | None = await DB.delete_data(id=chat)
+ deleted: bool | None = await FED_DB.delete_data(id=chat)
if deleted:
text = f"#FBANS\n{name}{chat} removed from FED LIST."
await message.reply(
text=text,
del_in=8,
- block=True,
)
await bot.log_text(text=text, type="info")
else:
- await message.reply(f"{name or chat} not in FED LIST.", del_in=8)
-
-
-@bot.add_cmd(cmd=["fban", "fbanp"])
-async def fed_ban(bot: BOT, message: Message):
- progress: Message = await message.reply("❯")
- user, reason = await message.extract_user_n_reason()
- if isinstance(user, str):
- await progress.edit(user)
- return
- if not isinstance(user, User):
- user_id = user
- user_mention = f"{user_id}"
- else:
- user_id = user.id
- user_mention = user.mention
- if user_id in [Config.OWNER_ID, *Config.SUDO_USERS, *Config.SUDO_USERS]:
- await progress.edit("Cannot Fban Owner/Sudo users.")
- return
- proof_str: str = ""
- if message.cmd == "fbanp":
- if not message.replied:
- await message.reply("Reply to a proof")
- return
- proof = await message.replied.forward(Config.FBAN_LOG_CHANNEL)
- proof_str = f"\n{ {proof.link} }"
-
- reason = f"{reason}{proof_str}"
-
- if message.replied and not message.chat.type == ChatType.PRIVATE:
- me = await bot.get_chat_member(chat_id=message.chat.id, user_id="me")
- if me.status in {ChatMemberStatus.OWNER, ChatMemberStatus.ADMINISTRATOR}:
- await message.replied.reply(
- text=f"!dban {reason}",
- disable_web_page_preview=True,
- del_in=3,
- block=False,
- )
-
- await progress.edit("❯❯")
- total: int = 0
- failed: list[str] = []
- fban_cmd: str = f"/fban {user_id} {reason}"
- async for fed in DB.find():
- chat_id = int(fed["_id"])
- total += 1
- cmd: Message = await bot.send_message(
- chat_id=chat_id, text=fban_cmd, disable_web_page_preview=True
- )
- response: Message | None = await cmd.get_response(
- filters=BASIC_FILTER, timeout=8
- )
- if not response or not (await FBAN_REGEX(bot, response)): # NOQA
- failed.append(fed["name"])
- elif "Would you like to update this reason" in response.text:
- await response.click("Update reason")
- await asyncio.sleep(1)
- if not total:
- await progress.edit("You Don't have any feds connected!")
- return
- resp_str = (
- f"❯❯❯ FBanned {user_mention}"
- f"\nID: {user_id}"
- f"\nReason: {reason}"
- f"\nInitiated in: {message.chat.title or 'PM'}"
- )
- if failed:
- resp_str += f"\nFailed in: {len(failed)}/{total}\n• " + "\n• ".join(
- failed
- )
- else:
- resp_str += f"\nStatus: Fbanned in {total} feds."
- if not message.is_from_owner:
- resp_str += f"\n\nBy: {get_name(message.from_user)}"
- await bot.send_message(
- chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
- )
- await progress.edit(
- text=resp_str, del_in=5, block=True, disable_web_page_preview=True
- )
-
-
-@bot.add_cmd(cmd="unfban")
-async def un_fban(bot: BOT, message: Message):
- progress: Message = await message.reply("❯")
- user, reason = await message.extract_user_n_reason()
- if isinstance(user, str):
- await progress.edit(user)
- return
- if not isinstance(user, User):
- user_id = user
- user_mention = f"{user_id}"
- else:
- user_id = user.id
- user_mention = user.mention
-
- await progress.edit("❯❯")
- total: int = 0
- failed: list[str] = []
- unfban_cmd: str = f"/unfban {user_id} {reason}"
- async for fed in DB.find():
- chat_id = int(fed["_id"])
- total += 1
- cmd: Message = await bot.send_message(
- chat_id=chat_id, text=unfban_cmd, disable_web_page_preview=True
- )
- response: Message | None = await cmd.get_response(
- filters=BASIC_FILTER, timeout=8
- )
- if not response or not (await UNFBAN_REGEX(bot, response)):
- failed.append(fed["name"])
- await asyncio.sleep(1)
- if not total:
- await progress.edit("You Don't have any feds connected!")
- return
- resp_str = (
- f"❯❯❯ Un-FBanned {user_mention}" f"\nID: {user_id}" f"\nReason: {reason}\n"
- )
- if failed:
- resp_str += f"Failed in: {len(failed)}/{total}\n• " + "\n• ".join(failed)
- else:
- resp_str += f"Success! Un-Fbanned in {total} feds."
- await bot.send_message(
- chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
- )
- await progress.edit(
- text=resp_str, del_in=8, block=False, disable_web_page_preview=True
- )
+ await message.reply(text=f"{name or chat} not in FED LIST.", del_in=8)
@bot.add_cmd(cmd="listf")
@@ -214,7 +86,7 @@ async def fed_list(bot: BOT, message: Message):
"""
output: str = ""
total = 0
- async for fed in DB.find():
+ async for fed in FED_DB.find():
output += f'• {fed["name"]}\n'
if "-id" in message.flags:
output += f' {fed["_id"]}\n'
@@ -224,3 +96,151 @@ async def fed_list(bot: BOT, message: Message):
return
output: str = f"List of {total} Connected Feds:\n\n{output}"
await message.reply(output, del_in=30, block=True)
+
+
+@bot.add_cmd(cmd=["fban", "fbanp"])
+async def fed_ban(bot: BOT, message: Message):
+ progress: Message = await message.reply("❯")
+ extracted_info = await get_user_reason(message=message, progress=progress)
+ if not extracted_info:
+ return
+
+ user_id, user_mention, reason = extracted_info
+
+ if user_id in [Config.OWNER_ID, *Config.SUPERUSERS, *Config.SUDO_USERS]:
+ await progress.edit("Cannot Fban Owner/Sudo users.")
+ return
+
+ proof_str: str = ""
+ if message.cmd == "fbanp":
+ if not message.replied:
+ await progress.edit("Reply to a proof")
+ return
+ proof = await message.replied.forward(Config.FBAN_LOG_CHANNEL)
+ proof_str = f"\n{ {proof.link} }"
+
+ reason = f"{reason}{proof_str}"
+
+ if message.replied and message.chat.type != ChatType.PRIVATE:
+ me = await bot.get_chat_member(chat_id=message.chat.id, user_id="me")
+ if me.status in {ChatMemberStatus.OWNER, ChatMemberStatus.ADMINISTRATOR}:
+ await message.replied.reply(
+ text=f"!dban {reason}",
+ disable_web_page_preview=True,
+ del_in=3,
+ block=False,
+ )
+
+ fban_cmd: str = f"/fban {user_id} {reason}"
+
+ await perform_fed_task(
+ user_id=user_id,
+ user_mention=user_mention,
+ command=fban_cmd,
+ task_filter=FBAN_REGEX,
+ task_type="Fban",
+ reason=reason,
+ progress=progress,
+ message=message,
+ )
+
+
+@bot.add_cmd(cmd="unfban")
+async def un_fban(bot: BOT, message: Message):
+ progress: Message = await message.reply("❯")
+ extracted_info = await get_user_reason(message=message, progress=progress)
+
+ if not extracted_info:
+ return
+
+ user_id, user_mention, reason = extracted_info
+ unfban_cmd: str = f"/unfban {user_id} {reason}"
+
+ await perform_fed_task(
+ user_id=user_id,
+ user_mention=user_mention,
+ command=unfban_cmd,
+ task_filter=UNFBAN_REGEX,
+ task_type="Un-FBan",
+ reason=reason,
+ progress=progress,
+ message=message,
+ )
+
+
+async def get_user_reason(
+ message: Message, progress: Message
+) -> tuple[int, str, str] | None:
+ user, reason = await message.extract_user_n_reason()
+ if isinstance(user, str):
+ await progress.edit(user)
+ return
+ if not isinstance(user, User):
+ user_id = user
+ user_mention = f"{user_id}"
+ else:
+ user_id = user.id
+ user_mention = user.mention
+ return user_id, user_mention, reason
+
+
+async def perform_fed_task(
+ user_id: int,
+ user_mention: str,
+ command: str,
+ task_filter: filters.Filter,
+ task_type: str,
+ reason: str,
+ progress: Message,
+ message: Message,
+):
+ await progress.edit("❯❯")
+ total: int = 0
+ failed: list[str] = []
+ async for fed in FED_DB.find():
+ chat_id = int(fed["_id"])
+ total += 1
+ cmd: Message = await bot.send_message(
+ chat_id=chat_id, text=command, disable_web_page_preview=True
+ )
+ response: Message | None = await cmd.get_response(
+ filters=BASIC_FILTER, timeout=8
+ )
+ if not response or not (await task_filter(bot, response)): # NOQA
+ failed.append(fed["name"])
+ elif "Would you like to update this reason" in response.text:
+ await response.click("Update reason")
+ await asyncio.sleep(1)
+ if not total:
+ await progress.edit("You Don't have any feds connected!")
+ return
+ resp_str = (
+ f"❯❯❯ {task_type}ned {user_mention}"
+ f"\nID: {user_id}"
+ f"\nReason: {reason}"
+ f"\nInitiated in: {message.chat.title or 'PM'}"
+ )
+ if failed:
+ resp_str += f"\nFailed in: {len(failed)}/{total}\n• " + "\n• ".join(
+ failed
+ )
+ else:
+ resp_str += f"\nStatus: {task_type}ned in {total} feds."
+ if not message.is_from_owner:
+ resp_str += f"\n\nBy: {get_name(message.from_user)}"
+ await bot.send_message(
+ chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
+ )
+ await progress.edit(
+ text=resp_str, del_in=5, block=True, disable_web_page_preview=True
+ )
+ await handle_sudo_fban(command=command)
+
+
+async def handle_sudo_fban(command: str):
+ if not (Config.FBAN_SUDO_ID and Config.FBAN_SUDO_TRIGGER):
+ return
+ sudo_cmd = command.replace("/", Config.FBAN_SUDO_TRIGGER, 1)
+ await bot.send_message(
+ chat_id=Config.FBAN_SUDO_ID, text=sudo_cmd, disable_web_page_preview=True
+ )
diff --git a/app/plugins/dev/exec.py b/app/plugins/dev/exec.py
index 6fae9ff..6ae0b0b 100644
--- a/app/plugins/dev/exec.py
+++ b/app/plugins/dev/exec.py
@@ -19,7 +19,7 @@ async def executor(bot: BOT, message: Message) -> Message | None:
USAGE:
.py [-s] return 1
"""
- code: str = message.flt_input.strip()
+ code: str = message.filtered_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply: Message = await message.reply("executing")
@@ -61,6 +61,6 @@ if Config.DEV_MODE:
Config.CMD_DICT["py"] = Config.CMD(
cmd="py",
func=executor,
- path=inspect.stack()[0][1],
+ cmd_path=inspect.stack()[0][1],
sudo=False,
)
diff --git a/app/plugins/dev/loader.py b/app/plugins/dev/loader.py
index de93380..cb402b5 100644
--- a/app/plugins/dev/loader.py
+++ b/app/plugins/dev/loader.py
@@ -16,7 +16,7 @@ async def loader(bot: BOT, message: Message) -> Message | None:
await message.reply("Reply to a Plugin.")
return
if "-r" in message.flags:
- plugin = message.flt_input
+ plugin = message.filtered_input
cmd_module = Config.CMD_DICT.get(plugin)
if not cmd_module:
await message.reply(text="Invalid cmd.")
@@ -41,6 +41,6 @@ if Config.DEV_MODE:
Config.CMD_DICT["load"] = Config.CMD(
cmd="load",
func=loader,
- path=inspect.stack()[0][1],
+ cmd_path=inspect.stack()[0][1],
sudo=False,
)
diff --git a/app/plugins/dev/shell.py b/app/plugins/dev/shell.py
index b3deae8..a31b2ff 100644
--- a/app/plugins/dev/shell.py
+++ b/app/plugins/dev/shell.py
@@ -54,12 +54,12 @@ if Config.DEV_MODE:
Config.CMD_DICT["shell"] = Config.CMD(
cmd="shell",
func=live_shell,
- path=inspect.stack()[0][1],
+ cmd_path=inspect.stack()[0][1],
sudo=False,
)
Config.CMD_DICT["sh"] = Config.CMD(
cmd="sh",
func=run_cmd,
- path=inspect.stack()[0][1],
+ cmd_path=inspect.stack()[0][1],
sudo=False,
)
diff --git a/app/plugins/files/download.py b/app/plugins/files/download.py
index 956a009..7ada02f 100644
--- a/app/plugins/files/download.py
+++ b/app/plugins/files/download.py
@@ -30,7 +30,7 @@ async def down_load(bot: BOT, message: Message):
file_name = None
if message.replied and message.replied.media:
if "-f" in message.flags:
- file_name = message.flt_input
+ file_name = message.filtered_input
download_coro = telegram_download(
message=message.replied,
response=response,
@@ -39,9 +39,9 @@ async def down_load(bot: BOT, message: Message):
)
else:
if "-f" in message.flags:
- file_name, url = message.flt_input.split(maxsplit=1)
+ file_name, url = message.filtered_input.split(maxsplit=1)
else:
- url = message.flt_input
+ url = message.filtered_input
dl_obj: Download = await Download.setup(
url=url, path=dl_path, message_to_edit=response, custom_file_name=file_name
)
diff --git a/app/plugins/files/rename.py b/app/plugins/files/rename.py
index 81e9ee7..37061b3 100644
--- a/app/plugins/files/rename.py
+++ b/app/plugins/files/rename.py
@@ -20,9 +20,9 @@ async def rename(bot: BOT, message: Message):
USAGE:
.rename [ url | reply to message ] file_name.ext
"""
- input = message.flt_input
+ input = message.filtered_input
response = await message.reply("Checking input...")
- if not message.replied or not message.replied.media or not message.flt_input:
+ if not message.replied or not message.replied.media or not message.filtered_input:
await response.edit(
"Invalid input...\nReply to a message containing media or give a link and a filename with cmd."
)
diff --git a/app/plugins/files/upload.py b/app/plugins/files/upload.py
index 45b8600..aa01dd1 100644
--- a/app/plugins/files/upload.py
+++ b/app/plugins/files/upload.py
@@ -89,13 +89,13 @@ async def upload(bot: BOT, message: Message):
USAGE:
.upload [-d] URL | Path to File | CMD
"""
- input = message.flt_input
+ input = message.filtered_input
if not input:
await message.reply("give a file url | path to upload.")
return
response = await message.reply("checking input...")
if input in Config.CMD_DICT:
- await message.reply_document(document=Config.CMD_DICT[input].path)
+ await message.reply_document(document=Config.CMD_DICT[input].cmd_path)
await response.delete()
return
elif input.startswith("http") and not file_check(input):
diff --git a/app/plugins/misc/debrid.py b/app/plugins/misc/debrid.py
deleted file mode 100644
index 2a3a3a9..0000000
--- a/app/plugins/misc/debrid.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# AllDebrid API plugin By Ryuk
-
-import os
-
-from app import BOT, Message, bot
-from app.utils.aiohttp_tools import aio
-from app.utils.helpers import post_to_telegraph as post_tgh
-
-
-# Get response from api and return json or the error
-async def get_json(endpoint: str, query: dict, key=os.environ.get("DEBRID_TOKEN")):
- if not key:
- return "API key not found."
- api = "https://api.alldebrid.com/v4" + endpoint
- params = {"agent": "bot", "apikey": key, **query}
- async with aio.session.get(url=api, params=params) as ses:
- try:
- json = await ses.json()
- return json
- except Exception as e:
- return str(e)
-
-
-# Unlock Links or magnets
-@bot.add_cmd("unrestrict")
-async def debrid(bot: BOT, message: Message):
- if not message.flt_input:
- return await message.reply("Give a magnet or link to unrestrict.")
- for i in message.text_list[1:]:
- link = i
- if link.startswith("http"):
- if "-save" not in message.flags:
- endpoint = "/link/unlock"
- query = {"link": link}
- else:
- endpoint = "/user/links/save"
- query = {"links[]": link}
- else:
- endpoint = "/magnet/upload"
- query = {"magnets[]": link}
- unrestrict = await get_json(endpoint=endpoint, query=query)
- if not isinstance(unrestrict, dict) or "error" in unrestrict:
- await message.reply(unrestrict)
- continue
- if "-save" in message.flags:
- await message.reply("Link Successfully Saved.")
- continue
- if not link.startswith("http"):
- data = unrestrict["data"]["magnets"][0]
- else:
- data = unrestrict["data"]
- name = data.get("filename", data.get("name", ""))
- id = data.get("id")
- size = round(int(data.get("size", data.get("filesize", 0))) / 1000000)
- ready = data.get("ready", "True")
- ret_str = (
- f"""Name: **{name}**\nID: `{id}`\nSize: **{size} mb**\nReady: __{ready}__"""
- )
- await message.reply(ret_str)
-
-
-# Get Status via id or Last 5 torrents
-@bot.add_cmd("torrents")
-async def torrents(bot: BOT, message: Message):
- endpoint = "/magnet/status"
- query = {}
-
- if "-s" in message.flags and "-l" in message.flags:
- return await message.reply("can't use two flags at once")
-
- if "-s" in message.flags:
- if not (input_ := message.flt_input):
- return await message.reply("ID required with -s flag")
- query = {"id": input_}
-
- json = await get_json(endpoint=endpoint, query=query)
-
- if not isinstance(json, dict) or "error" in json:
- return await message.reply(json)
-
- data = json["data"]["magnets"]
-
- if not isinstance(data, list):
- data = [data]
-
- ret_str_list = []
- limit = 1
- if "-l" in message.flags:
- limit = int(message.flt_input)
-
- for i in data[0:limit]:
- status = i.get("status")
- name = i.get("filename")
- id = i.get("id")
- downloaded = ""
- uptobox = ""
- if status == "Downloading":
- downloaded = f"""{round(int(i.get("downloaded",0))/1000000)}/"""
- size = f"""{downloaded}{round(int(i.get("size",0))/1000000)} mb"""
- if link := i.get("links"):
- uptobox = (
- "UptoBox: \n[ "
- + "\n".join(
- [
- f"""{z.get("filename","")}"""
- for z in link
- ]
- )
- + " ]"
- )
- ret_str_list.append(
- f"\nName: {name}"
- f"\nStatus: {status}"
- f"\nID: {id}"
- f"\nSize: {size}"
- f"\n{uptobox}"
- )
-
- ret_str = "
".join(ret_str_list)
- if len(ret_str) < 4096:
- await message.reply(ret_str)
- else:
- await message.reply(
- (await post_tgh("Magnets", ret_str.replace("\n", "
"))),
- disable_web_page_preview=True,
- )
-
-
-# Delete a Magnet
-@bot.add_cmd("del_t")
-async def delete_torrent(bot: BOT, message: Message):
- endpoint = "/magnet/delete"
- if not (id := message.flt_input):
- return await message.reply("Enter an ID to delete")
- for i in message.text_list[1:]:
- json = await get_json(endpoint=endpoint, query={"id": i})
- await message.reply(str(json))
diff --git a/app/plugins/misc/song.py b/app/plugins/misc/song.py
index 0a2d6ac..7f430f4 100644
--- a/app/plugins/misc/song.py
+++ b/app/plugins/misc/song.py
@@ -38,7 +38,7 @@ async def song_dl(bot: bot, message: Message) -> None | Message:
if urlparse(link).netloc in domains:
reply_query = link
break
- query = reply_query or message.flt_input
+ query = reply_query or message.filtered_input
if not query:
await message.reply("Give a song name or link to download.")
return
diff --git a/app/plugins/sudo/commands.py b/app/plugins/sudo/commands.py
index 349fe63..e271862 100644
--- a/app/plugins/sudo/commands.py
+++ b/app/plugins/sudo/commands.py
@@ -4,7 +4,10 @@ DB = CustomDB("SUDO_CMD_LIST")
async def init_task():
- Config.SUDO_CMD_LIST = [sudo_cmd["_id"] async for sudo_cmd in DB.find()]
+ async for sudo_cmd in DB.find():
+ cmd_object = Config.CMD_DICT.get(sudo_cmd["_id"])
+ if cmd_object:
+ cmd_object.loaded = True
@bot.add_cmd(cmd="addscmd", allow_sudo=False)
@@ -18,32 +21,34 @@ async def add_scmd(bot: BOT, message: Message):
"""
if "-all" in message.flags:
cmds = []
- for cmd, func in Config.CMD_DICT.items():
- if func.sudo:
- Config.SUDO_CMD_LIST.append(cmd)
- cmds.append({"_id": cmd})
+ for cmd_name, cmd_object in Config.CMD_DICT.items():
+ if cmd_object.sudo:
+ cmd_object.loaded = True
+ cmds.append({"_id": cmd_name})
await DB.drop()
await DB.insert_many(cmds)
await (await message.reply("All Commands Added to Sudo!")).log()
return
- cmd = message.flt_input
- response = await message.reply(f"Adding {cmd} to sudo....")
- func = Config.CMD_DICT.get(cmd)
- if not func:
- await response.edit(text=f"{cmd} not a valid command.", del_in=10)
+ cmd_name = message.filtered_input
+ response = await message.reply(f"Adding {cmd_name} to sudo....")
+ cmd_object = Config.CMD_DICT.get(cmd_name)
+ if not cmd_object:
+ await response.edit(text=f"{cmd_name} not a valid command.", del_in=10)
return
- elif not func.sudo:
- await response.edit(text=f"{cmd} is disabled for sudo users.", del_in=10)
+ elif not cmd_object.sudo:
+ await response.edit(
+ text=f"{cmd_name} is disabled for sudo users.", del_in=10
+ )
return
- elif cmd in Config.SUDO_CMD_LIST:
- await response.edit(text=f"{cmd} already in Sudo!", del_in=10)
+ elif cmd_object.loaded:
+ await response.edit(text=f"{cmd_name} already in Sudo!", del_in=10)
return
- resp_str = f"#SUDO\n{cmd} added to Sudo!"
+ resp_str = f"#SUDO\n{cmd_name} added to Sudo!"
if "-temp" in message.flags:
resp_str += "\nTemp: True"
else:
- await DB.add_data(data={"_id": cmd})
- Config.SUDO_CMD_LIST.append(cmd)
+ await DB.add_data(data={"_id": cmd_name})
+ cmd_object.loaded = True
await (await response.edit(resp_str)).log()
@@ -57,32 +62,36 @@ async def del_scmd(bot: BOT, message: Message):
.delscmd ping | .delscmd -all
"""
if "-all" in message.flags:
- Config.SUDO_CMD_LIST = []
+ for cmd_object in Config.CMD_DICT.values():
+ cmd_object.loaded = False
await DB.drop()
await (await message.reply("All Commands Removed from Sudo!")).log()
return
- cmd = message.flt_input
- response = await message.reply(f"Removing {cmd} from sudo....")
- if cmd not in Config.SUDO_CMD_LIST:
- await response.edit(f"{cmd} not in Sudo!")
+ cmd_name = message.filtered_input
+ cmd_object = Config.CMD_DICT.get(cmd_name)
+ if not cmd_object:
return
- Config.SUDO_CMD_LIST.remove(cmd)
- resp_str = f"#SUDO\n{cmd} removed from Sudo!"
+ response = await message.reply(f"Removing {cmd_name} from sudo....")
+ if not cmd_object.loaded:
+ await response.edit(f"{cmd_name} not in Sudo!")
+ return
+ cmd_object.loaded = False
+ resp_str = f"#SUDO\n{cmd_name} removed from Sudo!"
if "-temp" in message.flags:
resp_str += "\nTemp: True"
else:
- await DB.delete_data(cmd)
+ await DB.delete_data(cmd_name)
await (await response.edit(resp_str)).log()
@bot.add_cmd(cmd="vscmd")
async def view_sudo_cmd(bot: BOT, message: Message):
- cmds = " ".join(Config.SUDO_CMD_LIST)
+ cmds = [cmd_name for cmd_name, cmd_obj in Config.CMD_DICT if cmd_obj.loaded]
if not cmds:
await message.reply("No Commands in SUDO!")
return
await message.reply(
- text=f"List of {len(Config.SUDO_CMD_LIST)} SUDO CMDS:\n\n{cmds}",
+ text=f"List of {len(cmds)}:\n {cmds}",
del_in=30,
block=False,
)
diff --git a/app/plugins/sys_utils/cmdinfo.py b/app/plugins/sys_utils/cmdinfo.py
index 5a0db76..5820a8b 100644
--- a/app/plugins/sys_utils/cmdinfo.py
+++ b/app/plugins/sys_utils/cmdinfo.py
@@ -10,11 +10,11 @@ async def cmd_info(bot: BOT, message: Message):
INFO: Get Github File URL of a Command.
USAGE: .ci ci
"""
- cmd = message.flt_input
+ cmd = message.filtered_input
if not cmd or cmd not in Config.CMD_DICT.keys():
await message.reply("Give a valid cmd.", del_in=5)
return
- cmd_path = Config.CMD_DICT[cmd].path
+ cmd_path = Config.CMD_DICT[cmd].cmd_path
plugin_path = os.path.relpath(cmd_path, os.curdir)
repo = Config.REPO.remotes.origin.url
branch = Config.REPO.active_branch
@@ -25,3 +25,15 @@ async def cmd_info(bot: BOT, message: Message):
f"\nLink: Github"
)
await message.reply(resp_str, disable_web_page_preview=True)
+
+
+@bot.add_cmd(cmd="s")
+async def search(bot: BOT, message: Message):
+ search_str = message.input
+
+ if not search_str:
+ await message.reply("Give some input to search commands.")
+ return
+
+ cmds = [cmd for cmd in Config.CMD_DICT.keys() if search_str in cmd]
+ await message.reply(f"{cmds}")
diff --git a/app/plugins/sys_utils/help.py b/app/plugins/sys_utils/help.py
index e5018cc..795c049 100644
--- a/app/plugins/sys_utils/help.py
+++ b/app/plugins/sys_utils/help.py
@@ -34,5 +34,5 @@ def get_cmds() -> str:
help_str = ""
for key in sorted_keys:
help_str += f"\n\n\n{key.capitalize()}:\n"
- help_str += " ".join([f"{cmd}" for cmd in dir_dict[key]])
+ help_str += f"{dir_dict[key]}"
return help_str
diff --git a/app/plugins/tg_tools/delete.py b/app/plugins/tg_tools/delete.py
index 03f1275..9856c23 100644
--- a/app/plugins/tg_tools/delete.py
+++ b/app/plugins/tg_tools/delete.py
@@ -13,7 +13,7 @@ async def delete_message(bot: BOT, message: Message) -> None:
.del | .del -r t.me/......
"""
if "-r" in message.flags:
- chat_id, message_id = parse_link(message.flt_input)
+ chat_id, message_id = parse_link(message.filtered_input)
await bot.delete_messages(chat_id=chat_id, message_ids=message_id, revoke=True)
return
await message.delete(reply=True)
diff --git a/app/plugins/tg_tools/kang.py b/app/plugins/tg_tools/kang.py
index dae9698..a17107d 100644
--- a/app/plugins/tg_tools/kang.py
+++ b/app/plugins/tg_tools/kang.py
@@ -30,6 +30,9 @@ async def kang_sticker(bot: BOT, message: Message):
response = await message.reply("Checking input")
media_coro = get_sticker_media_coro(message)
+ if not media_coro:
+ await response.edit("Unsupported Media.")
+ return
kwargs: dict = await media_coro
pack_title, pack_name, create_new = await get_sticker_set(
limit=kwargs["limit"], is_video=kwargs["is_video"]
@@ -135,14 +138,14 @@ def get_sticker_media_coro(message: Message):
async def photo_kang(message: Message) -> dict:
- down_dir = os.path.join("downloads", str(time.time()))
- os.makedirs(down_dir, exist_ok=True)
- input_file = os.path.join(down_dir, "photo.jpg")
+ download_path = os.path.join("downloads", str(time.time()))
+ os.makedirs(download_path, exist_ok=True)
+ input_file = os.path.join(download_path, "photo.jpg")
await message.download(input_file)
file = await asyncio.to_thread(resize_photo, input_file)
- limit = 120
- cmd = "/newpack"
- return dict(cmd=cmd, limit=limit, is_video=False, file=file, path=down_dir)
+ return dict(
+ cmd="/newpack", limit=120, is_video=False, file=file, path=download_path
+ )
def resize_photo(input_file: str) -> BytesIO:
@@ -158,46 +161,42 @@ def resize_photo(input_file: str) -> BytesIO:
async def video_kang(message: Message, ff=False) -> dict:
- vid = message.video or message.animation or message.document
- if vid.file_size > 5242880:
+ video = message.video or message.animation or message.document
+ if video.file_size > 5242880:
raise MemoryError("File Size exceeds 5MB.")
- down_dir = os.path.join("downloads", f"{time.time()}")
- os.makedirs(down_dir, exist_ok=True)
- input_file = os.path.join(down_dir, "input.mp4")
- output_file = os.path.join(down_dir, "sticker.webm")
+ download_path = os.path.join("downloads", f"{time.time()}")
+ os.makedirs(download_path, exist_ok=True)
+ input_file = os.path.join(download_path, "input.mp4")
+ output_file = os.path.join(download_path, "sticker.webm")
await message.download(input_file)
- if not hasattr(vid, "duration"):
+ if not hasattr(video, "duration"):
duration = await get_duration(file=input_file)
else:
- duration = vid.duration
+ duration = video.duration
await resize_video(
input_file=input_file, output_file=output_file, duration=duration, ff=ff
)
- cmd = "/newvideo"
- limit = 50
- is_video = True
return dict(
- cmd=cmd, limit=limit, is_video=is_video, file=output_file, path=down_dir
+ cmd="/newvideo", limit=50, is_video=True, file=output_file, path=download_path
)
async def resize_video(
input_file: str, output_file: str, duration: int, ff: bool = False
):
- cmd = f"ffmpeg -hide_banner -loglevel error -i {input_file} -vf"
+ cmd = f"ffmpeg -hide_banner -loglevel error -i '{input_file}' -vf "
if ff:
cmd += (
- ' "scale=w=512:h=512:force_original_aspect_ratio=decrease,setpts=0.3*PTS" '
+ '"scale=w=512:h=512:force_original_aspect_ratio=decrease,setpts=0.3*PTS" '
)
cmd += "-ss 0 -t 3 -r 30 -loop 0 -an -c:v libvpx-vp9 -b:v 256k -fs 256k "
elif duration < 3:
- cmd += ' "scale=w=512:h=512:force_original_aspect_ratio=decrease" '
+ cmd += '"scale=w=512:h=512:force_original_aspect_ratio=decrease" '
cmd += "-ss 0 -r 30 -an -c:v libvpx-vp9 -b:v 256k -fs 256k "
else:
- cmd += ' "scale=w=512:h=512:force_original_aspect_ratio=decrease" '
+ cmd += '"scale=w=512:h=512:force_original_aspect_ratio=decrease" '
cmd += "-ss 0 -t 3 -r 30 -an -c:v libvpx-vp9 -b:v 256k -fs 256k "
- cmd += output_file
- await run_shell_cmd(cmd=cmd)
+ await run_shell_cmd(cmd=f"{cmd}'{output_file}'")
async def document_kang(message: Message, ff: bool = False) -> dict:
diff --git a/app/plugins/tg_tools/pm_n_tag_logger.py b/app/plugins/tg_tools/pm_n_tag_logger.py
index c43c8cf..3026373 100644
--- a/app/plugins/tg_tools/pm_n_tag_logger.py
+++ b/app/plugins/tg_tools/pm_n_tag_logger.py
@@ -6,6 +6,7 @@ from pyrogram.enums import ChatType, MessageEntityType
from pyrogram.errors import MessageIdInvalid
from app import BOT, Config, CustomDB, Message, bot
+from app.utils.helpers import get_name
LOGGER = CustomDB("COMMON_SETTINGS")
@@ -118,7 +119,7 @@ async def username_logger(bot: BOT, message: Message):
def cache_message(message: Message):
chat_id = message.chat.id
if len(MESSAGE_CACHE[chat_id]) >= 10 and chat_id not in FLOOD_LIST:
- bot.log.error("PM or Tag Flood detected, Message not Logged.")
+ bot.log.error(f"Message not Logged from chat: {get_name(message.chat)}")
FLOOD_LIST.append(chat_id)
return
if chat_id in FLOOD_LIST:
diff --git a/app/plugins/tg_tools/pm_permit.py b/app/plugins/tg_tools/pm_permit.py
index b94022f..13f82ea 100644
--- a/app/plugins/tg_tools/pm_permit.py
+++ b/app/plugins/tg_tools/pm_permit.py
@@ -29,7 +29,8 @@ async def init_task():
@bot.on_message(
(guard_check & filters.private & filters.incoming)
& (~allowed_filter & ~filters.bot)
- & ~filters.chat(chats=[bot.me.id]),
+ & ~filters.chat(chats=[bot.me.id])
+ & ~filters.service,
group=0,
)
async def handle_new_pm(bot: BOT, message: Message):
@@ -135,8 +136,8 @@ async def no_pm(bot: BOT, message: Message):
def get_userID_name(message: Message) -> tuple:
- if message.flt_input and message.flt_input.isdigit():
- user_id = int(message.flt_input)
+ if message.filtered_input and message.filtered_input.isdigit():
+ user_id = int(message.filtered_input)
return user_id, user_id
elif message.replied:
return message.replied.from_user.id, get_name(message.replied.from_user)
diff --git a/app/plugins/tg_tools/reply.py b/app/plugins/tg_tools/reply.py
index 7862fad..60341c1 100644
--- a/app/plugins/tg_tools/reply.py
+++ b/app/plugins/tg_tools/reply.py
@@ -13,7 +13,7 @@ async def reply(bot: BOT, message: Message) -> None:
.reply HI | .reply -r t.me/... HI
"""
if "-r" in message.flags:
- input: list[str] = message.flt_input.split(" ", maxsplit=1)
+ input: list[str] = message.filtered_input.split(" ", maxsplit=1)
if len(input) < 2:
await message.reply("The '-r' flag requires a message link and text.")
return
diff --git a/app/utils/helpers.py b/app/utils/helpers.py
index 25ae2d7..f8e2a02 100644
--- a/app/utils/helpers.py
+++ b/app/utils/helpers.py
@@ -34,12 +34,12 @@ async def post_to_telegraph(title: str, text: str):
return telegraph["url"]
-def get_name(user: User | Chat) -> str:
- first = user.first_name or ""
- last = user.last_name or ""
+def get_name(user_or_chat: User | Chat) -> str:
+ first = user_or_chat.first_name or ""
+ last = user_or_chat.last_name or ""
name = f"{first} {last}".strip()
if not name:
- return user.title
+ return user_or_chat.title
def extract_user_data(user: User) -> dict:
diff --git a/sample-config.env b/sample-config.env
index cb73c67..df7da58 100644
--- a/sample-config.env
+++ b/sample-config.env
@@ -4,7 +4,7 @@ API_ID=
API_HASH=
-API_PORT=
+# API_PORT=
# To pass Health checks of Koyeb, Render and other hosts.
# Use the port listed in your app configuration.
@@ -20,15 +20,20 @@ DB_URL=
# Mongo DB cluster URL
-FBAN_LOG_CHANNEL=
+# FBAN_LOG_CHANNEL=
# Optional FedBan Proof and logs.
-GEMINI_API_KEY=
+
+# FBAN_SUDO_ID=
+# FBAN_SUDO_TRIGGER=
+# Optional sudo fban vars to initiate ban in 2nd user-bot.
+
+
+# GEMINI_API_KEY=
# Optional API Key
# Get from https://ai.google.dev/
-
LOG_CHAT=
# Bot logs chat/channel
@@ -45,9 +50,9 @@ SUDO_TRIGGER=!
# Sudo Trigger for bot
-MESSAGE_LOGGER_CHAT=
+# MESSAGE_LOGGER_CHAT=
# PM and Tag Logger chat.
UPSTREAM_REPO=https://github.com/thedragonsinn/plain-ub
-# Keep default unless you wanna maintain your fork.
+# Keep default unless you maintain your own fork.