fixes:fbans, trigger, exec -s;new: kick
This commit is contained in:
@@ -12,12 +12,19 @@ def cmd_check(message: Message, trigger: str, sudo: bool = False) -> bool:
|
||||
return bool(cmd in Config.CMD_DICT)
|
||||
|
||||
|
||||
def owner_check(filter, client, message: Message) -> bool:
|
||||
def basic_check(message: Message):
|
||||
if (
|
||||
message.reactions
|
||||
or not message.text
|
||||
or not message.text.startswith(Config.CMD_TRIGGER)
|
||||
or not message.from_user
|
||||
):
|
||||
return True
|
||||
|
||||
|
||||
def owner_check(filter, client, message: Message) -> bool:
|
||||
if (
|
||||
basic_check(message)
|
||||
or message.from_user.id != Config.OWNER_ID
|
||||
or (message.chat.id != Config.OWNER_ID and not message.outgoing)
|
||||
):
|
||||
@@ -29,10 +36,8 @@ def owner_check(filter, client, message: Message) -> bool:
|
||||
def sudo_check(filter, client, message: Message) -> bool:
|
||||
if (
|
||||
not Config.SUDO
|
||||
or message.reactions
|
||||
or not message.text
|
||||
or basic_check(message)
|
||||
or not message.text.startswith(Config.SUDO_TRIGGER)
|
||||
or not message.from_user
|
||||
or message.from_user.id not in Config.SUDO_USERS
|
||||
):
|
||||
return False
|
||||
|
||||
@@ -12,7 +12,9 @@ from app.core import Conversation
|
||||
|
||||
class Message(Msg):
|
||||
def __init__(self, message: Msg) -> None:
|
||||
super().__dict__.update(message.__dict__)
|
||||
args = vars(message)
|
||||
args["client"] = args.pop("_client")
|
||||
super().__init__(**args)
|
||||
|
||||
@cached_property
|
||||
def cmd(self) -> str | None:
|
||||
@@ -65,6 +67,10 @@ class Message(Msg):
|
||||
def text_list(self) -> list:
|
||||
return self.text.split()
|
||||
|
||||
@cached_property
|
||||
def trigger(self):
|
||||
return Config.CMD_TRIGGER if self.is_from_owner else Config.SUDO_TRIGGER
|
||||
|
||||
async def async_deleter(self, del_in, task, block) -> None:
|
||||
if block:
|
||||
x = await task
|
||||
|
||||
@@ -98,6 +98,23 @@ async def ban_or_unban(bot: bot, message: Message) -> None:
|
||||
await message.reply(text=e, del_in=10)
|
||||
|
||||
|
||||
@bot.add_cmd(cmd="kick")
|
||||
async def kick_user(bot, message: Message):
|
||||
user, reason = await message.extract_user_n_reason()
|
||||
if not isinstance(user, User):
|
||||
await message.reply(user, del_in=10)
|
||||
return
|
||||
try:
|
||||
await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id)
|
||||
await asyncio.sleep(1)
|
||||
await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id)
|
||||
await message.reply(
|
||||
text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}."
|
||||
)
|
||||
except Exception as e:
|
||||
await message.reply(text=e, del_in=10)
|
||||
|
||||
|
||||
@bot.add_cmd(cmd=["mute", "unmute"])
|
||||
async def mute_or_unmute(bot: bot, message: Message):
|
||||
user, reason = await message.extract_user_n_reason()
|
||||
|
||||
@@ -82,12 +82,12 @@ async def executor(bot: bot, message: Message) -> Message | None:
|
||||
sys.stdout = sys.__stdout__
|
||||
sys.stderr = sys.__stderr__
|
||||
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
|
||||
output = f"{output}\n\n{func_out}".strip()
|
||||
if func_out is not None:
|
||||
output = "\n\n".join([output, str(func_out)]).strip()
|
||||
if "-s" not in message.flags:
|
||||
output = f"> `{code}`\n\n>> `{output}`"
|
||||
if "-s" in message.flags:
|
||||
output = f">> `{output}`"
|
||||
else:
|
||||
return await reply.delete()
|
||||
output = f"> `{code}`\n\n>> `{output}`"
|
||||
await reply.edit(
|
||||
output,
|
||||
name="exec.txt",
|
||||
|
||||
@@ -8,6 +8,7 @@ from pyrogram.types import Chat, User
|
||||
from app import DB, Config, bot
|
||||
from app.core import Message
|
||||
from app.utils.db_utils import add_data, delete_data
|
||||
from app.utils.helpers import get_name
|
||||
|
||||
FED_LIST: AgnosticCollection = DB.FED_LIST
|
||||
|
||||
@@ -19,6 +20,7 @@ FBAN_REGEX: filters.Filter = filters.regex(
|
||||
r"(New FedBan|starting a federation ban|Starting a federation ban|start a federation ban|FedBan Reason update|FedBan reason updated|Would you like to update this reason)"
|
||||
)
|
||||
|
||||
|
||||
UNFBAN_REGEX: filters.Filter = filters.regex(r"(New un-FedBan|I'll give|Un-FedBan)")
|
||||
|
||||
|
||||
@@ -89,20 +91,20 @@ async def fed_ban(bot: bot, message: Message):
|
||||
await message.reply("Reply to a proof")
|
||||
return
|
||||
proof = await message.replied.forward(Config.FBAN_LOG_CHANNEL)
|
||||
proof_str = f"{ {proof.link} }"
|
||||
proof_str = f"\n{ {proof.link} }"
|
||||
|
||||
await progress.edit("❯❯")
|
||||
total: int = 0
|
||||
failed: list[str] = []
|
||||
reason = f"{reason}\n{proof_str}"
|
||||
fban_cmd: str = f"/fban {user.mention} {reason}"
|
||||
reason = f"{reason}{proof_str}"
|
||||
fban_cmd: str = f"/fban <a href='tg://user?id={user.id}'>{user.id}</a> {reason}"
|
||||
async for fed in FED_LIST.find():
|
||||
chat_id = int(fed["_id"])
|
||||
total += 1
|
||||
cmd: Message = await bot.send_message(
|
||||
chat_id=chat_id, text=fban_cmd, disable_web_page_preview=True
|
||||
)
|
||||
response: Message | None = await cmd.get_response(filters=(FILTERS), timeout=8)
|
||||
response: Message | None = await cmd.get_response(filters=FILTERS, timeout=8)
|
||||
if not response or not (await FBAN_REGEX(bot, response)):
|
||||
failed.append(fed["name"])
|
||||
elif "Would you like to update this reason" in response.text:
|
||||
@@ -111,11 +113,15 @@ async def fed_ban(bot: bot, message: Message):
|
||||
if not total:
|
||||
await progress.edit("You Don't have any feds connected!")
|
||||
return
|
||||
resp_str = f"❯❯❯ <b>FBanned {user.mention}\nID: {user.id}\nReason: {reason}\n"
|
||||
resp_str = f"❯❯❯ <b>FBanned</b> {user.mention}\n<b>ID</b>: {user.id}\n<b>Reason</b>: {reason}\n<b.Initiated in</b>:{message.chat.title}"
|
||||
if failed:
|
||||
resp_str += f"Failed in: {len(failed)}/{total}\n• " + "\n• ".join(failed)
|
||||
resp_str += f"\n<b>Failed</b> in: {len(failed)}/{total}\n• " + "\n• ".join(
|
||||
failed
|
||||
)
|
||||
else:
|
||||
resp_str += f"Success! Fbanned in {total} feds."
|
||||
resp_str += f"\nSuccess! <b>Fbanned</b> in <b>{total}</b> feds."
|
||||
if not message.is_from_owner:
|
||||
resp_str += f"\n<b>By</b>: {get_name(message.from_user)}"
|
||||
await bot.send_message(
|
||||
chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
|
||||
)
|
||||
@@ -137,14 +143,14 @@ async def un_fban(bot: bot, message: Message):
|
||||
await progress.edit("❯❯")
|
||||
total: int = 0
|
||||
failed: list[str] = []
|
||||
unfban_cmd: str = f"/unfban {user.mention} {reason}"
|
||||
unfban_cmd: str = f"/unfban <a href='tg://user?id={user.id}'>{user.id}</a> {reason}"
|
||||
async for fed in FED_LIST.find():
|
||||
chat_id = int(fed["_id"])
|
||||
total += 1
|
||||
cmd: Message = await bot.send_message(
|
||||
chat_id=chat_id, text=unfban_cmd, disable_web_page_preview=True
|
||||
)
|
||||
response: Message | None = await cmd.get_response(filters=(FILTERS), timeout=8)
|
||||
response: Message | None = await cmd.get_response(filters=FILTERS, timeout=8)
|
||||
if not response or not (await UNFBAN_REGEX(bot, response)):
|
||||
failed.append(fed["name"])
|
||||
await asyncio.sleep(0.8)
|
||||
|
||||
@@ -31,7 +31,10 @@ async def add_sudo(bot: bot, message: Message) -> Message | None:
|
||||
await response.edit(text=f"{get_name(user)} already in Sudo!", del_in=5)
|
||||
return
|
||||
Config.SUDO_USERS.append(user.id)
|
||||
await add_data(collection=DB.SUDO_USERS, id=user.id, data=extract_user_data(user))
|
||||
if "-temp" not in message.flags:
|
||||
await add_data(
|
||||
collection=DB.SUDO_USERS, id=user.id, data=extract_user_data(user)
|
||||
)
|
||||
await response.edit(text=f"{user.mention} added to Sudo List.", del_in=5)
|
||||
await bot.log(text=f"{user.mention} added to the Sudo List.")
|
||||
|
||||
@@ -50,6 +53,7 @@ async def remove_sudo(bot: bot, message: Message) -> Message | None:
|
||||
await response.edit(text=f"{get_name(user)} not in Sudo!", del_in=5)
|
||||
return
|
||||
Config.SUDO_USERS.remove(user.id)
|
||||
if "-temp" not in message.flags:
|
||||
await delete_data(collection=DB.SUDO_USERS, id=user.id)
|
||||
await response.edit(text=f"{user.mention} removed from Sudo List.", del_in=5)
|
||||
await bot.log(text=f"{user.mention} removed from Sudo List.")
|
||||
|
||||
@@ -68,7 +68,7 @@ async def leave_chat(bot: bot, message: Message) -> None:
|
||||
else:
|
||||
chat = message.chat.id
|
||||
await message.reply(
|
||||
f"Leaving current chat in 5\nReply with `{Config.TRIGGER}c` to cancel",
|
||||
f"Leaving current chat in 5\nReply with `{message.trigger}c` to cancel",
|
||||
del_in=5,
|
||||
block=True,
|
||||
)
|
||||
|
||||
@@ -12,15 +12,15 @@ from app.core import Message
|
||||
@bot.add_cmd(cmd="help")
|
||||
async def cmd_list(bot: bot, message: Message) -> None:
|
||||
commands: str = "\n".join(
|
||||
[f"<code>{Config.TRIGGER}{i}</code>" for i in Config.CMD_DICT.keys()]
|
||||
[f"<code>{message.trigger}{cmd}</code>" for cmd in Config.CMD_DICT.keys()]
|
||||
)
|
||||
await message.reply(
|
||||
f"<b>Available Commands:</b>\n\n{commands}", del_in=30, block=False
|
||||
f"<b>Available Commands:</b>\n\n{commands}", del_in=30, block=True
|
||||
)
|
||||
|
||||
|
||||
# Not my Code
|
||||
# Prolly from Userge/UX/VenomX idk
|
||||
# Prolly from Userge/UX/VenomX IDK
|
||||
@bot.add_cmd(cmd="ping")
|
||||
async def ping_bot(bot: bot, message: Message):
|
||||
start = datetime.now()
|
||||
|
||||
Reference in New Issue
Block a user