From 165f7d222bfa14ff8b3fca65bd59ceb339a7cac5 Mon Sep 17 00:00:00 2001 From: allinoneallinone00 Date: Wed, 1 May 2024 22:26:50 +0000 Subject: [PATCH] Use of `f-strings` --- modules/removebg.py | 29 ++++++++----------------- modules/sessionkiller.py | 47 +++++++--------------------------------- modules/url.py | 10 ++++----- utils/scripts.py | 38 +++++++++++++++++++------------- 4 files changed, 45 insertions(+), 79 deletions(-) diff --git a/modules/removebg.py b/modules/removebg.py index 65a41c1..08533e9 100644 --- a/modules/removebg.py +++ b/modules/removebg.py @@ -71,7 +71,7 @@ def remove_background(photo_data): data={"size": "auto"}, headers={"X-Api-Key": rmbg_key}, ) - if response.status_code == requests.codes.ok: + if response.status_code == 200: return BytesIO(response.content) print("Error:", response.status_code, response.text) return None @@ -83,9 +83,8 @@ def _check_rmbg(func): if not rmbg_key: await edit_or_reply( message, - "`Is Your RMBG Api 'rmbg_key' Valid Or You Didn't Add It??`", - parse_mode=enums.ParseMode.MARKDOWN, - ) + "Is Your RMBG Api 'rmbg_key' Valid Or You Didn't Add It??" + ) else: await func(client, message) @@ -95,19 +94,13 @@ def _check_rmbg(func): @Client.on_message(filters.command("rmbg", prefix) & filters.me) @_check_rmbg async def rmbg(client: Client, message: Message): - pablo = await edit_or_reply( - message, "`Processing...`", parse_mode=enums.ParseMode.MARKDOWN - ) + pablo = await edit_or_reply(message, "Processing...") if not message.reply_to_message: - await pablo.edit( - "`Reply To A Image Please!`", parse_mode=enums.ParseMode.MARKDOWN - ) + await pablo.edit("Reply To A Image Please!") return cool = await convert_to_image(message, client) if not cool: - await pablo.edit( - "`Reply to a valid media first.`", parse_mode=enums.ParseMode.MARKDOWN - ) + await pablo.edit("Reply to a valid media first.") return start = datetime.now() await pablo.edit("sending to ReMove.BG") @@ -135,18 +128,14 @@ async def rmbg(client: Client, message: Message): end = datetime.now() ms = (end - start).seconds await pablo.edit( - "Removed image's Background in {} seconds, powered by @moonuserbot".format( - ms + f"Removed image's Background in {ms} seconds, powered by @moonuserbot" ) - ) if os.path.exists("BG_rem.png"): os.remove("BG_rem.png") else: await pablo.edit( - "ReMove.BG API returned Errors. Please report to @moonub_chat\n`{}".format( - output_file_name.content.decode("UTF-8") - ) - ) + "ReMove.BG API returned Errors. Please report to @moonub_chat" + + f"\n`{output_file_name.content.decode('UTF-8')}") @Client.on_message(filters.command("rebg", prefix) & filters.me) diff --git a/modules/sessionkiller.py b/modules/sessionkiller.py index 184a622..9cbeb3f 100644 --- a/modules/sessionkiller.py +++ b/modules/sessionkiller.py @@ -39,46 +39,15 @@ async def sessions_list(client: Client, message: Message): sessions = (await client.invoke(GetAuthorizations())).authorizations for num, session in enumerate(sessions, 1): formatted_sessions.append( - ( - "{num}. {model} on {platform}\n" - "Hash: {hash}\n" - "App name: {app_name} v.{version}\n" - "Created (last activity): {created} ({last_activity})\n" - "IP and location: : {ip} ({location})\n" - "Official status: {official}\n" - "2FA accepted: {password_pending}\n" - "Can accept calls / secret chats: {calls} / {secret_chats}" - ).format( - num=num, - model=escape(session.device_model), - platform=escape( - session.platform - if session.platform != "" - else "unknown platform" - ), - hash=session.hash, - app_name=escape(session.app_name), - version=escape( - session.app_version - if session.app_version != "" - else "unknown" - ), - created=datetime.fromtimestamp( - session.date_created - ).isoformat(), - last_activity=datetime.fromtimestamp( - session.date_active - ).isoformat(), - ip=session.ip, - location=session.country, - official="✅" if session.official_app else "❌️", - password_pending="❌️️" if session.password_pending else "✅", - calls="❌️️" if session.call_requests_disabled else "✅", - secret_chats="❌️️" - if session.encrypted_requests_disabled - else "✅", + f"{num}. {escape(session.device_model)} on {escape(session.platform if session.platform!= '' else 'unknown platform')}\n" + f"Hash: {escape(session.hash)}\n" + f"App name: {escape(session.app_name)} v.{escape(session.app_version if session.app_version!= '' else 'unknown')}\n" + f"Created (last activity): {datetime.fromtimestamp(session.date_created).isoformat()} ({datetime.fromtimestamp(session.date_active).isoformat()})\n" + f"IP and location: : {session.ip} ({session.country})\n" + f"Official status: {'✅' if session.official_app else '❌️'}\n" + f"2FA accepted: {'❌️️' if session.password_pending else '✅'}\n" + f"Can accept calls / secret chats: {'❌️️' if session.call_requests_disabled else '✅'} / {'❌️️' if session.encrypted_requests_disabled else '✅'}" ) - ) answer = "Active sessions at your account:\n\n" chunk = [] for s in formatted_sessions: diff --git a/modules/url.py b/modules/url.py index 5ff6101..e79166e 100644 --- a/modules/url.py +++ b/modules/url.py @@ -121,11 +121,11 @@ async def urldl(client: Client, message: Message): diff = now - c_time percentage = downloader.get_progress() * 100 speed = downloader.get_speed(human=True) - progress_str = "{0}{1}\nProgress: {2}%".format( - "".join(["▰" for _ in range(math.floor(percentage / 5))]), - "".join(["▱" for _ in range(20 - math.floor(percentage / 5))]), - round(percentage, 2), - ) + progress_str = ( + "".join(["▰" for _ in range(math.floor(percentage / 5))]) + + "".join(["▱" for _ in range(20 - math.floor(percentage / 5))]) + + f"\nProgress: {round(percentage, 2)}%" + ) eta = downloader.get_eta(human=True) try: m = "Trying to download...\n" diff --git a/utils/scripts.py b/utils/scripts.py index 847bbaf..00f2707 100644 --- a/utils/scripts.py +++ b/utils/scripts.py @@ -118,14 +118,19 @@ async def progress(current, total, message, start, type_of_ps, file_name=None): return time_to_completion = round((total - current) / speed) * 1000 estimated_total_time = elapsed_time + time_to_completion - progress_str = f"{''.join(['▰' for i in range(math.floor(percentage / 10))])}{''.join(['▱' for i in range(10 - math.floor(percentage / 10))])} {round(percentage, 2)}%\n" - tmp = f"{progress_str}{humanbytes(current)} of {humanbytes(total)}\nETA: {time_formatter(estimated_total_time)}" + progress_str = f"{''.join(['▰' for i in range(math.floor(percentage / 10))])}" + progress_str += ( + f"{''.join(['▱' for i in range(10 - math.floor(percentage / 10))])}" + ) + progress_str += f"{round(percentage, 2)}%\n" + tmp = f"{progress_str}{humanbytes(current)} of {humanbytes(total)}\n" + tmp += f"ETA: {time_formatter(estimated_total_time)}" if file_name: try: await message.edit( f"{type_of_ps}\n**File Name:** `{file_name}`\n{tmp}", parse_mode=enums.ParseMode.MARKDOWN, - ) + ) except FloodWait as e: await asyncio.sleep(e.x) except MessageNotModified: @@ -134,7 +139,7 @@ async def progress(current, total, message, start, type_of_ps, file_name=None): try: await message.edit( f"{type_of_ps}\n{tmp}", parse_mode=enums.ParseMode.MARKDOWN - ) + ) except FloodWait as e: await asyncio.sleep(e.x) except MessageNotModified: @@ -146,14 +151,14 @@ async def run_cmd(prefix: str) -> Tuple[str, str, int, int]: args = shlex.split(prefix) process = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) + ) stdout, stderr = await process.communicate() return ( stdout.decode("utf-8", "replace").strip(), stderr.decode("utf-8", "replace").strip(), process.returncode, process.pid, - ) + ) def mediainfo(media): @@ -216,7 +221,7 @@ def format_exc(e: Exception, suffix="") -> str: return ( f"Telegram API error!\n" f"[{e.CODE} {e.ID or e.NAME}] — {e.MESSAGE.format(value=e.value)}\n\n{suffix}" - ) + ) return f"Error!\n" f"{err}" @@ -245,7 +250,7 @@ async def interact_with(message: Message) -> Message: # noinspection PyProtectedMember response = [ msg async for msg in message._client.get_chat_history(message.chat.id, limit=1) - ] + ] seconds_waiting = 0 while response[0].from_user.is_self: @@ -271,7 +276,7 @@ def format_module_help(module_name: str, full=True): help_text = ( f"Help for |{module_name}|\n\nUsage:\n" if full else "Usage:\n" - ) + ) for command, desc in commands.items(): cmd = command.split(maxsplit=1) @@ -288,7 +293,7 @@ def format_small_module_help(module_name: str, full=True): f"Help for |{module_name}|\n\nCommands list:\n" if full else "Commands list:\n" - ) + ) for command, _desc in commands.items(): cmd = command.split(maxsplit=1) args = " " + cmd[1] + "" if len(cmd) > 1 else "" @@ -317,13 +322,13 @@ def import_library(library_name: str, package_name: str = None): if completed.returncode != 0: raise AssertionError( f"Failed to install library {package_name} (pip exited with code {completed.returncode})" - ) from exc + ) from exc return importlib.import_module(library_name) def resize_image( input_img, output=None, img_type="PNG", size: int = 512, size2: int = None -): + ): if output is None: output = BytesIO() output.name = f"sticker.{img_type.lower()}" @@ -350,7 +355,7 @@ async def load_module( client: Client, message: Message = None, core=False, -) -> ModuleType: + ) -> ModuleType: if module_name in modules_help and not core: await unload_module(module_name, client) @@ -388,7 +393,10 @@ async def load_module( await asyncio.wait_for(proc.wait(), timeout=120) except asyncio.TimeoutError: if message: - await message.edit("Timeout while installed requirements. Try to install them manually") + await message.edit( + "Timeout while installed requirements." + + "Try to install them manually" + ) raise TimeoutError("timeout while installing requirements") from e if proc.returncode != 0: @@ -396,7 +404,7 @@ async def load_module( await message.edit( f"Failed to install requirements (pip exited with code {proc.returncode}). " f"Check logs for futher info", - ) + ) raise RuntimeError("failed to install requirements") from e module = importlib.import_module(path)