Retrofit to support modular core

This commit is contained in:
thedragonsinn
2024-02-27 17:31:46 +05:30
parent 68fe0b7f66
commit b881fadea3
39 changed files with 109 additions and 1685 deletions

View File

@@ -1,20 +1,11 @@
import os
import tracemalloc
from dotenv import load_dotenv
tracemalloc.start()
load_dotenv("config.env")
if "com.termux" not in os.environ.get("PATH", ""):
import uvloop
uvloop.install()
from app.config import Config # NOQA
from app.core import DB, DB_CLIENT, CustomDB, Message, Convo # NOQA
from app.core.client import BOT, bot # NOQA
from app.core.logger import LOGGER # NOQA
from ub_core import (
BOT,
DB,
DB_CLIENT,
LOGGER,
Config,
Convo,
CustomDB,
Message,
bot,
)

View File

@@ -1,72 +0,0 @@
import asyncio
from os import environ, path
from typing import Callable, Coroutine
from git import Repo
from app.utils import Str
class Cmd(Str):
def __init__(self, cmd: str, func: Callable, cmd_path: str, sudo: bool):
self.cmd: str = cmd
self.cmd_path: str = cmd_path
self.dirname: str = path.basename(path.dirname(cmd_path))
self.doc: str = func.__doc__ or "Not Documented."
self.func: Callable = func
self.loaded = False
self.sudo: bool = sudo
class Config:
BOT_NAME = "PLAIN-UB"
CMD = Cmd
CMD_DICT: dict[str, Cmd] = {}
CMD_TRIGGER: str = environ.get("CMD_TRIGGER", ".")
DEV_MODE: int = int(environ.get("DEV_MODE", 0))
DISABLED_SUPERUSERS: list[int] = []
FBAN_LOG_CHANNEL: int = int(
environ.get("FBAN_LOG_CHANNEL", environ.get("LOG_CHAT"))
)
FBAN_SUDO_ID: int = int(environ.get("FBAN_SUDO_ID", 0))
FBAN_SUDO_TRIGGER: str = environ.get("FBAN_SUDO_TRIGGER")
GEMINI_API_KEY: str = environ.get("GEMINI_API_KEY")
INIT_TASKS: list[Coroutine] = []
LOG_CHAT: int = int(environ.get("LOG_CHAT"))
MESSAGE_LOGGER_CHAT: int = int(environ.get("MESSAGE_LOGGER_CHAT", LOG_CHAT))
MESSAGE_LOGGER_TASK: asyncio.Task | None = None
OWNER_ID: int = int(environ.get("OWNER_ID"))
PM_GUARD: bool = False
PM_LOGGER: bool = False
REPO: Repo = Repo(".")
SUDO: bool = False
SUDO_TRIGGER: str = environ.get("SUDO_TRIGGER", "!")
SUDO_USERS: list[int] = []
SUPERUSERS: list[int] = []
TAG_LOGGER: bool = False
UPSTREAM_REPO: str = environ.get(
"UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub"
)

View File

@@ -1,3 +0,0 @@
from app.core.conversation import Conversation as Convo
from app.core.db import DB, DB_CLIENT, CustomDB
from app.core.types.message import Message

View File

@@ -1,76 +0,0 @@
import asyncio
import glob
import importlib
import logging
import os
import sys
from pyrogram import Client, idle
from pyrogram.enums import ParseMode
from app import DB_CLIENT, Config
from app.core.conversation import Conversation
from app.core.decorators.add_cmd import AddCmd
from app.core.methods import ChannelLogger, SendMessage
from app.utils.aiohttp_tools import aio
LOGGER = logging.getLogger(Config.BOT_NAME)
def import_modules():
for py_module in glob.glob(pathname="app/**/[!^_]*.py", recursive=True):
name = os.path.splitext(py_module)[0]
py_name = name.replace("/", ".")
try:
mod = importlib.import_module(py_name)
if hasattr(mod, "init_task"):
Config.INIT_TASKS.append(mod.init_task())
except Exception as ie:
LOGGER.error(ie, exc_info=True)
class BOT(AddCmd, SendMessage, ChannelLogger, Client):
def __init__(self):
super().__init__(
name="bot",
api_id=int(os.environ.get("API_ID")),
api_hash=os.environ.get("API_HASH"),
session_string=os.environ.get("SESSION_STRING").strip(),
parse_mode=ParseMode.DEFAULT,
sleep_threshold=30,
max_concurrent_transmissions=2,
)
self.log = LOGGER
self.Convo = Conversation
async def boot(self) -> None:
await super().start()
LOGGER.info("Connected to TG.")
import_modules()
LOGGER.info("Plugins Imported.")
await asyncio.gather(*Config.INIT_TASKS)
Config.INIT_TASKS.clear()
LOGGER.info("Init Tasks Completed.")
await self.log_text(text="<i>Started</i>")
LOGGER.info("Idling...")
await idle()
await self.shut_down()
@staticmethod
async def shut_down():
await aio.close()
if Config.MESSAGE_LOGGER_TASK and not Config.MESSAGE_LOGGER_TASK.done():
Config.MESSAGE_LOGGER_TASK.cancel()
LOGGER.info("DB Closed.")
DB_CLIENT.close()
async def restart(self, hard=False) -> None:
await self.shut_down()
await super().stop(block=False)
if hard:
os.execl("/bin/bash", "/bin/bash", "run")
LOGGER.info("Restarting...")
os.execl(sys.executable, sys.executable, "-m", "app")
bot: BOT = BOT()

View File

@@ -1,129 +0,0 @@
import asyncio
from collections import defaultdict
from typing import Self
from pyrogram import Client
from pyrogram.filters import Filter
from pyrogram.types import Message
from app.utils import Str
class Conversation(Str):
CONVO_DICT: dict[int, list["Conversation"]] = defaultdict(list)
class DuplicateConvo(Exception):
def __init__(self, chat: str | int):
super().__init__(f"Conversation already started with {chat} ")
def __init__(
self,
client: Client,
chat_id: int | str,
check_for_duplicates: bool = True,
filters: Filter | None = None,
timeout: int = 10,
):
self.chat_id: int | str = chat_id
self._client: Client = client
self.check_for_duplicates: bool = check_for_duplicates
self.filters: Filter = filters
self.response_future: asyncio.Future | None = None
self.responses: list[Message] = []
self.timeout: int = timeout
self.set_future()
async def __aenter__(self) -> Self:
"""
Convert Username to ID if chat_id is username.
Check Convo Dict for duplicate Convo with same ID.
Initialize Context Manager and return the Object.
"""
if isinstance(self.chat_id, str):
self.chat_id = (await self._client.get_chat(self.chat_id)).id
if self.check_for_duplicates and self.chat_id in Conversation.CONVO_DICT.keys():
raise self.DuplicateConvo(self.chat_id)
Conversation.CONVO_DICT[self.chat_id].append(self)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit Context Manager and remove Chat ID from Dict."""
Conversation.CONVO_DICT[self.chat_id].remove(self)
if not self.response_future.done():
self.response_future.cancel()
if not Conversation.CONVO_DICT[self.chat_id]:
Conversation.CONVO_DICT.pop(self.chat_id)
@classmethod
async def get_resp(cls, client, *args, **kwargs) -> Message | None:
"""
Bound Method to Gracefully handle Timeout.
but only returns first Message.
"""
try:
async with cls(*args, client=client, **kwargs) as convo:
response: Message | None = await convo.get_response()
return response
except TimeoutError:
return
def set_future(self, *args, **kwargs):
future = asyncio.Future()
future.add_done_callback(self.set_future)
self.response_future = future
"""Methods"""
async def get_response(self, timeout: int = 0) -> Message | None:
"""Returns Latest Message for Specified Filters."""
try:
response: asyncio.Future.result = await asyncio.wait_for(
fut=self.response_future, timeout=timeout or self.timeout
)
return response
except asyncio.TimeoutError:
raise TimeoutError("Conversation Timeout")
async def send_message(
self,
text: str,
timeout: int = 0,
get_response: bool = False,
**kwargs,
) -> Message | tuple[Message, Message]:
"""
Bound Method to Send Texts in Convo Chat.
Returns Sent Message and Response if get_response is True.
"""
message = await self._client.send_message(
chat_id=self.chat_id, text=text, **kwargs
)
if get_response:
response = await self.get_response(timeout=timeout)
return message, response
return message
async def send_document(
self,
document,
caption: str = "",
timeout: int = 0,
get_response: bool = False,
force_document: bool = True,
**kwargs,
) -> Message | tuple[Message, Message]:
"""
Bound Method to Send Documents in Convo Chat.
Returns Sent Message and Response if get_response is True.
"""
message = await self._client.send_document(
chat_id=self.chat_id,
document=document,
caption=caption,
force_document=force_document,
**kwargs,
)
if get_response:
response = await self.get_response(timeout=timeout)
return message, response
return message

View File

@@ -1,40 +0,0 @@
import os
import dns.resolver
from motor.core import AgnosticClient, AgnosticDatabase
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorCollection
from app.utils import Str
dns.resolver.default_resolver = dns.resolver.Resolver(configure=False)
dns.resolver.default_resolver.nameservers = ["8.8.8.8"]
DB_CLIENT: AgnosticClient = AsyncIOMotorClient(os.environ.get("DB_URL").strip())
DB: AgnosticDatabase = DB_CLIENT["plain_ub"]
class CustomDB(AsyncIOMotorCollection, Str):
def __init__(self, collection_name: str):
super().__init__(database=DB, name=collection_name)
async def add_data(self, data: dict) -> None:
"""
:param data: {"_id": db_id, rest of the data}
entry is added or updated if exists.
"""
found = await self.find_one({"_id": data["_id"]})
if not found:
await self.insert_one(data)
else:
await self.update_one({"_id": data.pop("_id")}, {"$set": data})
async def delete_data(self, id: int | str) -> bool | None:
"""
:param id: the db id key to delete.
:return: True if entry was deleted.
"""
found = await self.find_one({"_id": id})
if found:
await self.delete_one({"_id": id})
return True

View File

@@ -1,29 +0,0 @@
import inspect
from functools import wraps
from typing import Callable
from app import Config
class AddCmd:
@staticmethod
def add_cmd(cmd: str | list[str], allow_sudo: bool = True):
def the_decorator(func: Callable):
path = inspect.stack()[1][1]
@wraps(func)
def wrapper():
if isinstance(cmd, list):
for _cmd in cmd:
Config.CMD_DICT[_cmd] = Config.CMD(
cmd=_cmd, func=func, cmd_path=path, sudo=allow_sudo
)
else:
Config.CMD_DICT[cmd] = Config.CMD(
cmd=cmd, func=func, cmd_path=path, sudo=allow_sudo
)
wrapper()
return func
return the_decorator

View File

@@ -1,69 +0,0 @@
from pyrogram import filters as _filters
from pyrogram.types import Message
from app import Config
from app.core.conversation import Conversation
convo_filter = _filters.create(
lambda _, __, message: (message.chat.id in Conversation.CONVO_DICT.keys())
and (not message.reactions)
)
def cmd_check(message: Message, trigger: str, sudo: bool = False) -> bool:
start_str = message.text.split(maxsplit=1)[0]
cmd = start_str.replace(trigger, "", 1)
cmd_obj = Config.CMD_DICT.get(cmd)
if not cmd_obj:
return False
if sudo:
in_loaded = cmd_obj.loaded
has_access = cmd_obj.sudo
return in_loaded and has_access
return True
def basic_check(message: Message):
return message.reactions or not message.text or not message.from_user
def owner_check(filters, client, message: Message) -> bool:
if (
basic_check(message)
or not message.text.startswith(Config.CMD_TRIGGER)
or message.from_user.id != Config.OWNER_ID
or (message.chat.id != Config.OWNER_ID and not message.outgoing)
):
return False
return cmd_check(message, Config.CMD_TRIGGER)
owner_filter = _filters.create(owner_check)
def sudo_check(filters, client, message: Message) -> bool:
if (
not Config.SUDO
or basic_check(message)
or not message.text.startswith(Config.SUDO_TRIGGER)
or message.from_user.id not in Config.SUDO_USERS
):
return False
return cmd_check(message, Config.SUDO_TRIGGER, sudo=True)
sudo_filter = _filters.create(sudo_check)
def super_user_check(filter, client, message: Message):
if (
basic_check(message)
or not message.text.startswith(Config.SUDO_TRIGGER)
or message.from_user.id not in Config.SUPERUSERS
or message.from_user.id in Config.DISABLED_SUPERUSERS
):
return False
return cmd_check(message, Config.SUDO_TRIGGER)
super_user_filter = _filters.create(super_user_check)

View File

@@ -1,43 +0,0 @@
import asyncio
from typing import Callable
from pyrogram import StopPropagation
from pyrogram.types import Message as Msg
from app import BOT, Config, Convo, Message, bot
from app.core.handlers import filters
@bot.on_message(
filters.owner_filter | filters.super_user_filter | filters.sudo_filter, group=1
)
@bot.on_edited_message(
filters.owner_filter | filters.super_user_filter | filters.sudo_filter, group=1
)
async def cmd_dispatcher(bot: BOT, message: Message, func: Callable = None) -> None:
message = Message.parse(message)
func = func or Config.CMD_DICT[message.cmd].func
task = asyncio.create_task(func(bot, message), name=message.task_id)
try:
await task
if message.is_from_owner:
await message.delete()
except asyncio.exceptions.CancelledError:
await bot.log_text(text=f"<b>#Cancelled</b>:\n<code>{message.text}</code>")
except StopPropagation:
raise StopPropagation
except Exception as e:
bot.log.error(e, exc_info=True, extra={"tg_message": message})
message.stop_propagation()
@bot.on_message(filters.convo_filter, group=0)
@bot.on_edited_message(filters.convo_filter, group=0)
async def convo_handler(bot: BOT, message: Msg):
conv_objects: list[Convo] = Convo.CONVO_DICT[message.chat.id]
for conv_object in conv_objects:
if conv_object.filters and not (await conv_object.filters(bot, message)):
continue
conv_object.responses.append(message)
conv_object.response_future.set_result(message)
message.continue_propagation()

View File

@@ -1,69 +0,0 @@
import asyncio
import os
from logging import (
ERROR,
INFO,
WARNING,
Handler,
StreamHandler,
basicConfig,
getLogger,
handlers,
)
from app import Config, bot
os.makedirs(name="logs", exist_ok=True)
LOGGER = getLogger(Config.BOT_NAME)
class TgErrorHandler(Handler):
def emit(self, log_record):
if not bot.is_connected:
return
self.format(log_record)
chat = ""
if hasattr(log_record, "tg_message"):
chat = (
log_record.tg_message.chat.title
or log_record.tg_message.chat.first_name
)
text = (
f"#{log_record.levelname} #TRACEBACK"
f"<b>\nChat</b>: {chat}"
f"\n<b>Line No</b>: <code>{log_record.lineno}</code>"
f"\n<b>Func</b>: <code>{log_record.funcName}</code>"
f"\n<b>Module</b>: <code>{log_record.module}</code>"
f"\n<b>Time</b>: <code>{log_record.asctime}</code>"
f"\n<b>Error Message</b>:\n<pre language=python>{log_record.exc_text or log_record.message}</pre>"
)
asyncio.run_coroutine_threadsafe(
coro=bot.log_text(text=text, name="traceback.txt"), loop=bot.loop
)
custom_handler = TgErrorHandler()
custom_handler.setLevel(ERROR)
basicConfig(
level=INFO,
format="[%(levelname)s] [%(asctime)s] [%(name)s] [%(module)s]: %(message)s",
datefmt="%d-%m-%y %I:%M:%S %p",
handlers={
handlers.RotatingFileHandler(
filename="logs/app_logs.txt",
mode="a",
maxBytes=5 * 1024 * 1024,
backupCount=2,
encoding=None,
delay=False,
),
StreamHandler(),
custom_handler,
},
)
getLogger("pyrogram").setLevel(WARNING)
getLogger("httpx").setLevel(WARNING)
getLogger("aiohttp.access").setLevel(WARNING)

View File

@@ -1,2 +0,0 @@
from app.core.methods.channel_loggers import ChannelLogger
from app.core.methods.send_message import SendMessage

View File

@@ -1,36 +0,0 @@
import logging
from pyrogram import Client
from pyrogram.enums import ParseMode
from app import Config
from app.core.types.message import Message
LOGGER = logging.getLogger(Config.BOT_NAME)
class ChannelLogger(Client):
async def log_text(
self,
text,
name="log.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.HTML,
type: str = "",
) -> Message:
if type:
if hasattr(LOGGER, type):
getattr(LOGGER, type)(text)
text = f"#{type.upper()}\n{text}"
return (await self.send_message(
chat_id=Config.LOG_CHAT,
text=text,
name=name, # NOQA
disable_web_page_preview=disable_web_page_preview,
parse_mode=parse_mode,
)) # fmt:skip
@staticmethod
async def log_message(message: Message):
return (await message.copy(chat_id=Config.LOG_CHAT)) # fmt: skip

View File

@@ -1,31 +0,0 @@
from io import BytesIO
from pyrogram import Client
from app.core.types.message import Message
class SendMessage(Client):
async def send_message(
self,
chat_id: int | str,
text,
name: str = "output.txt",
disable_web_page_preview: bool = False,
**kwargs,
) -> Message:
if not isinstance(text, str):
text = str(text)
if len(text) < 4096:
message = await super().send_message(
chat_id=chat_id,
text=text,
disable_web_page_preview=disable_web_page_preview,
**kwargs,
)
return Message.parse(message=message)
doc = BytesIO(bytes(text, encoding="utf-8"))
doc.name = name
return (await super().send_document(
chat_id=chat_id, document=doc, **kwargs
)) # fmt: skip

View File

@@ -1,176 +0,0 @@
import asyncio
from functools import cached_property
from typing import Self
from pyrogram.enums import MessageEntityType
from pyrogram.errors import MessageDeleteForbidden
from pyrogram.filters import Filter
from pyrogram.types import Message as Msg
from pyrogram.types import User
from app import Config
from app.core.conversation import Conversation
class Message(Msg):
def __init__(self, message: Msg | Self) -> None:
kwargs = self.sanitize_message(message)
super().__init__(**kwargs)
@cached_property
def cmd(self) -> str | None:
if not self.text_list:
return
raw_cmd = self.text_list[0]
cmd = raw_cmd.replace(self.trigger, "", 1)
return cmd if cmd in Config.CMD_DICT.keys() else None
@cached_property
def flags(self) -> list:
return [i for i in self.text_list if i.startswith("-")]
@cached_property
def filtered_input(self) -> str:
split_lines = self.input.split(sep="\n", maxsplit=1)
split_lines[0] = " ".join(
[word for word in split_lines[0].split(" ") if word not in self.flags]
)
return "\n".join(split_lines)
@cached_property
def input(self) -> str:
if len(self.text_list) > 1:
return self.text.split(maxsplit=1)[-1]
return ""
@cached_property
def is_from_owner(self) -> bool:
return self.from_user and self.from_user.id == Config.OWNER_ID
@cached_property
def replied(self) -> "Message":
if self.reply_to_message:
return Message.parse(self.reply_to_message)
@cached_property
def reply_id(self) -> int | None:
return self.replied.id if self.replied else None
@cached_property
def replied_task_id(self) -> str | None:
return self.replied.task_id if self.replied else None
@cached_property
def reply_text_list(self) -> list:
return self.replied.text_list if self.replied else []
@cached_property
def task_id(self) -> str:
return f"{self.chat.id}-{self.id}"
@cached_property
def text_list(self) -> list:
return self.text.split() if self.text else []
@cached_property
def trigger(self):
return Config.CMD_TRIGGER if self.is_from_owner else Config.SUDO_TRIGGER
async def async_deleter(self, del_in, task, block) -> None:
if block:
x = await task
await asyncio.sleep(del_in)
await x.delete()
return x
else:
asyncio.create_task(
self.async_deleter(del_in=del_in, task=task, block=True)
)
async def delete(self, reply: bool = False) -> None:
try:
await super().delete()
if reply and self.replied:
await self.replied.delete()
except MessageDeleteForbidden:
pass
async def edit(
self, text, del_in: int = 0, block=True, name: str = "output.txt", **kwargs
) -> "Message":
if len(str(text)) < 4096:
task = super().edit_text(text=text, **kwargs)
if del_in:
reply = await self.async_deleter(task=task, del_in=del_in, block=block)
else:
reply = Message.parse((await task)) # fmt:skip
self.text = reply.text
else:
_, reply = await asyncio.gather(
super().delete(), self.reply(text, name=name, **kwargs)
)
return reply
async def extract_user_n_reason(self) -> tuple[User | str | Exception, str | None]:
if self.replied:
return self.replied.from_user, self.filtered_input
input_text_list = self.filtered_input.split(maxsplit=1)
if not input_text_list:
return (
"Unable to Extract User info.\nReply to a user or input @ | id.",
None,
)
user = input_text_list[0]
reason = None
if len(input_text_list) >= 2:
reason = input_text_list[1]
if self.entities:
for entity in self.entities:
if entity == MessageEntityType.MENTION:
return entity.user, reason
if user.isdigit():
user = int(user)
elif user.startswith("@"):
user = user.strip("@")
try:
return (await self._client.get_users(user_ids=user)), reason
except Exception:
return user, reason
async def get_response(self, filters: Filter = None, timeout: int = 8):
response: Message | None = await Conversation.get_resp(
client=self._client, chat_id=self.chat.id, filters=filters, timeout=timeout
)
return response
async def log(self):
return (await self.copy(Config.LOG_CHAT)) # fmt:skip
async def reply(
self, text, del_in: int = 0, block: bool = True, **kwargs
) -> "Message":
task = self._client.send_message(
chat_id=self.chat.id, text=text, reply_to_message_id=self.id, **kwargs
)
if del_in:
await self.async_deleter(task=task, del_in=del_in, block=block)
else:
return Message.parse((await task)) # fmt:skip
@staticmethod
def sanitize_message(message):
kwargs = vars(message).copy()
kwargs["client"] = kwargs.pop("_client", message._client)
[
kwargs.pop(arg, 0)
for arg in dir(Message)
if (
isinstance(getattr(Message, arg, 0), (cached_property, property))
and not hasattr(Msg, arg)
)
]
return kwargs
@classmethod
def parse(cls, message: Msg) -> "Message":
return cls(message)

29
app/extra_config.py Normal file
View File

@@ -0,0 +1,29 @@
from os import environ
BOT_NAME = "PLAIN-UB"
DISABLED_SUPERUSERS: list[int] = []
FBAN_LOG_CHANNEL: int = int(environ.get("FBAN_LOG_CHANNEL", environ.get("LOG_CHAT")))
FBAN_SUDO_ID: int = int(environ.get("FBAN_SUDO_ID", 0))
FBAN_SUDO_TRIGGER: str = environ.get("FBAN_SUDO_TRIGGER")
GEMINI_API_KEY: str = environ.get("GEMINI_API_KEY")
LOAD_HANDLERS: bool = True
MESSAGE_LOGGER_CHAT: int = int(
environ.get("MESSAGE_LOGGER_CHAT", environ.get("LOG_CHAT"))
)
PM_GUARD: bool = False
PM_LOGGER: bool = False
TAG_LOGGER: bool = False
UPSTREAM_REPO: str = environ.get(
"UPSTREAM_REPO", "https://github.com/thedragonsinn/plain-ub"
)

View File

@@ -3,9 +3,9 @@ import asyncio
from pyrogram import filters
from pyrogram.enums import ChatMemberStatus, ChatType
from pyrogram.types import Chat, User
from ub_core.utils.helpers import get_name
from app import BOT, Config, CustomDB, Message, bot
from app.utils.helpers import get_name
from app import BOT, Config, CustomDB, Message, bot, extra_config
FED_DB = CustomDB("FED_LIST")
@@ -116,7 +116,7 @@ async def fed_ban(bot: BOT, message: Message):
if not message.replied:
await progress.edit("Reply to a proof")
return
proof = await message.replied.forward(Config.FBAN_LOG_CHANNEL)
proof = await message.replied.forward(extra_config.FBAN_LOG_CHANNEL)
proof_str = f"\n{ {proof.link} }"
reason = f"{reason}{proof_str}"
@@ -229,7 +229,9 @@ async def perform_fed_task(
if not message.is_from_owner:
resp_str += f"\n\n<b>By</b>: {get_name(message.from_user)}"
await bot.send_message(
chat_id=Config.FBAN_LOG_CHANNEL, text=resp_str, disable_web_page_preview=True
chat_id=extra_config.FBAN_LOG_CHANNEL,
text=resp_str,
disable_web_page_preview=True,
)
await progress.edit(
text=resp_str, del_in=5, block=True, disable_web_page_preview=True
@@ -238,9 +240,9 @@ async def perform_fed_task(
async def handle_sudo_fban(command: str):
if not (Config.FBAN_SUDO_ID and Config.FBAN_SUDO_TRIGGER):
if not (extra_config.FBAN_SUDO_ID and extra_config.FBAN_SUDO_TRIGGER):
return
sudo_cmd = command.replace("/", Config.FBAN_SUDO_TRIGGER, 1)
sudo_cmd = command.replace("/", extra_config.FBAN_SUDO_TRIGGER, 1)
await bot.send_message(
chat_id=Config.FBAN_SUDO_ID, text=sudo_cmd, disable_web_page_preview=True
chat_id=extra_config.FBAN_SUDO_ID, text=sudo_cmd, disable_web_page_preview=True
)

View File

@@ -1,66 +0,0 @@
import asyncio
import inspect
import sys
import traceback
from io import StringIO
from pyrogram.enums import ParseMode
from app import Config, bot, BOT, Message, CustomDB, DB, DB_CLIENT # isort:skip
from app.utils import shell # isort:skip
from app.utils.aiohttp_tools import aio # isort:skip
async def executor(bot: BOT, message: Message) -> Message | None:
"""
CMD: PY
INFO: Run Python Code.
FLAGS: -s to only show output.
USAGE:
.py [-s] return 1
"""
code: str = message.filtered_input.strip()
if not code:
return await message.reply("exec Jo mama?")
reply: Message = await message.reply("executing")
sys.stdout = codeOut = StringIO()
sys.stderr = codeErr = StringIO()
# Indent code as per proper python syntax
formatted_code = "\n ".join(code.splitlines())
try:
# Create and initialise the function
exec(f"async def _exec(bot, message):\n {formatted_code}")
func_out = await asyncio.Task(
locals()["_exec"](bot, message), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled....`")
except Exception:
func_out = str(traceback.format_exc())
sys.stdout = sys.__stdout__
sys.stderr = sys.__stderr__
output = codeErr.getvalue().strip() or codeOut.getvalue().strip()
if func_out is not None:
output = f"{output}\n\n{func_out}".strip()
elif not output and "-s" in message.flags:
await reply.delete()
return
if "-s" in message.flags:
output = f">> ```\n{output}```"
else:
output = f"```python\n{code}```\n\n```\n{output}```"
await reply.edit(
output,
name="exec.txt",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
if Config.DEV_MODE:
Config.CMD_DICT["py"] = Config.CMD(
cmd="py",
func=executor,
cmd_path=inspect.stack()[0][1],
sudo=False,
)

View File

@@ -1,46 +0,0 @@
import importlib
import inspect
import os
import sys
import traceback
from app import BOT, Config, Message
async def loader(bot: BOT, message: Message) -> Message | None:
if (
not message.replied
or not message.replied.document
or not message.replied.document.file_name.endswith(".py")
) and "-r" not in message.flags:
await message.reply("Reply to a Plugin.")
return
if "-r" in message.flags:
plugin = message.filtered_input
cmd_module = Config.CMD_DICT.get(plugin)
if not cmd_module:
await message.reply(text="Invalid cmd.")
return
module = str(cmd_module.func.__module__)
else:
file_name: str = os.path.splitext(message.replied.document.file_name)[0]
module = f"app.temp.{file_name}"
await message.replied.download("app/temp/")
reply: Message = await message.reply("Loading....")
reload = sys.modules.pop(module, None)
status: str = "Reloaded" if reload else "Loaded"
try:
importlib.import_module(module)
except Exception:
await reply.edit(str(traceback.format_exc()))
return
await reply.edit(f"{status} {module}")
if Config.DEV_MODE:
Config.CMD_DICT["load"] = Config.CMD(
cmd="load",
func=loader,
cmd_path=inspect.stack()[0][1],
sudo=False,
)

View File

@@ -1,65 +0,0 @@
import asyncio
import inspect
from pyrogram.enums import ParseMode
from app import BOT, Config, Message
from app.utils import shell
async def run_cmd(bot: BOT, message: Message) -> Message | None:
cmd: str = message.input.strip()
reply: Message = await message.reply("executing...")
try:
proc_stdout: str = await asyncio.create_task(
shell.run_shell_cmd(cmd), name=reply.task_id
)
except asyncio.exceptions.CancelledError:
return await reply.edit("`Cancelled...`")
output: str = f"<pre language=shell>~${cmd}\n\n{proc_stdout}</pre>"
return await reply.edit(output, name="sh.txt", disable_web_page_preview=True)
# Shell with Live Output
async def live_shell(bot: BOT, message: Message):
cmd: str = message.input.strip()
reply: Message = await message.reply("`getting live output....`")
sub_process: shell.AsyncShell = await shell.AsyncShell.run_cmd(cmd)
sleep_for: int = 1
output: str = ""
try:
async for stdout in sub_process.get_output():
if output != stdout:
await reply.edit(
text=f"```shell\n{stdout}```",
disable_web_page_preview=True,
parse_mode=ParseMode.MARKDOWN,
)
output = stdout
if sleep_for >= 6:
sleep_for = 2
await asyncio.create_task(asyncio.sleep(sleep_for), name=reply.task_id)
sleep_for += 2
await reply.edit(
text=f"<pre language=shell>~${cmd}\n\n{sub_process.full_std}</pre>",
name="shell.txt",
disable_web_page_preview=True,
)
except asyncio.exceptions.CancelledError:
sub_process.cancel()
await reply.edit(f"`Cancelled....`")
if Config.DEV_MODE:
Config.CMD_DICT["shell"] = Config.CMD(
cmd="shell",
func=live_shell,
cmd_path=inspect.stack()[0][1],
sudo=False,
)
Config.CMD_DICT["sh"] = Config.CMD(
cmd="sh",
func=run_cmd,
cmd_path=inspect.stack()[0][1],
sudo=False,
)

View File

@@ -2,11 +2,11 @@ import asyncio
import os
import time
from app import BOT, bot
from app.core import Message
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
from app.utils.media_helper import get_tg_media_details
from ub_core.utils.downloader import Download, DownloadedFile
from ub_core.utils.helpers import progress
from ub_core.utils.media_helper import get_tg_media_details
from app import BOT, Message, bot
@bot.add_cmd(cmd="download")

View File

@@ -3,12 +3,12 @@ import os
import shutil
import time
from app import BOT, bot
from app.core import Message
from ub_core.utils.downloader import Download, DownloadedFile
from ub_core.utils.helpers import progress
from app import BOT, Message, bot
from app.plugins.files.download import telegram_download
from app.plugins.files.upload import FILE_TYPE_MAP
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
@bot.add_cmd(cmd="rename")

View File

@@ -2,12 +2,12 @@ import asyncio
import os
import time
from app import BOT, Config, bot
from app.core import Message
from app.utils.downloader import Download, DownloadedFile
from app.utils.helpers import progress
from app.utils.media_helper import MediaType, bytes_to_mb
from app.utils.shell import check_audio, get_duration, take_ss
from ub_core.utils.downloader import Download, DownloadedFile
from ub_core.utils.helpers import progress
from ub_core.utils.media_helper import MediaType, bytes_to_mb
from ub_core.utils.shell import check_audio, get_duration, take_ss
from app import BOT, Config, Message, bot
async def video_upload(

View File

@@ -6,18 +6,18 @@ from pyrogram import filters
from pyrogram.enums import ParseMode
from pyrogram.types import Message as Msg
from app import BOT, Config, Convo, Message, bot
from app import BOT, Convo, Message, bot, extra_config
MODEL = genai.GenerativeModel("gemini-pro")
async def init_task():
if Config.GEMINI_API_KEY:
genai.configure(api_key=Config.GEMINI_API_KEY)
if extra_config.GEMINI_API_KEY:
genai.configure(api_key=extra_config.GEMINI_API_KEY)
async def basic_check(message: Message):
if not Config.GEMINI_API_KEY:
if not extra_config.GEMINI_API_KEY:
await message.reply(
"Gemini API KEY not found."
"\nGet it <a href='https://makersuite.google.com/app/u/2/apikey'>HERE</a> "
@@ -88,7 +88,7 @@ async def ai_chat(bot: BOT, message: Message):
)
return
resp = await message.reply("<i>Loading History...</i>")
doc: BytesIO = (await reply.download(in_memory=True)).getbuffer()
doc: BytesIO = (await reply.download(in_memory=True)).getbuffer() # NOQA
history = pickle.loads(doc)
await resp.edit("<i>History Loaded... Resuming chat</i>")
chat = MODEL.start_chat(history=history)

View File

@@ -6,9 +6,9 @@ from time import time
from urllib.parse import urlparse
import yt_dlp
from ub_core.utils.aiohttp_tools import aio
from app import Message, bot
from app.utils.aiohttp_tools import aio
domains = [
"www.youtube.com",

View File

@@ -1,9 +1,9 @@
import asyncio
from pyrogram.types import User
from ub_core.utils.helpers import extract_user_data, get_name
from app import BOT, Config, CustomDB, Message, bot
from app.utils.helpers import extract_user_data, get_name
SUDO = CustomDB("COMMON_SETTINGS")
SUDO_USERS = CustomDB("SUDO_USERS")

View File

@@ -1,14 +0,0 @@
import aiofiles
from app import BOT, bot
from app.core import Message
@bot.add_cmd(cmd="logs")
async def read_logs(bot: BOT, message: Message):
async with aiofiles.open("logs/app_logs.txt", "r") as aio_file:
text = await aio_file.read()
if len(text) < 4050:
await message.reply(f"<pre language=java>{text}</pre>")
else:
await message.reply_document(document="logs/app_logs.txt")

View File

@@ -1,41 +0,0 @@
import os
import shutil
from pyrogram.enums import ChatType
from app import BOT, Message, bot
async def init_task() -> None:
restart_msg = int(os.environ.get("RESTART_MSG", 0))
restart_chat = int(os.environ.get("RESTART_CHAT", 0))
if restart_msg and restart_chat:
await bot.get_chat(restart_chat)
await bot.edit_message_text(
chat_id=restart_chat, message_id=restart_msg, text="__Started__"
)
os.environ.pop("RESTART_MSG")
os.environ.pop("RESTART_CHAT")
@bot.add_cmd(cmd="restart")
async def restart(bot: BOT, message: Message, u_resp: Message | None = None) -> None:
"""
CMD: RESTART
INFO: Restart the Bot.
FLAGS:
-h: for hard restart.
-cl: for clearing logs.
-cp: for clearing temp plugins.
Usage:
.restart | .restart -h
"""
reply: Message = u_resp or await message.reply("restarting....")
if reply.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP):
os.environ["RESTART_MSG"] = str(reply.id)
os.environ["RESTART_CHAT"] = str(reply.chat.id)
if "-cl" in message.flags:
os.remove("logs/app_logs.txt")
if "-cp" in message.flags:
shutil.rmtree("app/temp")
await bot.restart(hard="-h" in message.flags)

View File

@@ -1,73 +0,0 @@
import asyncio
from git import Repo
from app import BOT, Config, Message, bot
from app.plugins.sys_utils.restart import restart
async def get_commits(repo: Repo) -> str | None:
try:
async with asyncio.timeout(10):
await asyncio.to_thread(repo.git.fetch)
except TimeoutError:
return
commits: str = ""
limit: int = 0
for commit in repo.iter_commits("HEAD..origin/main"):
commits += (
f"<b>#{commit.count()}</b> "
f"<a href='{Config.UPSTREAM_REPO}/commit/{commit}'>{commit.message}</a> "
f"By <i>{commit.author}</i>"
)
limit += 1
if limit >= 15:
break
return commits
async def pull_commits(repo: Repo) -> None | bool:
repo.git.reset("--hard")
try:
async with asyncio.timeout(10):
await asyncio.to_thread(
repo.git.pull, Config.UPSTREAM_REPO, "--rebase=true"
)
return True
except TimeoutError:
return
@bot.add_cmd(cmd="update")
async def updater(bot: BOT, message: Message) -> None | Message:
"""
CMD: UPDATE
INFO: Pull / Check for updates.
FLAGS: -pull to pull updates
USAGE:
.update | .update -pull
"""
reply: Message = await message.reply("Checking for Updates....")
repo: Repo = Config.REPO
commits: str = await get_commits(repo)
if commits is None:
await reply.edit("Timeout... Try again.")
return
if not commits:
await reply.edit(text="Already Up To Date.", del_in=5)
return
if "-pull" not in message.flags:
await reply.edit(
text=f"<b>Update Available:</b>\n{commits}", disable_web_page_preview=True
)
return
if not (await pull_commits(repo)): # NOQA
await reply.edit("Timeout...Try again.")
return
await asyncio.gather(
bot.log_text(
text=f"#Updater\nPulled:\n{commits}", disable_web_page_preview=True
),
reply.edit("<b>Update Found</b>\n<i>Pulling....</i>"),
)
await restart(bot, message, reply)

View File

@@ -1,5 +1,4 @@
from app import BOT, bot
from app.core import Message
from app import BOT, Message, bot
from app.plugins.tg_tools.get_message import parse_link

View File

@@ -10,11 +10,11 @@ from pyrogram.enums import MessageMediaType
from pyrogram.errors import StickersetInvalid
from pyrogram.raw.functions.messages import GetStickerSet
from pyrogram.raw.types import InputStickerSetShortName
from ub_core.utils.helpers import get_name
from ub_core.utils.media_helper import MediaExts
from ub_core.utils.shell import get_duration, run_shell_cmd
from app import BOT, Message, bot
from app.utils.helpers import get_name
from app.utils.media_helper import MediaExts
from app.utils.shell import get_duration, run_shell_cmd
EMOJIS = ("", "🤡", "🙂", "🤔", "🔪", "😂", "💀")

View File

@@ -4,9 +4,9 @@ from collections import defaultdict
from pyrogram import filters
from pyrogram.enums import ChatType, MessageEntityType
from pyrogram.errors import MessageIdInvalid
from ub_core.utils.helpers import get_name
from app import BOT, Config, CustomDB, Message, bot
from app.utils.helpers import get_name
from app import BOT, Config, CustomDB, Message, bot, extra_config
LOGGER = CustomDB("COMMON_SETTINGS")
@@ -19,10 +19,10 @@ async def init_task():
tag_check = await LOGGER.find_one({"_id": "tag_logger_switch"})
pm_check = await LOGGER.find_one({"_id": "pm_logger_switch"})
if tag_check:
Config.TAG_LOGGER = tag_check["value"]
extra_config.TAG_LOGGER = tag_check["value"]
if pm_check:
Config.PM_LOGGER = pm_check["value"]
Config.MESSAGE_LOGGER_TASK = asyncio.create_task(runner())
extra_config.PM_LOGGER = pm_check["value"]
Config.BACKGROUND_TASKS.append(asyncio.create_task(runner(), name="pm_tag_logger"))
@bot.add_cmd(cmd=["taglogger", "pmlogger"])
@@ -36,12 +36,12 @@ async def logger_switch(bot: BOT, message: Message):
conf_str = f"{text.upper()}_LOGGER"
if "-c" in message.flags:
await message.reply(
text=f"{text.capitalize()} Logger is enabled: <b>{getattr(Config, conf_str)}</b>!",
text=f"{text.capitalize()} Logger is enabled: <b>{getattr(extra_config, conf_str)}</b>!",
del_in=8,
)
return
value: bool = not getattr(Config, conf_str)
setattr(Config, conf_str, value)
value: bool = not getattr(extra_config, conf_str)
setattr(extra_config, conf_str, value)
await asyncio.gather(
LOGGER.add_data({"_id": f"{text}_logger_switch", "value": value}),
message.reply(
@@ -51,8 +51,11 @@ async def logger_switch(bot: BOT, message: Message):
text=f"#{text.capitalize()}Logger is enabled: <b>{value}</b>!", type="info"
),
)
if not Config.MESSAGE_LOGGER_TASK or Config.MESSAGE_LOGGER_TASK.done():
Config.MESSAGE_LOGGER_TASK = asyncio.create_task(runner())
for task in Config.BACKGROUND_TASKS:
if task.get_name() == "pm_tag_logger" and not task.done():
Config.BACKGROUND_TASKS.append(
asyncio.create_task(runner(), name="pm_tag_logger")
)
basic_filters = (
@@ -67,14 +70,14 @@ basic_filters = (
@bot.on_message(
filters=basic_filters
& filters.private
& filters.create(lambda _, __, ___: Config.PM_LOGGER),
& filters.create(lambda _, __, ___: extra_config.PM_LOGGER),
group=2,
)
async def pm_logger(bot: BOT, message: Message):
cache_message(message)
tag_filter = filters.create(lambda _, __, ___: Config.TAG_LOGGER)
tag_filter = filters.create(lambda _, __, ___: extra_config.TAG_LOGGER)
@bot.on_message(
@@ -128,7 +131,7 @@ def cache_message(message: Message):
async def runner():
if not (Config.TAG_LOGGER or Config.PM_LOGGER):
if not (extra_config.TAG_LOGGER or extra_config.PM_LOGGER):
return
last_pm_logged_id = 0
while True:
@@ -158,11 +161,11 @@ async def runner():
async def log_pm(message: Message, log_info: bool):
if log_info:
await bot.send_message(
chat_id=Config.MESSAGE_LOGGER_CHAT,
chat_id=extra_config.MESSAGE_LOGGER_CHAT,
text=f"#PM\n{message.from_user.mention} [{message.from_user.id}]",
)
try:
await message.forward(Config.MESSAGE_LOGGER_CHAT)
await message.forward(extra_config.MESSAGE_LOGGER_CHAT)
except MessageIdInvalid:
notice = (
f"{message.from_user.mention} [{message.from_user.id}] deleted this message."
@@ -172,7 +175,7 @@ async def log_pm(message: Message, log_info: bool):
f"Caption:\n{message.caption or 'No Caption in media.'}"
)
await message.copy(Config.MESSAGE_LOGGER_CHAT, caption=notice)
await message.copy(extra_config.MESSAGE_LOGGER_CHAT, caption=notice)
async def log_chat(message: Message):
@@ -190,15 +193,15 @@ async def log_chat(message: Message):
if message.reply_to_message:
try:
await message.reply_to_message.forward(Config.MESSAGE_LOGGER_CHAT)
await message.reply_to_message.forward(extra_config.MESSAGE_LOGGER_CHAT)
except MessageIdInvalid:
await message.reply_to_message.copy(
Config.MESSAGE_LOGGER_CHAT, caption=notice
extra_config.MESSAGE_LOGGER_CHAT, caption=notice
)
try:
logged = await message.forward(Config.MESSAGE_LOGGER_CHAT)
logged = await message.forward(extra_config.MESSAGE_LOGGER_CHAT)
await logged.reply(
text=f"#TAG\n{mention} [{u_id}]\nMessage: \n<a href='{message.link}'>{message.chat.title}</a> ({message.chat.id})",
)
except MessageIdInvalid:
await message.copy(Config.MESSAGE_LOGGER_CHAT, caption=notice)
await message.copy(extra_config.MESSAGE_LOGGER_CHAT, caption=notice)

View File

@@ -3,9 +3,9 @@ from collections import defaultdict
from pyrogram import filters
from pyrogram.enums import ChatType
from ub_core.utils.helpers import get_name
from app import BOT, Config, CustomDB, Message, bot
from app.utils.helpers import get_name
from app import BOT, CustomDB, Message, bot, extra_config
PM_USERS = CustomDB("PM_USERS")
@@ -15,15 +15,15 @@ ALLOWED_USERS: list[int] = []
allowed_filter = filters.create(lambda _, __, m: m.chat.id in ALLOWED_USERS)
guard_check = filters.create(lambda _, __, ___: Config.PM_GUARD)
guard_check = filters.create(lambda _, __, ___: extra_config.PM_GUARD)
RECENT_USERS: dict = defaultdict(int)
async def init_task():
guard = (await PM_GUARD.find_one({"_id": "guard_switch"})) or {}
extra_config.PM_GUARD = guard.get("value", False)
[ALLOWED_USERS.append(user_id["_id"]) async for user_id in PM_USERS.find()]
Config.PM_GUARD = guard.get("value", False)
@bot.on_message(
@@ -82,11 +82,11 @@ async def pmguard(bot: BOT, message: Message):
"""
if "-c" in message.flags:
await message.reply(
text=f"PM Guard is enabled: <b>{Config.PM_GUARD}</b>", del_in=8
text=f"PM Guard is enabled: <b>{extra_config.PM_GUARD}</b>", del_in=8
)
return
value = not Config.PM_GUARD
Config.PM_GUARD = value
value = not extra_config.PM_GUARD
extra_config.PM_GUARD = value
await asyncio.gather(
PM_GUARD.add_data({"_id": "guard_switch", "value": value}),
message.reply(text=f"PM Guard is enabled: <b>{value}</b>!", del_in=8),

View File

@@ -1,6 +0,0 @@
import json
class Str:
def __str__(self):
return json.dumps(self.__dict__, indent=4, ensure_ascii=False, default=str)

View File

@@ -1,82 +0,0 @@
import json
import logging
import os
from io import BytesIO
from aiohttp import ClientSession, ContentTypeError, web
from app import Config
from app.utils.media_helper import get_filename_from_url
LOGGER = logging.getLogger(Config.BOT_NAME)
class Aio:
def __init__(self):
self.session: ClientSession | None = None
self.app = None
self.port = os.environ.get("API_PORT", 0)
self.runner = None
if self.port:
Config.INIT_TASKS.append(self.set_site())
Config.INIT_TASKS.append(self.set_session())
async def close(self):
if not self.session.closed:
await self.session.close()
if self.runner:
await self.runner.cleanup()
async def set_session(self):
self.session = ClientSession()
async def set_site(self):
LOGGER.info("Starting Static WebSite.")
self.app = web.Application()
self.app.router.add_get(path="/", handler=self.handle_request)
self.runner = web.AppRunner(self.app)
await self.runner.setup()
site = web.TCPSite(
self.runner, "0.0.0.0", self.port, reuse_address=True, reuse_port=True
)
await site.start()
@staticmethod
async def handle_request(_):
return web.Response(text="Web Server Running...")
async def get_json(
self,
url: str,
headers: dict = None,
params: dict | str = None,
json_: bool = False,
timeout: int = 10,
) -> dict | None:
try:
async with self.session.get(
url=url, headers=headers, params=params, timeout=timeout
) as ses:
if json_:
return await ses.json()
else:
return (json.loads(await ses.text())) # fmt:skip
except (json.JSONDecodeError, ContentTypeError):
LOGGER.debug(await ses.text())
except TimeoutError:
LOGGER.debug("Timeout")
async def in_memory_dl(self, url: str) -> BytesIO:
async with self.session.get(url) as remote_file:
bytes_data = await remote_file.read()
file = BytesIO(bytes_data)
file.name = get_filename_from_url(url, tg_safe=True)
return file
async def thumb_dl(self, thumb) -> BytesIO | str | None:
if not thumb or not thumb.startswith("http"):
return thumb
return (await self.in_memory_dl(thumb)) # fmt:skip
aio = Aio()

View File

@@ -1,179 +0,0 @@
import asyncio
import os
import shutil
from functools import cached_property
import aiofiles
import aiohttp
from pyrogram.types import Message as Msg
from app.core.types.message import Message
from app.utils import Str
from app.utils.helpers import progress
from app.utils.media_helper import (bytes_to_mb, get_filename_from_headers,
get_filename_from_url, get_type)
class DownloadedFile(Str):
def __init__(
self,
name: str,
path: str,
full_path: str,
size: int | float):
self.name = name
self.path = path
self.full_path = full_path
self.size = size
self.type = get_type(path=name)
class Download(Str):
"""Download a file in async using aiohttp.
Attributes:
url (str):
file url.
path (str):
download path without file name.
message_to_edit:
response message to edit for progress.
custom_file_name:
override the file name.
Returns:
ON success a DownloadedFile object is returned.
Methods:
dl_obj = await Download.setup(
url="https....",
path="downloads",
message_to_edit=response,
)
file = await dl_obj.download()
"""
class DuplicateDownload(Exception):
def __init__(self, path: str):
super().__init__(f"path {path} already exists!")
def __init__(
self,
url: str,
path: str,
file_session: aiohttp.ClientResponse,
session: aiohttp.client,
headers: aiohttp.ClientResponse.headers,
custom_file_name: str | None = None,
message_to_edit: Message | Msg | None = None,
):
self.url: str = url
self.path: str = path
self.headers: aiohttp.ClientResponse.headers = headers
self.custom_file_name: str = custom_file_name
self.file_session: aiohttp.ClientResponse = file_session
self.session: aiohttp.ClientSession = session
self.message_to_edit: Message | Msg | None = message_to_edit
self.raw_completed_size: int = 0
self.has_started: bool = False
self.is_done: bool = False
os.makedirs(name=path, exist_ok=True)
@classmethod
async def setup(
cls,
url: str,
path: str = "downloads",
message_to_edit: Message | None = None,
custom_file_name: str | None = None,
) -> "Download":
session = aiohttp.ClientSession()
file_session = await session.get(url=url)
headers = file_session.headers
cls_object = cls(
url=url,
path=path,
file_session=file_session,
session=session,
headers=headers,
message_to_edit=message_to_edit,
custom_file_name=custom_file_name,
)
await asyncio.gather(
cls_object.check_disk_space(), cls_object.check_duplicates()
)
return cls_object
async def check_disk_space(self):
if shutil.disk_usage(self.path).free < self.raw_size:
await self.close()
raise MemoryError(
f"Not enough space in {self.path} to download {self.size}mb."
)
async def check_duplicates(self):
if os.path.isfile(self.full_path):
await self.close()
raise self.DuplicateDownload(self.full_path)
@property
def completed_size(self):
"""Size in MB"""
return bytes_to_mb(self.raw_completed_size)
@cached_property
def file_name(self):
if self.custom_file_name:
return self.custom_file_name
return get_filename_from_headers(
self.headers) or get_filename_from_url(
self.url)
@cached_property
def full_path(self):
return os.path.join(self.path, self.file_name)
@cached_property
def raw_size(self):
# File Size in Bytes
return int(self.headers.get("Content-Length", 0))
@cached_property
def size(self):
"""File size in MBs"""
return bytes_to_mb(self.raw_size)
async def close(self):
if not self.session.closed:
await self.session.close()
if not self.file_session.closed:
self.file_session.close()
async def download(self) -> DownloadedFile | None:
if self.session.closed:
return
async with aiofiles.open(self.full_path, "wb") as async_file:
self.has_started = True
while file_chunk := (await self.file_session.content.read(1024)): # NOQA
await async_file.write(file_chunk)
self.raw_completed_size += 1024
await progress(
current=self.raw_completed_size,
total=self.raw_size,
response=self.message_to_edit,
action="Downloading...",
file_name=self.file_name,
file_path=self.full_path,
)
self.is_done = True
await self.close()
return self.return_file()
def return_file(self) -> DownloadedFile:
if os.path.isfile(self.full_path):
return DownloadedFile(
name=self.file_name,
path=self.path,
full_path=self.full_path,
size=self.size,
)

View File

@@ -1,76 +0,0 @@
import time
from pyrogram.types import Chat, Message, User
from telegraph.aio import Telegraph
from app import LOGGER, Config
from app.utils.media_helper import bytes_to_mb
TELEGRAPH: None | Telegraph = None
PROGRESS_DICT = {}
async def init_task():
global TELEGRAPH
TELEGRAPH = Telegraph()
try:
await TELEGRAPH.create_account(
short_name=Config.BOT_NAME,
author_name=Config.BOT_NAME,
author_url=Config.UPSTREAM_REPO,
)
except Exception:
LOGGER.error("Failed to Create Telegraph Account.")
async def post_to_telegraph(title: str, text: str):
telegraph = await TELEGRAPH.create_page(
title=title,
html_content=f"<p>{text}</p>",
author_name=Config.BOT_NAME,
author_url=Config.UPSTREAM_REPO,
)
return telegraph["url"]
def get_name(user_or_chat: User | Chat) -> str:
first = user_or_chat.first_name or ""
last = user_or_chat.last_name or ""
name = f"{first} {last}".strip()
if not name:
name = user_or_chat.title
return name
def extract_user_data(user: User) -> dict:
return dict(name=get_name(user), username=user.username, mention=user.mention)
async def progress(
current: int,
total: int,
response: Message | None = None,
action: str = "",
file_name: str = "",
file_path: str = "",
):
if not response:
return
if current == total:
PROGRESS_DICT.pop(file_path, "")
return
current_time = time.time()
if file_path not in PROGRESS_DICT or (current_time - PROGRESS_DICT[file_path]) > 5:
PROGRESS_DICT[file_path] = current_time
if total:
percentage = round((current * 100 / total), 1)
else:
percentage = 0
await response.edit(
f"<b>{action}</b>"
f"\n<pre language=bash>"
f"\nfile={file_name}"
f"\npath={file_path}"
f"\nsize={bytes_to_mb(total)}mb"
f"\ncompleted={bytes_to_mb(current)}mb | {percentage}%</pre>"
)

View File

@@ -1,94 +0,0 @@
import re
from enum import Enum, auto
from os.path import basename, splitext
from urllib.parse import unquote_plus, urlparse
from pyrogram.enums import MessageMediaType
from pyrogram.types import Message
class MediaType(Enum):
AUDIO = auto()
DOCUMENT = auto()
GIF = auto()
GROUP = auto()
MESSAGE = auto()
PHOTO = auto()
STICKER = auto()
VIDEO = auto()
class MediaExts:
PHOTO = {".png", ".jpg", ".jpeg", ".heic", ".webp"}
VIDEO = {".mp4", ".mkv", ".webm"}
GIF = {".gif"}
AUDIO = {".aac", ".mp3", ".opus", ".m4a", ".ogg", ".flac"}
def bytes_to_mb(size: int):
return round(size / 1048576, 1)
def get_filename_from_url(url: str, tg_safe: bool = False) -> str:
parsed_url = urlparse(unquote_plus(url))
name = basename(parsed_url.path.rstrip("/"))
if tg_safe:
return make_file_name_tg_safe(file_name=name)
return name
def get_filename_from_headers(headers: dict, tg_safe: bool = False) -> str | None:
content_disposition = headers.get("Content-Disposition", "")
match = re.search(r"filename=(.+)", content_disposition)
if not match:
return
if tg_safe:
return make_file_name_tg_safe(file_name=match.group(1))
return match.group(1)
def make_file_name_tg_safe(file_name: str) -> str:
if file_name.lower().endswith((".webp", ".heic")):
file_name = file_name + ".jpg"
elif file_name.lower().endswith(".webm"):
file_name = file_name + ".mp4"
return file_name
def get_type(url: str | None = "", path: str | None = "") -> MediaType | None:
if url:
media = get_filename_from_url(url)
else:
media = path
name, ext = splitext(media)
if ext in MediaExts.PHOTO:
return MediaType.PHOTO
if ext in MediaExts.VIDEO:
return MediaType.VIDEO
if ext in MediaExts.GIF:
return MediaType.GIF
if ext in MediaExts.AUDIO:
return MediaType.AUDIO
return MediaType.DOCUMENT
def get_tg_media_details(message: Message):
match message.media:
case MessageMediaType.PHOTO:
file = message.photo
file.file_name = "photo.jpg"
return file
case MessageMediaType.AUDIO:
return message.audio
case MessageMediaType.ANIMATION:
return message.animation
case MessageMediaType.DOCUMENT:
return message.document
case MessageMediaType.STICKER:
return message.sticker
case MessageMediaType.VIDEO:
return message.video
case MessageMediaType.VOICE:
return message.voice
case _:
return

View File

@@ -1,74 +0,0 @@
import asyncio
import os
async def run_shell_cmd(cmd: str) -> str:
proc: asyncio.create_subprocess_shell = await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
stdout, _ = await proc.communicate()
return stdout.decode("utf-8")
async def take_ss(video: str, path: str) -> None | str:
thumb = f"{path}/i.png"
await run_shell_cmd(
f'''ffmpeg -hide_banner -loglevel error -ss 0.1 -i "{video}" -vframes 1 "{thumb}"'''
)
if os.path.isfile(thumb):
return thumb
async def check_audio(file) -> int:
result = await run_shell_cmd(
f'''ffprobe -v error -show_entries format=nb_streams -of default=noprint_wrappers=1:nokey=1 "{file}"'''
)
return int(result or 0) - 1
async def get_duration(file) -> int:
duration = await run_shell_cmd(
f'''ffprobe -v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 "{file}"'''
)
return round(float(duration.strip() or 0))
class AsyncShell:
def __init__(
self,
process: asyncio.create_subprocess_shell,
):
self.process: asyncio.create_subprocess_shell = process
self.full_std: str = ""
self.last_line: str = ""
self.is_done: bool = False
self._task: asyncio.Task | None = None
async def read_output(self) -> None:
async for line in self.process.stdout:
decoded_line = line.decode("utf-8")
self.full_std += decoded_line
self.last_line = decoded_line
self.is_done = True
await self.process.wait()
async def get_output(self):
while not self.is_done:
yield self.full_std if len(self.full_std) < 4000 else self.last_line
await asyncio.sleep(0)
def cancel(self) -> None:
if not self.is_done:
self.process.kill()
self._task.cancel()
@classmethod
async def run_cmd(cls, cmd: str, name: str = "AsyncShell") -> "AsyncShell":
sub_process: AsyncShell = cls(
process=await asyncio.create_subprocess_shell(
cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
)
)
sub_process._task = asyncio.create_task(sub_process.read_output(), name=name)
await asyncio.sleep(0.5)
return sub_process

17
req.txt
View File

@@ -1,16 +1,7 @@
aiohttp==3.8.5
aiofiles==23.2.1
async-lru==2.0.4
httpx==0.25.0
dnspython==2.3.0
motor==3.2.0
pymongo==4.4.0
gitpython>=3.1.32
pyrogram==2.0.106
python-dotenv==0.21.0
tgcrypto
telegraph==2.2.0
uvloop==0.17.0
# BoilerPlate Code for UB
git+https://github.com/thedragonsinn/ub-core.git
yt-dlp
pillow
google-generativeai