WIP:Fbans.

This commit is contained in:
thedragonsinn
2023-10-03 16:27:21 +05:30
parent af1eb94351
commit 245234a44b
8 changed files with 150 additions and 12 deletions

View File

@@ -14,5 +14,3 @@ if "com.termux" not in os.environ.get("PATH", ""):
uvloop.install()
bot = BOT()
from app.core.client.conversation import Conversation as Convo # NOQA

View File

@@ -15,6 +15,10 @@ class Config:
DB_URL: str = os.environ.get("DB_URL")
FBAN_LOG_CHANNEL: int = int(
os.environ.get("FBAN_LOG_CHANNEL", os.environ.get("LOG_CHAT"))
)
LOG_CHAT: int = int(os.environ.get("LOG_CHAT"))
TRIGGER: str = os.environ.get("TRIGGER", ".")

View File

@@ -1,3 +1,5 @@
from app.core.client import filters
from app.core.types.callback_query import CallbackQuery
from app.core.types.message import Message
from app.core.client.conversation import Conversation # NOQA
from app.core.types.message import Message # NOQA

View File

@@ -5,12 +5,12 @@ import sys
from functools import wraps
from io import BytesIO
from pyrogram import Client, idle
from pyrogram import Client, filters, idle
from pyrogram.enums import ParseMode
from pyrogram.types import Message as Msg
from app import DB, Config
from app.core import Message
from app.core import Conversation, Message
from app.utils import aiohttp_tools
@@ -38,13 +38,11 @@ class BOT(Client):
)
@staticmethod
def add_cmd(cmd: str, cb: bool = False):
def add_cmd(cmd: str):
def the_decorator(func):
@wraps(func)
def wrapper():
config_dict = Config.CMD_DICT
if cb:
config_dict = Config.CALLBACK_DICT
if isinstance(cmd, list):
for _cmd in cmd:
config_dict[_cmd] = func
@@ -56,6 +54,19 @@ class BOT(Client):
return the_decorator
@staticmethod
async def get_response(
chat_id: int, filters: filters.Filter = None, timeout: int = 8
) -> Message | None:
try:
async with Conversation(
chat_id=chat_id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
return
async def boot(self) -> None:
await super().start()
await import_modules()
@@ -90,7 +101,7 @@ class BOT(Client):
parse_mode=ParseMode.HTML,
) -> Message | Msg:
if message:
return (await message.copy(chat_id=Config.LOG_CHAT))
return await message.copy(chat_id=Config.LOG_CHAT)
if traceback:
text = f"""
#Traceback

View File

@@ -4,7 +4,7 @@ import json
from pyrogram.filters import Filter
from pyrogram.types import Message
from app import Config, bot
from app import Config
class Conversation:
@@ -20,7 +20,6 @@ class Conversation:
super().__init__("Conversation Timeout")
def __init__(self, chat_id: int, filters: Filter | None = None, timeout: int = 10):
self._client = bot
self.chat_id = chat_id
self.filters = filters
self.timeout = timeout

View File

@@ -2,10 +2,12 @@ import asyncio
from functools import cached_property
from pyrogram.errors import MessageDeleteForbidden
from pyrogram.filters import Filter
from pyrogram.types import Message as Msg
from pyrogram.types import User
from app import Config
from app.core import Conversation
class Message(Msg):
@@ -112,6 +114,16 @@ class Message(Msg):
except Exception as e:
return [e, reason]
async def get_response(self, filters: Filter = None, timeout: int = 8):
try:
async with Conversation(
chat_id=self.chat.id, filters=filters, timeout=timeout
) as convo:
response: Message | None = await convo.get_response()
return response
except Conversation.TimeOutError:
return
async def reply(
self, text, del_in: int = 0, block: bool = True, **kwargs
) -> "Message":

109
app/plugins/fbans.py Normal file
View File

@@ -0,0 +1,109 @@
import asyncio
from functools import cached_property
from motor.core import AgnosticCollection
from pyrogram import filters
from pyrogram.types import Chat, User
from app import DB, Config, bot
from app.core import Message
from app.utils.db_utils import add_data, delete_data
FEDS: AgnosticCollection = DB.FED_LIST
FILTERS: filters.Filter = filters.user([609517172, 2059887769])
FBAN_REGEX: filters.Filter = filters.regex(
r"(New FedBan|starting a federation ban|Starting a federation ban|start a federation ban|FedBan Reason update|FedBan reason updated|Would you like to update this reason)"
)
class _User(User):
def __init__(self, id):
super().__init__(id=id)
@cached_property
def mention(self) -> str:
return f"<a href='tg://user?id={self.id}'>{self.id}</a>"
@bot.add_cmd(cmd="addf")
async def add_fed(bot: bot, message: Message):
data = dict(
name=message.flt_input or message.chat.title, type=str(message.chat.type)
)
await add_data(collection=FEDS, id=message.chat.id, data=data)
await message.reply(f"<b>{data['name']}</b> added to FED LIST.", del_in=5, block=False)
await bot.log(text=f"#FBANS\n<b>{data['name']}</b> <code>{message.chat.id}</code> added to FED LIST.")
@bot.add_cmd(cmd="delf")
async def remove_fed(bot: bot, message: Message):
if "-all" in message.flags:
await FEDS.drop()
await message.reply("FED LIST cleared.")
return
chat: int | str | Chat = message.flt_input or message.chat
name = ""
if isinstance(chat, Chat):
name = f"Chat: {chat.title}\n"
chat = chat.id
elif chat.isdigit():
chat = int(chat)
deleted: bool | None = await delete_data(collection=FEDS, id=chat)
if deleted:
await message.reply(
f"<b>{name}</b><code>{chat}</code> removed from FED LIST.", del_in=8, block=False
)
await bot.log(text=f"#FBANS\n<b>{name}</b><code>{chat}</code> removed from FED LIST.")
else:
await message.reply(f"<b>{name or chat}</b> not in FED LIST.", del_in=8)
@bot.add_cmd(cmd="fban")
async def fed_ban(bot: bot, message: Message):
await message.delete()
progress: Message = await message.reply("")
user, reason = await message.extract_user_n_reason()
if isinstance(user, str):
await progress.edit(user)
return
if not isinstance(user, User):
user = _User(id=message.text_list[1])
if user.id in Config.USERS:
await progress.edit("Cannot Fban Owner/Sudo users.")
return
await progress.edit("")
total: int = 0
failed: list[str] = []
async for fed in FEDS.find():
chat_id = int(fed["_id"])
total += 1
cmd: Message = await bot.send_message(
chat_id=chat_id,
text=f"!fban {user.mention} {reason}",
disable_web_page_preview=True,
)
response: Message | None = await cmd.get_response(
filters=(FILTERS), timeout=8
)
if not response or not (await FBAN_REGEX(bot, response)):
failed.append(fed["name"])
elif "Would you like to update this reason" in response.text:
await response.click("Update reason")
await asyncio.sleep(0.8)
if not total:
await progress.edit("You Don't have any feds connected!")
return
resp_str = f" <b>FBanned {user.mention}\nID: {user.id}\nReason: {reason}\n"
if failed:
resp_str += f"Failed in: {len(failed)}/{total}\n" + "\n".join(failed)
else:
resp_str += f"Success! Fbanned in {total} feds."
await progress.edit(
text=resp_str, del_in=8, block=False, disable_web_page_preview=True
)
await bot.send_message(chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str)
@bot.add_cmd(cmd="unfban")
async def un_fban(bot: bot, message: Message):
...

View File

@@ -8,6 +8,9 @@ DEV_MODE=0
DB_URL=
# Mongo DB cluster URL
FBAN_LOG_CHANNEL=
# FedBan Proof and logs.
LOG_CHAT=
# Bot logs chat
@@ -16,4 +19,4 @@ SESSION_STRING=""
USERS = [1223478]
# Separate multiple values with ,
UPSTREAM_REPO = "https://github.com/thedragonsinn/plain-ub"
UPSTREAM_REPO = "https://github.com/thedragonsinn/plain-ub"