code formatting

This commit is contained in:
thedragonsinn
2025-03-18 17:40:12 +05:30
parent 7903385588
commit 9ae2b69762
19 changed files with 62 additions and 193 deletions

View File

@@ -1,9 +1 @@
from ub_core import (
BOT,
LOGGER,
Config,
Convo,
CustomDB,
Message,
bot,
)
from ub_core import BOT, LOGGER, Config, Convo, CustomDB, Message, bot

View File

@@ -1,8 +1,6 @@
from os import environ
ALIVE_MEDIA: str = environ.get(
"ALIVE_MEDIA", "https://telegra.ph/file/a1d35a86c7f54a96188a9.png"
)
ALIVE_MEDIA: str = environ.get("ALIVE_MEDIA", "https://telegra.ph/file/a1d35a86c7f54a96188a9.png")
BOT_NAME = environ.get("BOT_NAME", "PLAIN-UB")
@@ -18,9 +16,7 @@ GEMINI_API_KEY: str = environ.get("GEMINI_API_KEY")
LOAD_HANDLERS: bool = True
MESSAGE_LOGGER_CHAT: int = int(
environ.get("MESSAGE_LOGGER_CHAT", environ.get("LOG_CHAT"))
)
MESSAGE_LOGGER_CHAT: int = int(environ.get("MESSAGE_LOGGER_CHAT", environ.get("LOG_CHAT")))
PM_GUARD: bool = False
@@ -28,6 +24,4 @@ PM_LOGGER: bool = False
TAG_LOGGER: bool = False
UPSTREAM_REPO: str = environ.get(
"UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub"
)
UPSTREAM_REPO: str = environ.get("UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub")

View File

@@ -132,10 +132,7 @@ async def fed_ban(bot: BOT, message: Message):
me = await bot.get_chat_member(chat_id=message.chat.id, user_id="me")
if me.status in {ChatMemberStatus.OWNER, ChatMemberStatus.ADMINISTRATOR}:
await message.replied.reply(
text=f"!dban {reason}",
disable_preview=True,
del_in=3,
block=False,
text=f"!dban {reason}", disable_preview=True, del_in=3, block=False
)
except UserNotParticipant:
pass
@@ -178,9 +175,7 @@ async def un_fban(bot: BOT, message: Message):
)
async def get_user_reason(
message: Message, progress: Message
) -> tuple[int, str, str] | None:
async def get_user_reason(message: Message, progress: Message) -> tuple[int, str, str] | None:
user, reason = await message.extract_user_n_reason()
if isinstance(user, str):
await progress.edit(user)
@@ -222,9 +217,7 @@ async def _perform_fed_task(
cmd: Message = await bot.send_message(
chat_id=chat_id, text=command, disable_preview=True
)
response: Message | None = await cmd.get_response(
filters=task_filter, timeout=8
)
response: Message | None = await cmd.get_response(filters=task_filter, timeout=8)
if not response:
failed.append(fed["name"])
elif "Would you like to update this reason" in response.text:
@@ -253,9 +246,7 @@ async def _perform_fed_task(
)
if failed:
resp_str += f"\n<b>Failed</b> in: {len(failed)}/{total}\n" + "\n".join(
failed
)
resp_str += f"\n<b>Failed</b> in: {len(failed)}/{total}\n" + "\n".join(failed)
else:
resp_str += f"\n<b>Status</b>: {task_type}ned in <b>{total}</b> feds."
@@ -263,9 +254,7 @@ async def _perform_fed_task(
resp_str += f"\n\n<b>By</b>: {get_name(message.from_user)}"
await bot.send_message(
chat_id=extra_config.FBAN_LOG_CHANNEL,
text=resp_str,
disable_preview=True,
chat_id=extra_config.FBAN_LOG_CHANNEL, text=resp_str, disable_preview=True
)
await progress.edit(text=resp_str, del_in=5, block=True, disable_preview=True)
@@ -279,6 +268,4 @@ async def handle_sudo_fban(command: str):
sudo_cmd = command.replace("/", extra_config.FBAN_SUDO_TRIGGER, 1)
await bot.send_message(
chat_id=extra_config.FBAN_SUDO_ID, text=sudo_cmd, disable_preview=True
)
await bot.send_message(chat_id=extra_config.FBAN_SUDO_ID, text=sudo_cmd, disable_preview=True)

View File

@@ -19,9 +19,7 @@ async def kick_user(bot: BOT, message: Message):
await bot.ban_chat_member(chat_id=message.chat.id, user_id=user.id)
await asyncio.sleep(2)
await bot.unban_chat_member(chat_id=message.chat.id, user_id=user.id)
await message.reply(
text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}"
)
await message.reply(text=f"{message.cmd.capitalize()}ed: {user.mention}\nReason: {reason}")
except Exception as e:
await message.reply(text=e, del_in=10)
@@ -41,9 +39,7 @@ async def kick_inactive_members(bot: BOT, message: Message):
count = 0
chat_id = message.chat.id
async with bot.Convo(
client=bot, chat_id=chat_id, from_user=message.from_user.id
) as convo:
async with bot.Convo(client=bot, chat_id=chat_id, from_user=message.from_user.id) as convo:
async for member in bot.get_chat_members(chat_id):
if member.status in ADMIN_STATUS:
@@ -51,9 +47,7 @@ async def kick_inactive_members(bot: BOT, message: Message):
user = member.user
message_count = await bot.search_messages_count(
chat_id=chat_id, from_user=user.id
)
message_count = await bot.search_messages_count(chat_id=chat_id, from_user=user.id)
if message_count >= 10:
continue

View File

@@ -26,8 +26,6 @@ async def mute_or_unmute(bot: BOT, message: Message):
can_add_web_page_previews=False,
),
)
await message.reply(
text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}"
)
await message.reply(text=f"{message.cmd.capitalize()}d: {user.mention}\nReason: {reason}")
except Exception as e:
await message.reply(text=e, del_in=10)

View File

@@ -32,9 +32,7 @@ async def promote_or_demote(bot: BOT, message: Message) -> None:
PROMOTE: .promote [ -anon | -full ] [ UID | REPLY | @ ] Title[Optional]
DEMOTE: .demote [ UID | REPLY | @ ]
"""
response: Message = await message.reply(
f"Trying to {message.cmd.capitalize()}....."
)
response: Message = await message.reply(f"Trying to {message.cmd.capitalize()}.....")
my_status = await bot.get_chat_member(chat_id=message.chat.id, user_id=bot.me.id)
my_privileges = my_status.privileges
@@ -100,16 +98,12 @@ async def demote_all(bot: BOT, message: Message):
):
try:
await bot.promote_chat_member(
chat_id=message.chat.id,
user_id=member.user.id,
privileges=DEMOTE_PRIVILEGES,
chat_id=message.chat.id, user_id=member.user.id, privileges=DEMOTE_PRIVILEGES
)
except FloodWait as f:
await asyncio.sleep(f.value + 10)
await bot.promote_chat_member(
chat_id=message.chat.id,
user_id=member.user.id,
privileges=DEMOTE_PRIVILEGES,
chat_id=message.chat.id, user_id=member.user.id, privileges=DEMOTE_PRIVILEGES
)
await asyncio.sleep(0.5)
count += 1

View File

@@ -1,7 +1,7 @@
import asyncio
import os
import shutil
import time
from pathlib import Path
from ub_core.utils.downloader import Download, DownloadedFile
@@ -29,17 +29,14 @@ async def rename(bot: BOT, message: Message):
)
return
dl_path = os.path.join("downloads", str(time.time()))
dl_path = Path("downloads") / str(time.time())
await response.edit("Input verified....Starting Download...")
if message.replied:
dl_obj: None = None
download_coro = telegram_download(
message=message.replied,
dir_name=dl_path,
file_name=input,
response=response,
message=message.replied, dir_name=dl_path, file_name=input, response=response
)
else:

View File

@@ -21,9 +21,7 @@ from app import BOT, Config, Message
UPLOAD_TYPES = Union[BOT.send_audio, BOT.send_document, BOT.send_photo, BOT.send_video]
async def video_upload(
bot: BOT, file: DownloadedFile, has_spoiler: bool
) -> UPLOAD_TYPES:
async def video_upload(bot: BOT, file: DownloadedFile, has_spoiler: bool) -> UPLOAD_TYPES:
thumb = await take_ss(file.path, path=file.path)
if not await check_audio(file.path):
return partial(
@@ -43,24 +41,16 @@ async def video_upload(
)
async def photo_upload(
bot: BOT, file: DownloadedFile, has_spoiler: bool
) -> UPLOAD_TYPES:
async def photo_upload(bot: BOT, file: DownloadedFile, has_spoiler: bool) -> UPLOAD_TYPES:
return partial(bot.send_photo, photo=file.path, has_spoiler=has_spoiler)
async def audio_upload(bot: BOT, file: DownloadedFile, *_, **__) -> UPLOAD_TYPES:
return partial(
bot.send_audio,
audio=file.path,
duration=await get_duration(file=file.path),
)
return partial(bot.send_audio, audio=file.path, duration=await get_duration(file=file.path))
async def doc_upload(bot: BOT, file: DownloadedFile, *_, **__) -> UPLOAD_TYPES:
return partial(
bot.send_document, document=file.path, disable_content_type_detection=True
)
return partial(bot.send_document, document=file.path, disable_content_type_detection=True)
FILE_TYPE_MAP = {
@@ -114,14 +104,10 @@ async def upload(bot: BOT, message: Message):
try:
async with Download(
url=input,
dir=os.path.join("downloads", str(time.time())),
message_to_edit=response,
url=input, dir=os.path.join("downloads", str(time.time())), message_to_edit=response
) as dl_obj:
if size_over_limit(dl_obj.size, client=bot):
await response.edit(
"<b>Aborted</b>, File size exceeds TG Limits!!!"
)
await response.edit("<b>Aborted</b>, File size exceeds TG Limits!!!")
return
await response.edit("URL detected in input, Starting Download....")
@@ -178,9 +164,7 @@ async def bulk_upload(message: Message, response: Message):
file_info = DownloadedFile(file=file)
if size_over_limit(file_info.size, client=message._client):
await response.reply(
f"Skipping {file_info.name} due to size exceeding limit."
)
await response.reply(f"Skipping {file_info.name} due to size exceeding limit.")
continue
temp_resp = await response.reply(f"starting to upload `{file_info.name}`")
@@ -197,9 +181,7 @@ async def upload_to_tg(file: DownloadedFile, message: Message, response: Message
if "-d" in message.flags:
upload_method = partial(
message._client.send_document,
document=file.path,
disable_content_type_detection=True,
message._client.send_document, document=file.path, disable_content_type_detection=True
)
else:
upload_method: UPLOAD_TYPES = await FILE_TYPE_MAP[file.type](

View File

@@ -43,9 +43,7 @@ async def alive(bot: BOT, message: Message):
if get_type(url=extra_config.ALIVE_MEDIA) == MediaType.PHOTO:
await bot.send_photo(photo=extra_config.ALIVE_MEDIA, **kwargs)
else:
await bot.send_animation(
animation=extra_config.ALIVE_MEDIA, unsave=True, **kwargs
)
await bot.send_animation(animation=extra_config.ALIVE_MEDIA, unsave=True, **kwargs)
_bot = getattr(bot, "bot", bot)
@@ -60,9 +58,7 @@ if _bot.is_bot:
)
if get_type(url=extra_config.ALIVE_MEDIA) == MediaType.PHOTO:
result_type = InlineQueryResultPhoto(
photo_url=extra_config.ALIVE_MEDIA, **kwargs
)
result_type = InlineQueryResultPhoto(photo_url=extra_config.ALIVE_MEDIA, **kwargs)
else:
result_type = InlineQueryResultAnimation(
animation_url=extra_config.ALIVE_MEDIA, **kwargs

View File

@@ -43,9 +43,7 @@ async def add_scmd(bot: BOT, message: Message):
return
elif not cmd_object.sudo:
await response.edit(
text=f"<b>{cmd_name}</b> is disabled for sudo users.", del_in=10
)
await response.edit(text=f"<b>{cmd_name}</b> is disabled for sudo users.", del_in=10)
return
elif cmd_object.loaded:

View File

@@ -22,9 +22,7 @@ async def disable_su(bot: BOT, message: Message):
@bot.on_message(
filters=filters.command(commands="enable_su", prefixes=Config.SUDO_TRIGGER)
& filters.create(
lambda _, __, m: m.from_user and m.from_user.id in Config.DISABLED_SUPERUSERS
),
& filters.create(lambda _, __, m: m.from_user and m.from_user.id in Config.DISABLED_SUPERUSERS),
group=1,
is_command=True,
filters_edited=True,
@@ -37,6 +35,4 @@ async def enable_su(bot: BOT, message: Message):
await SUDO_USERS.add_data({"_id": u_id, "disabled": False})
await message.reply(
text="Your <b>SuperUser</b> Access is now <code>Enabled</code>.", del_in=10
)
await message.reply(text="Your <b>SuperUser</b> Access is now <code>Enabled</code>.", del_in=10)

View File

@@ -38,9 +38,7 @@ async def sudo(bot: BOT, message: Message):
await SUDO.add_data({"_id": "sudo_switch", "value": value})
await (
await message.reply(text=f"Sudo is enabled: <b>{value}</b>!", del_in=8)
).log()
await (await message.reply(text=f"Sudo is enabled: <b>{value}</b>!", del_in=8)).log()
@BOT.add_cmd(cmd="addsudo", allow_sudo=False)
@@ -110,9 +108,7 @@ async def remove_sudo(bot: BOT, message: Message) -> Message | None:
if "-f" in message.flags:
await SUDO_USERS.delete_data(id=int(message.filtered_input))
await message.reply(
f"Forcefully deleted {message.filtered_input} from sudo users."
)
await message.reply(f"Forcefully deleted {message.filtered_input} from sudo users.")
return
response = await message.reply("Extracting User info...")
@@ -151,9 +147,7 @@ async def remove_sudo(bot: BOT, message: Message) -> Message | None:
await response.log()
def add_and_remove(
u_id: int, add_list: list | None = None, remove_list: list | None = None
):
def add_and_remove(u_id: int, add_list: list | None = None, remove_list: list | None = None):
if add_list is not None and u_id not in add_list:
add_list.append(u_id)

View File

@@ -18,18 +18,14 @@ async def get_ids(bot: BOT, message: Message) -> None:
resp_str += f"<b>{get_name(reply.chat)}</b>: <code>{reply.chat.id}</code>\n"
if reply_forward:
resp_str += (
f"<b>{get_name(reply_forward)}</b>: <code>{reply_forward.id}</code>\n"
)
resp_str += f"<b>{get_name(reply_forward)}</b>: <code>{reply_forward.id}</code>\n"
if reply_user:
resp_str += f"<b>{get_name(reply_user)}</b>: <code>{reply_user.id}</code>"
elif message.input:
resp_str: int = (await bot.get_chat(message.input[1:])).id
else:
resp_str: str = (
f"<b>{get_name(message.chat)}</b>: <code>{message.chat.id}</code>"
)
resp_str: str = f"<b>{get_name(message.chat)}</b>: <code>{message.chat.id}</code>"
await message.reply(resp_str)

View File

@@ -4,9 +4,7 @@ from app import BOT, Message
@BOT.add_cmd(cmd="click")
async def click(bot: BOT, message: Message):
if not message.input or not message.replied:
await message.reply(
"reply to a message containing a button and give a button to click"
)
await message.reply("reply to a message containing a button and give a button to click")
return
try:
await message.replied.click(message.input.strip())

View File

@@ -60,13 +60,8 @@ async def purge_(bot: BOT, message: Message) -> None:
message_ids: list[int] = list(range(start_message, message.id))
# Get messages from server if chat is private or ids are too big.
if (
message.chat.type in {ChatType.PRIVATE, ChatType.BOT}
or len(message_ids) > 100
):
messages = await bot.get_messages(
chat_id=chat_id, message_ids=message_ids, replies=0
)
if message.chat.type in {ChatType.PRIVATE, ChatType.BOT} or len(message_ids) > 100:
messages = await bot.get_messages(chat_id=chat_id, message_ids=message_ids, replies=0)
message_ids = [message.id for message in messages]
# Perform Quick purge of bigger chunks

View File

@@ -50,9 +50,7 @@ async def kang_sticker(bot: BOT, message: Message):
await create_n_kang(
kwargs=kwargs, pack_title=pack_title, pack_name=pack_name, message=message
)
await response.edit(
text=f"Kanged: <a href='t.me/addstickers/{pack_name}'>here</a>"
)
await response.edit(text=f"Kanged: <a href='t.me/addstickers/{pack_name}'>here</a>")
return
async with bot.Convo(client=bot, chat_id="stickers", timeout=5) as convo:
@@ -76,9 +74,7 @@ async def kang_sticker(bot: BOT, message: Message):
await response.edit(text=f"Kanged: <a href='t.me/addstickers/{pack_name}'>here</a>")
async def create_n_kang(
kwargs: dict, pack_title: str, pack_name: str, message: Message
):
async def create_n_kang(kwargs: dict, pack_title: str, pack_name: str, message: Message):
async with bot.Convo(client=bot, chat_id="stickers", timeout=5) as convo:
await convo.send_message(text=kwargs["cmd"], get_response=True)
await convo.send_message(text=pack_title, get_response=True)
@@ -109,9 +105,7 @@ async def get_sticker_set(limit: int, is_video=False) -> tuple[str, str, bool]:
try:
sticker = await bot.invoke(
GetStickerSet(
stickerset=InputStickerSetShortName(
short_name=f"{pack_name}{video}_{count}"
),
stickerset=InputStickerSetShortName(short_name=f"{pack_name}{video}_{count}"),
hash=0,
)
)
@@ -124,9 +118,7 @@ async def get_sticker_set(limit: int, is_video=False) -> tuple[str, str, bool]:
if cus_nick := os.environ.get("CUSTOM_PACK_NAME"):
pack_title = cus_nick + video
else:
pack_title = (
f"{bot.me.username or get_name(bot.me)}'s {video}kang pack vol {count}"
)
pack_title = f"{bot.me.username or get_name(bot.me)}'s {video}kang pack vol {count}"
return pack_title, f"{pack_name}{video}_{count}", create_new
@@ -155,9 +147,7 @@ async def photo_kang(message: Message) -> dict:
file = await asyncio.to_thread(resize_photo, input_file)
return dict(
cmd="/newpack", limit=120, is_video=False, file=file, path=download_path
)
return dict(cmd="/newpack", limit=120, is_video=False, file=file, path=download_path)
def resize_photo(input_file: str) -> BytesIO:
@@ -189,22 +179,14 @@ async def video_kang(message: Message, ff=False) -> dict:
duration = await get_duration(file=input_file)
else:
duration = video.duration
await resize_video(
input_file=input_file, output_file=output_file, duration=duration, ff=ff
)
return dict(
cmd="/newvideo", limit=50, is_video=True, file=output_file, path=download_path
)
await resize_video(input_file=input_file, output_file=output_file, duration=duration, ff=ff)
return dict(cmd="/newvideo", limit=50, is_video=True, file=output_file, path=download_path)
async def resize_video(
input_file: str, output_file: str, duration: int, ff: bool = False
):
async def resize_video(input_file: str, output_file: str, duration: int, ff: bool = False):
cmd = f"ffmpeg -hide_banner -loglevel error -i '{input_file}' -vf "
if ff:
cmd += (
'"scale=w=512:h=512:force_original_aspect_ratio=decrease,setpts=0.3*PTS" '
)
cmd += '"scale=w=512:h=512:force_original_aspect_ratio=decrease,setpts=0.3*PTS" '
cmd += "-ss 0 -t 3 -r 30 -loop 0 -an -c:v libvpx-vp9 -b:v 256k -fs 256k "
elif duration < 3:
cmd += '"scale=w=512:h=512:force_original_aspect_ratio=decrease" '
@@ -233,8 +215,6 @@ async def sticker_kang(message: Message) -> dict:
if sticker.is_video:
input_file: BytesIO = await message.download(in_memory=True)
input_file.seek(0)
return dict(
cmd="/newvideo", emoji=emoji, is_video=True, file=input_file, limit=50
)
return dict(cmd="/newvideo", emoji=emoji, is_video=True, file=input_file, limit=50)
return dict(cmd="/newpack", emoji=emoji, is_video=False, sticker=sticker, limit=120)

View File

@@ -43,18 +43,12 @@ async def logger_switch(bot: BOT, message: Message):
setattr(extra_config, conf_str, value)
await asyncio.gather(
LOGGER.add_data({"_id": f"{text}_logger_switch", "value": value}),
message.reply(
text=f"{text.capitalize()} Logger is enabled: <b>{value}</b>!", del_in=8
),
bot.log_text(
text=f"#{text.capitalize()}Logger is enabled: <b>{value}</b>!", type="info"
),
message.reply(text=f"{text.capitalize()} Logger is enabled: <b>{value}</b>!", del_in=8),
bot.log_text(text=f"#{text.capitalize()}Logger is enabled: <b>{value}</b>!", type="info"),
)
for task in Config.BACKGROUND_TASKS:
if task.get_name() == "pm_tag_logger" and task.done():
Config.BACKGROUND_TASKS.append(
asyncio.create_task(runner(), name="pm_tag_logger")
)
Config.BACKGROUND_TASKS.append(asyncio.create_task(runner(), name="pm_tag_logger"))
BASIC_FILTERS = (
@@ -103,18 +97,13 @@ async def reply_logger(bot: BOT, message: Message):
)
async def mention_logger(bot: BOT, message: Message):
for entity in message.entities or []:
if (
entity.type == MessageEntityType.MENTION
and entity.user
and entity.user.id == bot.me.id
):
if entity.type == MessageEntityType.MENTION and entity.user and entity.user.id == bot.me.id:
cache_message(message)
message.continue_propagation()
@bot.on_message(
filters=(BASIC_FILTERS & (filters.text | filters.media) & TAG_FILTER)
& ~filters.private,
filters=(BASIC_FILTERS & (filters.text | filters.media) & TAG_FILTER) & ~filters.private,
group=2,
is_command=False,
)
@@ -216,13 +205,9 @@ async def log_chat(message: Message):
)
async def log_message(
message: Message, notice: str | None = None, extra_info: str | None = None
):
async def log_message(message: Message, notice: str | None = None, extra_info: str | None = None):
try:
logged_message: Message = await message.forward(
extra_config.MESSAGE_LOGGER_CHAT
)
logged_message: Message = await message.forward(extra_config.MESSAGE_LOGGER_CHAT)
if extra_info:
await logged_message.reply(extra_info, parse_mode=ParseMode.HTML)
except MessageIdInvalid:

View File

@@ -50,8 +50,7 @@ async def handle_new_pm(bot: BOT, message: Message):
user_id = message.from_user.id
if RECENT_USERS[user_id] == 0:
await bot.log_text(
text=f"#PMGUARD\n{message.from_user.mention} [{user_id}] has messaged you.",
type="info",
text=f"#PMGUARD\n{message.from_user.mention} [{user_id}] has messaged you.", type="info"
)
RECENT_USERS[user_id] += 1
@@ -91,9 +90,7 @@ async def pm_guard(bot: BOT, message: Message):
.pmguard | .pmguard -c
"""
if "-c" in message.flags:
await message.reply(
text=f"PM Guard is enabled: <b>{extra_config.PM_GUARD}</b>", del_in=8
)
await message.reply(text=f"PM Guard is enabled: <b>{extra_config.PM_GUARD}</b>", del_in=8)
return
value = not extra_config.PM_GUARD
extra_config.PM_GUARD = value
@@ -140,8 +137,7 @@ async def no_pm(bot: BOT, message: Message):
return
ALLOWED_USERS.remove(user_id)
await asyncio.gather(
message.reply(text=f"{name} Dis-allowed to PM.", del_in=8),
PM_USERS.delete_data(user_id),
message.reply(text=f"{name} Dis-allowed to PM.", del_in=8), PM_USERS.delete_data(user_id)
)

View File

@@ -29,8 +29,5 @@ async def reply(bot: BOT, message: Message) -> None:
return
await bot.send_message(
chat_id=chat_id,
text=text,
reply_to_id=reply_to_id,
disable_preview=True,
chat_id=chat_id, text=text, reply_to_id=reply_to_id, disable_preview=True
)