Fbans: Unfban, Sudo Support

This commit is contained in:
thedragonsinn
2023-10-09 15:32:14 +05:30
parent c7e935c15c
commit e8aa26d4f2
11 changed files with 251 additions and 71 deletions

View File

@@ -1,4 +1,3 @@
import json
import os
from typing import Callable
@@ -7,7 +6,9 @@ from pyrogram.types import Message
class Config:
CMD_DICT: dict["str", Callable] = {}
CMD_DICT: dict[str, Callable] = {}
CMD_TRIGGER: str = os.environ.get("CMD_TRIGGER", ".")
CONVO_DICT: dict[int, dict[str | int, Message | Filter | None]] = {}
@@ -21,11 +22,15 @@ class Config:
LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
TRIGGER: str = os.environ.get("TRIGGER", ".")
SUDO: bool = False
SUDO_TRIGGER: str = os.environ.get("SUDO_TRIGGER", "!")
OWNER_ID = int(os.environ.get("OWNER_ID"))
USERS: list[int] = json.loads(os.environ.get("USERS", "[]"))
SUDO_CMD_LIST: list[str] = []
SUDO_USERS: list[int] = []
UPSTREAM_REPO: str = os.environ.get(
"UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub"

View File

@@ -1,3 +1,4 @@
import asyncio
import glob
import importlib
import os
@@ -15,7 +16,7 @@ from app.core import Conversation, Message
from app.utils import aiohttp_tools, helpers
async def import_modules():
def import_modules():
for py_module in glob.glob(pathname="app/**/*.py", recursive=True):
name = os.path.splitext(py_module)[0]
py_name = name.replace("/", ".")
@@ -25,6 +26,24 @@ async def import_modules():
print(e)
async def init_tasks():
sudo = await DB.SUDO.find_one({"_id": "sudo_switch"})
if sudo:
Config.SUDO = sudo["value"]
Config.SUDO_USERS = [sudo_user["_id"] async for sudo_user in DB.SUDO_USERS.find()]
Config.SUDO_CMD_LIST = [
sudo_cmd["_id"] async for sudo_cmd in DB.SUDO_CMD_LIST.find()
]
helpers.TELEGRAPH = Telegraph()
await helpers.TELEGRAPH.create_account(
short_name="Plain-UB", author_name="Plain-UB", author_url=Config.UPSTREAM_REPO
)
import_modules()
await aiohttp_tools.session_switch()
class BOT(Client):
def __init__(self):
super().__init__(
@@ -70,17 +89,10 @@ class BOT(Client):
async def boot(self) -> None:
await super().start()
await import_modules()
await aiohttp_tools.session_switch()
await self.edit_restart_msg()
helpers.TELEGRAPH = Telegraph()
await helpers.TELEGRAPH.create_account(
short_name="Plain-UB",
author_name="Plain-UB",
author_url=Config.UPSTREAM_REPO,
)
print("started")
await self.log(text="<i>Started</i>")
await asyncio.gather(
init_tasks(), self.edit_restart_msg(), self.log(text="<i>Started</i>")
)
await idle()
await aiohttp_tools.session_switch()
DB._client.close()

View File

@@ -1,32 +1,49 @@
from pyrogram import filters as _filters
from pyrogram.types import Message
from app import Config
def dynamic_cmd_filter(_, __, message) -> bool:
def cmd_check(message: Message, trigger: str, sudo: bool = False) -> bool:
start_str = message.text.split(maxsplit=1)[0]
cmd = start_str.replace(trigger, "", 1)
if sudo and cmd not in Config.SUDO_CMD_LIST:
return False
return bool(cmd in Config.CMD_DICT)
def owner_check(filter, client, message: Message) -> bool:
if (
not message.text
or not message.text.startswith(Config.TRIGGER)
message.reactions
or not message.text
or not message.text.startswith(Config.CMD_TRIGGER)
or not message.from_user
or (
message.from_user.id != Config.OWNER_ID
and message.from_user.id not in Config.USERS
)
or (
message.from_user.id == Config.OWNER_ID
and message.chat.id != Config.OWNER_ID
and not message.outgoing
)
or message.from_user.id != Config.OWNER_ID
or (message.chat.id != Config.OWNER_ID and not message.outgoing)
):
return False
start_str = message.text.split(maxsplit=1)[0]
cmd = start_str.replace(Config.TRIGGER, "", 1)
cmd_check = cmd in Config.CMD_DICT
reaction_check = not message.reactions
return bool(cmd_check and reaction_check)
cmd = cmd_check(message, Config.CMD_TRIGGER)
return cmd
cmd_filter = _filters.create(dynamic_cmd_filter)
def sudo_check(filter, client, message: Message) -> bool:
if (
not Config.SUDO
or message.reactions
or not message.text
or not message.text.startswith(Config.SUDO_TRIGGER)
or not message.from_user
or message.from_user.id not in Config.SUDO_USERS
):
return False
cmd = cmd_check(message, Config.SUDO_TRIGGER, sudo=True)
return cmd
owner_filter = _filters.create(owner_check)
sudo_filter = _filters.create(sudo_check)
convo_filter = _filters.create(
lambda _, __, message: (message.chat.id in Config.CONVO_DICT)
and (not message.reactions)

View File

@@ -7,14 +7,14 @@ from app import Config, bot
from app.core import Message, filters
@bot.on_message(filters.cmd_filter, group=1)
@bot.on_edited_message(filters.cmd_filter, group=1)
@bot.on_message(filters.owner_filter | filters.sudo_filter, group=1)
@bot.on_edited_message(filters.owner_filter | filters.sudo_filter, group=1)
async def cmd_dispatcher(bot, message) -> None:
message = Message.parse_message(message)
func = Config.CMD_DICT[message.cmd]
coro = func(bot, message)
await run_coro(coro, message)
if message.is_from_owner:
x = await run_coro(coro, message)
if not x and message.is_from_owner:
await message.delete()
@@ -33,7 +33,7 @@ async def convo_handler(bot: bot, message: Msg):
message.continue_propagation()
async def run_coro(coro, message) -> None:
async def run_coro(coro, message) -> None | int:
try:
task = asyncio.Task(coro, name=message.task_id)
await task
@@ -46,3 +46,4 @@ async def run_coro(coro, message) -> None:
func=coro.__name__,
name="traceback.txt",
)
return 1

View File

@@ -12,7 +12,10 @@ class DataBase:
def __init__(self):
self._client: AgnosticClient = AsyncIOMotorClient(Config.DB_URL)
self.db: AgnosticDatabase = self._client["plain_ub"]
self.SUDO_USERS: AgnosticCollection = self.db.USERS
self.FED_LIST: AgnosticCollection = self.db.FED_LIST
self.SUDO: AgnosticCollection = self.db.SUDO
self.SUDO_USERS: AgnosticCollection = self.db.SUDO_USERS
self.SUDO_CMD_LIST: AgnosticCollection = self.db.SUDO_CMD_LIST
def __getattr__(self, attr) -> AgnosticCollection:
try:
@@ -22,4 +25,4 @@ class DataBase:
return self.__dict__[attr]
DB = DataBase()
DB: DataBase = DataBase()

View File

@@ -26,12 +26,11 @@ class Message(Msg):
@cached_property
def flt_input(self) -> str:
split_lines = self.input.splitlines()
split_n_joined = [
" ".join([word for word in line.split(" ") if word not in self.flags])
for line in split_lines
]
return "\n".join(split_n_joined)
split_lines = self.input.split("\n", maxsplit=1)
split_lines[0] = " ".join(
[word for word in split_lines[0].split(" ") if word not in self.flags]
)
return "\n".join(split_lines)
@cached_property
def input(self) -> str:

View File

@@ -9,16 +9,23 @@ from app import DB, Config, bot
from app.core import Message
from app.utils.db_utils import add_data, delete_data
FEDS: AgnosticCollection = DB.FED_LIST
FILTERS: filters.Filter = filters.user([609517172, 2059887769])
FED_LIST: AgnosticCollection = DB.FED_LIST
FILTERS: filters.Filter = (
filters.user([609517172, 2059887769]) & ~filters.service
) # NOQA
FBAN_REGEX: filters.Filter = filters.regex(
r"(New FedBan|starting a federation ban|Starting a federation ban|start a federation ban|FedBan Reason update|FedBan reason updated|Would you like to update this reason)"
)
UNFBAN_REGEX: filters.Filter = filters.regex(r"(New un-FedBan|I'll give|Un-FedBan)")
class _User(User):
def __init__(self, id):
super().__init__(id=id)
self.first_name = id
@cached_property
def mention(self) -> str:
@@ -28,7 +35,7 @@ class _User(User):
@bot.add_cmd(cmd="addf")
async def add_fed(bot: bot, message: Message):
data = dict(name=message.input or message.chat.title, type=str(message.chat.type))
await add_data(collection=FEDS, id=message.chat.id, data=data)
await add_data(collection=FED_LIST, id=message.chat.id, data=data)
await message.reply(
f"<b>{data['name']}</b> added to FED LIST.", del_in=5, block=False
)
@@ -40,7 +47,7 @@ async def add_fed(bot: bot, message: Message):
@bot.add_cmd(cmd="delf")
async def remove_fed(bot: bot, message: Message):
if "-all" in message.flags:
await FEDS.drop()
await FED_LIST.drop()
await message.reply("FED LIST cleared.")
return
chat: int | str | Chat = message.input or message.chat
@@ -50,7 +57,7 @@ async def remove_fed(bot: bot, message: Message):
chat = chat.id
elif chat.lstrip("-").isdigit():
chat = int(chat)
deleted: bool | None = await delete_data(collection=FEDS, id=chat)
deleted: bool | None = await delete_data(collection=FED_LIST, id=chat)
if deleted:
await message.reply(
f"<b>{name}</b><code>{chat}</code> removed from FED LIST.",
@@ -73,7 +80,7 @@ async def fed_ban(bot: bot, message: Message):
return
if not isinstance(user, User):
user = _User(id=message.text_list[1])
if user.id in Config.USERS:
if user.id in [Config.OWNER_ID, *Config.SUDO_USERS]:
await progress.edit("Cannot Fban Owner/Sudo users.")
return
proof_str: str = ""
@@ -82,14 +89,14 @@ async def fed_ban(bot: bot, message: Message):
await message.reply("Reply to a proof")
return
proof = await message.replied.forward(Config.FBAN_LOG_CHANNEL)
proof_str = "".join(["{ ", proof.link, " }"])
proof_str = f"{ {proof.link} }"
await progress.edit("")
total: int = 0
failed: list[str] = []
reason = f"{reason}\n{proof_str}"
fban_cmd: str = f"/fban {user.mention} {reason}"
async for fed in FEDS.find():
async for fed in FED_LIST.find():
chat_id = int(fed["_id"])
total += 1
cmd: Message = await bot.send_message(
@@ -109,7 +116,9 @@ async def fed_ban(bot: bot, message: Message):
resp_str += f"Failed in: {len(failed)}/{total}\n" + "\n".join(failed)
else:
resp_str += f"Success! Fbanned in {total} feds."
await bot.send_message(chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True)
await bot.send_message(
chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
)
await progress.edit(
text=resp_str, del_in=8, block=False, disable_web_page_preview=True
)
@@ -117,14 +126,49 @@ async def fed_ban(bot: bot, message: Message):
@bot.add_cmd(cmd="unfban")
async def un_fban(bot: bot, message: Message):
...
progress: Message = await message.reply("")
user, reason = await message.extract_user_n_reason()
if isinstance(user, str):
await progress.edit(user)
return
if not isinstance(user, User):
user = _User(id=message.text_list[1])
await progress.edit("")
total: int = 0
failed: list[str] = []
unfban_cmd: str = f"/unfban {user.mention} {reason}"
async for fed in FED_LIST.find():
chat_id = int(fed["_id"])
total += 1
cmd: Message = await bot.send_message(
chat_id=chat_id, text=unfban_cmd, disable_web_page_preview=True
)
response: Message | None = await cmd.get_response(filters=(FILTERS), timeout=8)
if not response or not (await UNFBAN_REGEX(bot, response)):
failed.append(fed["name"])
await asyncio.sleep(0.8)
if not total:
await progress.edit("You Don't have any feds connected!")
return
resp_str = f" <b>Un-FBanned {user.mention}\nID: {user.id}\nReason: {reason}\n"
if failed:
resp_str += f"Failed in: {len(failed)}/{total}\n" + "\n".join(failed)
else:
resp_str += f"Success! Un-Fbanned in {total} feds."
await bot.send_message(
chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
)
await progress.edit(
text=resp_str, del_in=8, block=False, disable_web_page_preview=True
)
@bot.add_cmd(cmd="listf")
async def fed_list(bot: bot, message: Message):
output: str = ""
total = 0
async for fed in DB.FED_LIST.find():
async for fed in FED_LIST.find():
output += f'<b>• {fed["name"]}</b>\n'
if "-id" in message.flags:
output += f' <code>{fed["_id"]}</code>\n'

102
app/plugins/sudo.py Normal file
View File

@@ -0,0 +1,102 @@
from pyrogram.types import User
from app import DB, Config, bot
from app.core import Message
from app.plugins.fbans import _User
from app.utils.db_utils import add_data, delete_data
from app.utils.helpers import extract_user_data, get_name
@bot.add_cmd(cmd="sudo")
async def sudo(bot: bot, message: Message):
if "-c" in message.flags:
await message.reply(text=f"Sudo is enabled: <b>{Config.SUDO}</b> .", del_in=8)
return
value = not Config.SUDO
Config.SUDO = value
await add_data(collection=DB.SUDO, id="sudo_switch", data={"value": value})
await message.reply(text=f"Sudo is enabled: <b>{value}</b>!", del_in=8)
@bot.add_cmd(cmd="addsudo")
async def add_sudo(bot: bot, message: Message) -> Message | None:
response = await message.reply("Extracting User info...")
user, _ = await message.extract_user_n_reason()
if isinstance(user, str):
await response.edit(user)
return
if not isinstance(user, User):
user: _User = _User(id=message.text_list[1])
if user.id in Config.SUDO_USERS:
await response.edit(text=f"{get_name(user)} already in Sudo!", del_in=5)
return
Config.SUDO_USERS.append(user.id)
await add_data(collection=DB.SUDO_USERS, id=user.id, data=extract_user_data(user))
await response.edit(text=f"{user.mention} added to Sudo List.", del_in=5)
@bot.add_cmd(cmd="delsudo")
async def remove_sudo(bot: bot, message: Message) -> Message | None:
response = await message.reply("Extracting User info...")
user, _ = await message.extract_user_n_reason()
if isinstance(user, str):
await response.edit(user)
return
if not isinstance(user, User):
user: _User = _User(id=message.text_list[1])
if user.id not in Config.SUDO_USERS:
await response.edit(text=f"{get_name(user)} not in Sudo!", del_in=5)
return
Config.SUDO_USERS.remove(user.id)
await delete_data(collection=DB.SUDO_USERS, id=user.id)
await response.edit(text=f"{user.mention} removed from Sudo List.", del_in=5)
@bot.add_cmd(cmd="vsudo")
async def sudo_list(bot: bot, message: Message):
output: str = ""
total = 0
async for user in DB.SUDO_USERS.find():
output += f'<b>• {user["name"]}</b>\n'
if "-id" in message.flags:
output += f' <code>{user["_id"]}</code>\n'
total += 1
if not total:
await message.reply("You don't have any SUDO USERS.")
return
output: str = f"List of <b>{total}</b> SUDO USERS:\n\n{output}"
await message.reply(output, del_in=30, block=False)
@bot.add_cmd(cmd="addscmd")
async def add_scmd(bot: bot, message: Message):
cmd = message.flt_input
response = await message.reply(f"Adding <b>{cmd}</b> to sudo....")
if cmd in Config.SUDO_CMD_LIST:
await response.edit(f"<b>{cmd}</b> already in Sudo!")
return
await DB.SUDO_CMD_LIST.insert_one({"_id": cmd})
await response.edit(f"<b>{cmd}</b> added to Sudo!")
@bot.add_cmd(cmd="delscmd")
async def del_scmd(bot: bot, message: Message):
cmd = message.flt_input
response = await message.reply(f"Removing <b>{cmd}</b> from sudo....")
if cmd not in Config.SUDO_CMD_LIST:
await response.edit(f"<b>{cmd}</b> not in Sudo!")
return
await DB.SUDO_CMD_LIST.delete_one({"_id": cmd})
await response.edit(f"<b>{cmd}</b> added to Sudo!")
@bot.add_cmd(cmd="vscmd")
async def view_sudo_cmd(bot: bot, message: Message):
cmds = " ".join(Config.SUDO_CMD_LIST)
if not cmds:
await message.reply("No Commands in SUDO!")
return
await message.reply(
f"List of <b>{len(cmds)}</b> SUDO CMDS:\n\n{cmds}", del_in=30, block=False
)

View File

@@ -1,12 +1,7 @@
def extract_user_data(user) -> dict:
return dict(
name=f"""{user.first_name or ""} {user.last_name or ""}""",
username=user.username,
mention=user.mention,
)
from motor.core import AgnosticCollection
async def add_data(collection, id: int | str, data: dict) -> None:
async def add_data(collection: AgnosticCollection, id: int | str, data: dict) -> None:
found = await collection.find_one({"_id": id})
if not found:
await collection.insert_one({"_id": id, **data})
@@ -14,7 +9,7 @@ async def add_data(collection, id: int | str, data: dict) -> None:
await collection.update_one({"_id": id}, {"$set": data})
async def delete_data(collection, id: int | str) -> bool | None:
async def delete_data(collection: AgnosticCollection, id: int | str) -> bool | None:
found = await collection.find_one({"_id": id})
if found:
await collection.delete_one({"_id": id})

View File

@@ -1,8 +1,9 @@
from pyrogram.types import User
from telegraph.aio import Telegraph
from app import Config
TELEGRAPH = None
TELEGRAPH: None | Telegraph = None
async def post_to_telegraph(title: str, text: str):
@@ -19,3 +20,7 @@ def get_name(user: User) -> str:
first = user.first_name or ""
last = user.last_name or ""
return f"{first} {last}".strip()
def extract_user_data(user: User) -> dict:
return dict(name=get_name(user), username=user.username, mention=user.mention)

View File

@@ -3,6 +3,7 @@ API_ID=
API_HASH=
CMD_TRIGGER=.
DEV_MODE=0
# exec, sh, shell commands
@@ -28,12 +29,8 @@ SESSION_STRING=""
# Your string session
TRIGGER=.
SUDO_TRIGGER=!
# Trigger for bot
USERS=[1223478, 987654321]
# Optional Sudo access.... Separate multiple values with ,
UPSTREAM_REPO="https://github.com/thedragonsinn/plain-ub"