Use of f-strings

This commit is contained in:
allinoneallinone00
2024-05-01 22:26:50 +00:00
parent bdffa21e56
commit 165f7d222b
4 changed files with 45 additions and 79 deletions

View File

@@ -71,7 +71,7 @@ def remove_background(photo_data):
data={"size": "auto"},
headers={"X-Api-Key": rmbg_key},
)
if response.status_code == requests.codes.ok:
if response.status_code == 200:
return BytesIO(response.content)
print("Error:", response.status_code, response.text)
return None
@@ -83,9 +83,8 @@ def _check_rmbg(func):
if not rmbg_key:
await edit_or_reply(
message,
"`Is Your RMBG Api 'rmbg_key' Valid Or You Didn't Add It??`",
parse_mode=enums.ParseMode.MARKDOWN,
)
"<code>Is Your RMBG Api 'rmbg_key' Valid Or You Didn't Add It??</code>"
)
else:
await func(client, message)
@@ -95,19 +94,13 @@ def _check_rmbg(func):
@Client.on_message(filters.command("rmbg", prefix) & filters.me)
@_check_rmbg
async def rmbg(client: Client, message: Message):
pablo = await edit_or_reply(
message, "`Processing...`", parse_mode=enums.ParseMode.MARKDOWN
)
pablo = await edit_or_reply(message, "<code>Processing...</code>")
if not message.reply_to_message:
await pablo.edit(
"`Reply To A Image Please!`", parse_mode=enums.ParseMode.MARKDOWN
)
await pablo.edit("<code>Reply To A Image Please!</code>")
return
cool = await convert_to_image(message, client)
if not cool:
await pablo.edit(
"`Reply to a valid media first.`", parse_mode=enums.ParseMode.MARKDOWN
)
await pablo.edit("<code>Reply to a valid media first.</code>")
return
start = datetime.now()
await pablo.edit("sending to ReMove.BG")
@@ -135,18 +128,14 @@ async def rmbg(client: Client, message: Message):
end = datetime.now()
ms = (end - start).seconds
await pablo.edit(
"<code>Removed image's Background in {} seconds, powered by </code> <b>@moonuserbot</b>".format(
ms
f"<code>Removed image's Background in {ms} seconds, powered by </code> <b>@moonuserbot</b>"
)
)
if os.path.exists("BG_rem.png"):
os.remove("BG_rem.png")
else:
await pablo.edit(
"ReMove.BG API returned Errors. Please report to @moonub_chat\n`{}".format(
output_file_name.content.decode("UTF-8")
)
)
"ReMove.BG API returned Errors. Please report to @moonub_chat"
+ f"\n`{output_file_name.content.decode('UTF-8')}")
@Client.on_message(filters.command("rebg", prefix) & filters.me)

View File

@@ -39,46 +39,15 @@ async def sessions_list(client: Client, message: Message):
sessions = (await client.invoke(GetAuthorizations())).authorizations
for num, session in enumerate(sessions, 1):
formatted_sessions.append(
(
"<b>{num}</b>. <b>{model}</b> on <code>{platform}</code>\n"
"<b>Hash:</b> {hash}\n"
"<b>App name:</b> <code>{app_name}</code> v.{version}\n"
"<b>Created (last activity):</b> {created} ({last_activity})\n"
"<b>IP and location: </b>: <code>{ip}</code> (<i>{location}</i>)\n"
"<b>Official status:</b> <code>{official}</code>\n"
"<b>2FA accepted:</b> <code>{password_pending}</code>\n"
"<b>Can accept calls / secret chats:</b> {calls} / {secret_chats}"
).format(
num=num,
model=escape(session.device_model),
platform=escape(
session.platform
if session.platform != ""
else "unknown platform"
),
hash=session.hash,
app_name=escape(session.app_name),
version=escape(
session.app_version
if session.app_version != ""
else "unknown"
),
created=datetime.fromtimestamp(
session.date_created
).isoformat(),
last_activity=datetime.fromtimestamp(
session.date_active
).isoformat(),
ip=session.ip,
location=session.country,
official="" if session.official_app else "❌️",
password_pending="❌️️" if session.password_pending else "",
calls="❌️️" if session.call_requests_disabled else "",
secret_chats="❌️️"
if session.encrypted_requests_disabled
else "",
f"<b>{num}</b>. <b>{escape(session.device_model)}</b> on <code>{escape(session.platform if session.platform!= '' else 'unknown platform')}</code>\n"
f"<b>Hash:</b> {escape(session.hash)}\n"
f"<b>App name:</b> <code>{escape(session.app_name)}</code> v.{escape(session.app_version if session.app_version!= '' else 'unknown')}</>\n"
f"<b>Created (last activity):</b> {datetime.fromtimestamp(session.date_created).isoformat()} ({datetime.fromtimestamp(session.date_active).isoformat()})\n"
f"<b>IP and location: </b>: <code>{session.ip}</code> (<i>{session.country}</i>)\n"
f"<b>Official status:</b> <code>{'' if session.official_app else '❌️'}</code>\n"
f"<b>2FA accepted:</b> <code>{'❌️️' if session.password_pending else ''}</code>\n"
f"<b>Can accept calls / secret chats:</b> {'❌️️' if session.call_requests_disabled else ''} / {'❌️️' if session.encrypted_requests_disabled else ''}"
)
)
answer = "<b>Active sessions at your account:</b>\n\n"
chunk = []
for s in formatted_sessions:

View File

@@ -121,11 +121,11 @@ async def urldl(client: Client, message: Message):
diff = now - c_time
percentage = downloader.get_progress() * 100
speed = downloader.get_speed(human=True)
progress_str = "{0}{1}\n<b>Progress:</b> {2}%".format(
"".join(["" for _ in range(math.floor(percentage / 5))]),
"".join(["" for _ in range(20 - math.floor(percentage / 5))]),
round(percentage, 2),
)
progress_str = (
"".join(["" for _ in range(math.floor(percentage / 5))])
+ "".join(["" for _ in range(20 - math.floor(percentage / 5))])
+ f"\n<b>Progress:</b> {round(percentage, 2)}%"
)
eta = downloader.get_eta(human=True)
try:
m = "<b>Trying to download...</b>\n"

View File

@@ -118,14 +118,19 @@ async def progress(current, total, message, start, type_of_ps, file_name=None):
return
time_to_completion = round((total - current) / speed) * 1000
estimated_total_time = elapsed_time + time_to_completion
progress_str = f"{''.join(['' for i in range(math.floor(percentage / 10))])}{''.join(['' for i in range(10 - math.floor(percentage / 10))])} {round(percentage, 2)}%\n"
tmp = f"{progress_str}{humanbytes(current)} of {humanbytes(total)}\nETA: {time_formatter(estimated_total_time)}"
progress_str = f"{''.join(['' for i in range(math.floor(percentage / 10))])}"
progress_str += (
f"{''.join(['' for i in range(10 - math.floor(percentage / 10))])}"
)
progress_str += f"{round(percentage, 2)}%\n"
tmp = f"{progress_str}{humanbytes(current)} of {humanbytes(total)}\n"
tmp += f"ETA: {time_formatter(estimated_total_time)}"
if file_name:
try:
await message.edit(
f"{type_of_ps}\n**File Name:** `{file_name}`\n{tmp}",
parse_mode=enums.ParseMode.MARKDOWN,
)
)
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
@@ -134,7 +139,7 @@ async def progress(current, total, message, start, type_of_ps, file_name=None):
try:
await message.edit(
f"{type_of_ps}\n{tmp}", parse_mode=enums.ParseMode.MARKDOWN
)
)
except FloodWait as e:
await asyncio.sleep(e.x)
except MessageNotModified:
@@ -146,14 +151,14 @@ async def run_cmd(prefix: str) -> Tuple[str, str, int, int]:
args = shlex.split(prefix)
process = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
)
stdout, stderr = await process.communicate()
return (
stdout.decode("utf-8", "replace").strip(),
stderr.decode("utf-8", "replace").strip(),
process.returncode,
process.pid,
)
)
def mediainfo(media):
@@ -216,7 +221,7 @@ def format_exc(e: Exception, suffix="") -> str:
return (
f"<b>Telegram API error!</b>\n"
f"<code>[{e.CODE} {e.ID or e.NAME}] — {e.MESSAGE.format(value=e.value)}</code>\n\n<b>{suffix}</b>"
)
)
return f"<b>Error!</b>\n" f"<code>{err}</code>"
@@ -245,7 +250,7 @@ async def interact_with(message: Message) -> Message:
# noinspection PyProtectedMember
response = [
msg async for msg in message._client.get_chat_history(message.chat.id, limit=1)
]
]
seconds_waiting = 0
while response[0].from_user.is_self:
@@ -271,7 +276,7 @@ def format_module_help(module_name: str, full=True):
help_text = (
f"<b>Help for |{module_name}|\n\nUsage:</b>\n" if full else "<b>Usage:</b>\n"
)
)
for command, desc in commands.items():
cmd = command.split(maxsplit=1)
@@ -288,7 +293,7 @@ def format_small_module_help(module_name: str, full=True):
f"<b>Help for |{module_name}|\n\nCommands list:\n"
if full
else "<b>Commands list:\n"
)
)
for command, _desc in commands.items():
cmd = command.split(maxsplit=1)
args = " <code>" + cmd[1] + "</code>" if len(cmd) > 1 else ""
@@ -317,13 +322,13 @@ def import_library(library_name: str, package_name: str = None):
if completed.returncode != 0:
raise AssertionError(
f"Failed to install library {package_name} (pip exited with code {completed.returncode})"
) from exc
) from exc
return importlib.import_module(library_name)
def resize_image(
input_img, output=None, img_type="PNG", size: int = 512, size2: int = None
):
):
if output is None:
output = BytesIO()
output.name = f"sticker.{img_type.lower()}"
@@ -350,7 +355,7 @@ async def load_module(
client: Client,
message: Message = None,
core=False,
) -> ModuleType:
) -> ModuleType:
if module_name in modules_help and not core:
await unload_module(module_name, client)
@@ -388,7 +393,10 @@ async def load_module(
await asyncio.wait_for(proc.wait(), timeout=120)
except asyncio.TimeoutError:
if message:
await message.edit("<b>Timeout while installed requirements. Try to install them manually</b>")
await message.edit(
"<b>Timeout while installed requirements."
+ "Try to install them manually</b>"
)
raise TimeoutError("timeout while installing requirements") from e
if proc.returncode != 0:
@@ -396,7 +404,7 @@ async def load_module(
await message.edit(
f"<b>Failed to install requirements (pip exited with code {proc.returncode}). "
f"Check logs for futher info</b>",
)
)
raise RuntimeError("failed to install requirements") from e
module = importlib.import_module(path)