Update scripts.py

This commit is contained in:
Abhi
2023-12-17 22:52:17 +05:30
committed by GitHub
parent 35f04161b7
commit 7d5642d648

View File

@@ -1,5 +1,5 @@
# Dragon-Userbot - telegram userbot
# Copyright (C) 2020-present Dragon Userbot Organization
# Moon-Userbot - telegram userbot
# Copyright (C) 2020-present Moon Userbot Organization
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -14,6 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import logging
import math
import time
import os
import re
import sys
@@ -26,13 +29,116 @@ from types import ModuleType
from typing import Dict
from PIL import Image
from pyrogram.errors import FloodWait, MessageNotModified
from pyrogram import Client, errors, types, enums
from pyrogram.types import Message
from .misc import modules_help, prefix, requirements_list
META_COMMENTS = re.compile(r"^ *# *meta +(\S+) *: *(.*?)\s*$", re.MULTILINE)
interact_with_to_delete = []
def time_formatter(milliseconds: int) -> str:
"""Time Formatter"""
seconds, milliseconds = divmod(int(milliseconds), 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
tmp = (
((str(days) + " day(s), ") if days else "")
+ ((str(hours) + " hour(s), ") if hours else "")
+ ((str(minutes) + " minute(s), ") if minutes else "")
+ ((str(seconds) + " second(s), ") if seconds else "")
+ ((str(milliseconds) + " millisecond(s), ") if milliseconds else "")
)
return tmp[:-2]
def humanbytes(size):
"""Convert Bytes To Bytes So That Human Can Read It"""
if not size:
return ""
power = 2 ** 10
raised_to_pow = 0
dict_power_n = {0: "", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
raised_to_pow += 1
return str(round(size, 2)) + " " + dict_power_n[raised_to_pow] + "B"
async def edit_or_send_as_file(
text: str,
message: Message,
client: Client,
caption: str = "`Result!`",
file_name: str = "result",
parse_mode=enums.ParseMode.MARKDOWN,
):
"""Send As File If Len Of Text Exceeds Tg Limit Else Edit Message"""
if not text:
await message.edit("`Wait, What?`", parse_mode=enums.ParseMode.MARKDOWN)
return
if len(text) > 1024:
await message.edit("`OutPut is Too Large, Sending As File!`", parse_mode=enums.ParseMode.MARKDOWN)
file_names = f"{file_name}.text"
open(file_names, "w").write(text)
await client.send_document(message.chat.id, file_names, caption=caption)
await message.delete()
if os.path.exists(file_names):
os.remove(file_names)
return
else:
return await message.edit(text, parse_mode=parse_mode)
def get_text(message: Message) -> [None, str]:
"""Extract Text From Commands"""
text_to_return = message.text
if message.text is None:
return None
if " " in text_to_return:
try:
return message.text.split(None, 1)[1]
except IndexError:
return None
else:
return None
async def progress(current, total, message, start, type_of_ps, file_name=None):
"""Progress Bar For Showing Progress While Uploading / Downloading File - Normal"""
now = time.time()
diff = now - start
if round(diff % 10.00) == 0 or current == total:
percentage = current * 100 / total
speed = current / diff
elapsed_time = round(diff) * 1000
if elapsed_time == 0:
return
time_to_completion = round((total - current) / speed) * 1000
estimated_total_time = elapsed_time + time_to_completion
progress_str = "{0}{1} {2}%\n".format(
"".join(["" for i in range(math.floor(percentage / 10))]),
"".join(["" for i in range(10 - math.floor(percentage / 10))]),
round(percentage, 2),
)
tmp = progress_str + "{0} of {1}\nETA: {2}".format(
humanbytes(current), humanbytes(total), time_formatter(estimated_total_time)
)
if file_name:
try:
await message.edit(
"{}\n**File Name:** `{}`\n{}".format(type_of_ps, file_name, tmp, parse_mode=enums.ParseMode.MARKDOWN)
)
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
pass
else:
try:
await message.edit("{}\n{}".format(type_of_ps, tmp), parse_mode=enums.ParseMode.MARKDOWN)
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
pass
async def edit_or_reply(message, text, parse_mode=enums.ParseMode.MARKDOWN):
"""Edit Message If Its From Self, Else Reply To Message"""
if not message: