#  Moon-Userbot - telegram userbot
#  Copyright (C) 2020-present Moon Userbot Organization
#
#  This program is free software: you can redistribute it and/or modify
#  it under the terms of the GNU General Public License as published by
#  the Free Software Foundation, either version 3 of the License, or
#  (at your option) any later version.

#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU General Public License for more details.

#  You should have received a copy of the GNU General Public License
#  along with this program.  If not, see <https://www.gnu.org/licenses/>.

import asyncio
import base64
from functools import partial
from io import BytesIO

import requests
from pyrogram import Client, filters, errors, types
from pyrogram.types import Message

from utils.misc import modules_help, prefix
from utils.scripts import with_reply, format_exc, resize_image

QUOTES_API = "https://quotes-o042.onrender.com/generate"

# Option words understood by both commands; they never belong to the quote text.
_OPTION_WORDS = ("!png", "!file", "!me", "!ls", "!noreply", "!nr")

# Placeholder image t.me serves for accounts without a public avatar.
_TELEGRAM_LOGO = "https://telegram.org/img/t_logo.png"

# Prefix of the <meta> tag on a t.me profile page that carries the avatar URL.
_OG_IMAGE_MARKER = '<meta property="og:image" content="'


def _enum_name(value) -> str:
    """Return the lowercase member name of an enum value (or of a plain string).

    ``ChatType.SUPERGROUP`` and ``"supergroup"`` both yield ``"supergroup"``,
    so comparisons work whether the library exposes enums or raw strings.
    """
    return str(value).split(".")[-1].lower()


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and return its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, partial(func, *args, **kwargs))


def _parse_flags(command) -> tuple:
    """Return ``(is_png, send_for_me, no_reply)`` parsed from the command words."""
    is_png = "!png" in command or "!file" in command
    send_for_me = "!me" in command or "!ls" in command
    no_reply = "!noreply" in command or "!nr" in command
    return is_png, send_for_me, no_reply


async def _generate_and_send(
    client: Client, message: Message, messages: list, *, is_png: bool, send_for_me: bool
):
    """Render ``messages``, request the quote image and deliver it.

    :param client: the running Pyrogram client
    :param message: the command message; used as the progress/status message
    :param messages: messages to be rendered into the quote, oldest first
    :param is_png: send the result as a PNG document instead of a WEBP sticker
    :param send_for_me: deliver the result to Saved Messages
    """
    if send_for_me:
        await message.delete()
        message = await client.send_message("me", "Generating...")
    else:
        await message.edit("Generating...")

    params = {
        "messages": [await render_message(client, msg) for msg in messages],
        "quote_color": "#162330",
        "text_color": "#fff",
    }

    try:
        # requests is synchronous: keep it off the event loop.
        response = await _run_blocking(requests.post, QUOTES_API, json=params)
    except requests.RequestException as e:
        return await message.edit(f"<b>Quotes API error!</b>\n<code>{e}</code>")

    if not response.ok:
        return await message.edit(
            f"<b>Quotes API error!</b>\n<code>{response.text}</code>"
        )

    resized = resize_image(
        BytesIO(response.content), img_type="PNG" if is_png else "WEBP"
    )
    await message.edit("Sending...")

    try:
        func = client.send_document if is_png else client.send_sticker
        chat_id = "me" if send_for_me else message.chat.id
        await func(chat_id, resized)
    except errors.RPCError as e:  # no rights to send stickers, etc
        await message.edit(format_exc(e))
    else:
        await message.delete()


@Client.on_message(filters.command(["q", "quote"], prefix) & filters.me)
@with_reply
async def quote_cmd(client: Client, message: Message):
    """Quote the replied-to message (and optionally following ones) as an image.

    Usage: ``q [count 1-15] [!png] [!me] [!noreply]`` in reply to a message.
    """
    # isdecimal() only accepts characters int() can parse (isdigit() also
    # accepts e.g. superscript digits, which make int() raise).
    if len(message.command) > 1 and message.command[1].isdecimal():
        count = min(max(int(message.command[1]), 1), 15)
    else:
        count = 1

    is_png, send_for_me, no_reply = _parse_flags(message.command)

    messages = []
    async for msg in client.get_chat_history(
        message.chat.id,
        offset_id=message.reply_to_message.id + count,
        limit=count,
    ):
        if msg.empty:
            continue
        if msg.id >= message.id:
            # History is iterated newest-first, so the command message (and
            # anything after it) shows up at the start: skip it, do not stop.
            continue
        if no_reply:
            msg.reply_to_message = None

        messages.append(msg)
        if len(messages) >= count:
            break

    if not messages:
        return await message.edit("<b>Nothing to quote</b>")

    messages.reverse()  # the API expects chronological order

    await _generate_and_send(
        client, message, messages, is_png=is_png, send_for_me=send_for_me
    )


@Client.on_message(filters.command(["fq", "fakequote"], prefix) & filters.me)
@with_reply
async def fake_quote_cmd(client: Client, message: types.Message):
    """Quote the replied-to message with its text replaced by the given text.

    Usage: ``fq [!png] [!me] [!noreply] <text>`` in reply to a message.
    """
    is_png, send_for_me, no_reply = _parse_flags(message.command)

    # Everything after the command that is not an option word is the fake text.
    fake_quote_text = " ".join(
        arg for arg in message.command[1:] if arg not in _OPTION_WORDS
    )

    if not fake_quote_text:
        return await message.edit("<b>Fake quote text is empty</b>")

    q_message = await client.get_messages(
        message.chat.id, message.reply_to_message.id
    )
    q_message.text = fake_quote_text
    q_message.entities = None  # original entities would not match the new text
    if no_reply:
        q_message.reply_to_message = None

    await _generate_and_send(
        client, message, [q_message], is_png=is_png, send_for_me=send_for_me
    )


# file_id -> base64-encoded file content, shared across invocations
files_cache = {}


async def _get_file(app: Client, file_id) -> str:
    """Download ``file_id`` (memoised in ``files_cache``) and return it as base64."""
    if file_id in files_cache:
        return files_cache[file_id]

    content = await app.download_media(file_id, in_memory=True)
    data = base64.b64encode(content.getvalue()).decode()
    files_cache[file_id] = data
    return data


def _move_forwards(msg: types.Message):
    """Rewrite ``msg`` in place so it appears authored by its forward origin."""
    origin = msg.forward_origin
    if not origin:
        return

    if isinstance(origin, types.MessageOriginUser):
        msg.from_user = origin.sender_user
    elif isinstance(origin, types.MessageOriginHiddenUser):
        if msg.from_user is None:
            # Forward posted without a user sender: there is nothing to
            # overwrite, so build the placeholder user from scratch.
            msg.from_user = types.User(id=0)
        msg.from_user.id = 0
        msg.from_user.first_name = origin.sender_user_name
        msg.from_user.last_name = ""
    elif isinstance(origin, types.MessageOriginChat):
        msg.sender_chat = origin.sender_chat
        if msg.from_user is not None:
            msg.from_user.id = 0
        if origin.author_signature:
            msg.author_signature = origin.author_signature


async def _author_rank(message: types.Message, from_user: types.User) -> str:
    """Return the rank label (signature, custom title, owner/admin) of the author."""
    if message.author_signature:
        return message.author_signature

    is_forward = getattr(message, "forward_date", None) or getattr(
        message, "forward_origin", None
    )
    # chat.type is an enum in Pyrogram v2; compare by member name.
    if _enum_name(message.chat.type) != "supergroup" or is_forward:
        return ""

    try:
        member = await message.chat.get_member(from_user.id)
    except errors.RPCError:
        # Not a participant, no access, etc.: the rank is purely cosmetic.
        return ""

    title = getattr(member, "custom_title", None) or getattr(member, "title", None)
    if title:
        return title

    status = _enum_name(member.status)
    if status in ("creator", "owner"):
        return "owner"
    if status == "administrator":
        return "admin"
    return ""


async def _author_avatar(app: Client, from_user: types.User) -> str:
    """Return the base64 avatar of ``from_user`` or ``""`` when none is available."""
    if from_user.photo:
        return await _get_file(app, from_user.photo.big_file_id)
    if not from_user.username:
        return ""

    # may be user blocked us, we will try to get avatar via t.me
    # NOTE(review): this lookup was reconstructed from a garbled source line;
    # confirm the <meta> marker against the upstream module.
    try:
        page = await _run_blocking(
            requests.get, f"https://t.me/{from_user.username}"
        )
        _, found, tail = page.text.partition(_OG_IMAGE_MARKER)
        link = tail.split('"', 1)[0] if found else ""
        if not link or link == _TELEGRAM_LOGO:
            return ""
        # found valid link
        avatar = await _run_blocking(requests.get, link)
    except requests.RequestException:
        return ""  # avatar is best-effort; the quote is still usable without it
    return base64.b64encode(avatar.content).decode()


async def render_message(app: Client, message: types.Message) -> dict:
    """Convert a Telegram message into the JSON payload the quotes API expects.

    :param app: client used to download media and avatars
    :param message: message to render; forwarded messages are rewritten in
        place so that the original author is shown
    :return: dict with ``text``, ``media``, ``entities``, ``author``, ``reply``
    """
    # text
    if message.photo:
        text = message.caption if message.caption else ""
    elif message.poll:
        text = get_poll_text(message.poll)
    elif message.sticker:
        text = ""
    else:
        text = get_reply_text(message)

    # media
    if message.photo:
        media = await _get_file(app, message.photo.file_id)
    elif message.sticker:
        media = await _get_file(app, message.sticker.file_id)
    else:
        media = ""

    # entities
    entities = [
        {
            "offset": entity.offset,
            "length": entity.length,
            "type": _enum_name(entity.type),
        }
        for entity in (message.entities or [])
    ]

    _move_forwards(message)

    # author
    author = {}
    if message.from_user and message.from_user.id != 0:
        from_user = message.from_user

        author["id"] = from_user.id
        author["name"] = get_full_name(from_user)
        author["rank"] = await _author_rank(message, from_user)
        author["avatar"] = await _author_avatar(app, from_user)
    elif message.from_user and message.from_user.id == 0:
        # placeholder user produced by _move_forwards
        author["id"] = 0
        author["name"] = message.from_user.first_name
        author["rank"] = ""
    else:
        sender_chat = message.sender_chat
        author["id"] = sender_chat.id
        author["name"] = sender_chat.title
        author["rank"] = (
            "channel" if _enum_name(sender_chat.type) == "channel" else ""
        )

        if sender_chat.photo:
            author["avatar"] = await _get_file(app, sender_chat.photo.big_file_id)
        else:
            author["avatar"] = ""
    author["via_bot"] = message.via_bot.username if message.via_bot else ""

    # reply
    reply = {}
    reply_msg = message.reply_to_message
    if reply_msg and not reply_msg.empty:
        _move_forwards(reply_msg)

        if reply_msg.from_user:
            reply["id"] = reply_msg.from_user.id
            reply["name"] = get_full_name(reply_msg.from_user)
        else:
            reply["id"] = reply_msg.sender_chat.id
            reply["name"] = reply_msg.sender_chat.title

        reply["text"] = get_reply_text(reply_msg)

    return {
        "text": text,
        "media": media,
        "entities": entities,
        "author": author,
        "reply": reply,
    }


def get_audio_text(audio: types.Audio) -> str:
    """Return `` (title — performer)`` for an audio, using whichever parts exist."""
    if audio.title and audio.performer:
        return f" ({audio.title} — {audio.performer})"
    if audio.title:
        return f" ({audio.title})"
    if audio.performer:
        return f" ({audio.performer})"
    return ""


def get_reply_text(reply: types.Message) -> str:
    """Return a short one-line description of ``reply`` for the quote preview.

    Media and service messages get a fixed label; anything else falls back to
    the message text, or ``"unsupported message"`` when there is none.
    """
    if reply.photo:
        return "📷 Photo" + ("\n" + reply.caption if reply.caption else "")
    if reply.poll:
        return get_reply_poll_text(reply.poll)
    if reply.location or reply.venue:
        return "📍 Location"
    if reply.contact:
        return "👤 Contact"
    if reply.animation:
        return "🖼 GIF"
    if reply.audio:
        return "🎧 Music" + get_audio_text(reply.audio)
    if reply.video:
        return "📹 Video"
    if reply.video_note:
        return "📹 Videomessage"
    if reply.voice:
        return "🎵 Voice"
    if reply.sticker:
        emoji = reply.sticker.emoji + " " if reply.sticker.emoji else ""
        return emoji + "Sticker"
    if reply.document:
        # file_name may be missing on some documents
        return "💾 File " + (reply.document.file_name or "")
    if reply.game:
        return "🎮 Game"
    if reply.game_high_score:
        return "🎮 set new record"
    if reply.dice:
        return f"{reply.dice.emoji} - {reply.dice.value}"

    # service messages
    actor_id = reply.from_user.id if reply.from_user else None
    if reply.new_chat_members:
        newcomer = reply.new_chat_members[0]
        if newcomer.id == actor_id:
            return "👤 joined the group"
        return f"👤 invited {get_full_name(newcomer)} to the group"
    if reply.left_chat_member:
        if reply.left_chat_member.id == actor_id:
            return "👤 left the group"
        return f"👤 removed {get_full_name(reply.left_chat_member)}"
    if reply.new_chat_title:
        return f"✏ changed group name to {reply.new_chat_title}"
    if reply.new_chat_photo:
        return "🖼 changed group photo"
    if reply.delete_chat_photo:
        return "🖼 removed group photo"
    if reply.pinned_message:
        return "📍 pinned message"
    if reply.video_chat_started:
        return "🎤 started a new video chat"
    if reply.video_chat_ended:
        return "🎤 ended the video chat"
    if reply.video_chat_members_invited:
        return "🎤 invited participants to the video chat"
    if reply.group_chat_created or reply.supergroup_chat_created:
        return "👥 created the group"
    if reply.channel_chat_created:
        return "👥 created the channel"

    return reply.text or "unsupported message"


def get_poll_text(poll: types.Poll) -> str:
    """Return the full multi-line description of a poll: header, question, options."""
    lines = [get_reply_poll_text(poll), poll.question]
    for option in poll.options:
        line = f"- {option.text}"
        if (option.voter_count or 0) > 0:
            line += f" ({option.voter_count} voted)"
        lines.append(line)
    lines.append(f"Total: {poll.total_voter_count} voted")
    return "\n".join(lines)


def get_reply_poll_text(poll: types.Poll) -> str:
    """Return the one-line poll header, e.g. ``📊 Anonymous quiz (closed)``."""
    # poll.type is an enum in Pyrogram v2; compare by member name.
    kind = "poll" if _enum_name(poll.type) == "regular" else "quiz"
    if poll.is_anonymous:
        text = f"📊 Anonymous {kind}"
    else:
        text = f"📊 {kind.capitalize()}"
    if poll.is_closed:
        text += " (closed)"

    return text


def get_full_name(user: types.User) -> str:
    """Return the user's first name followed by the last name when present."""
    if user.last_name:
        return user.first_name + " " + user.last_name
    return user.first_name


modules_help["squotes"] = {
    "q [reply]* [count 1-15] [!png] [!me] [!noreply]": "Generate a quote\n"
    "Available options: !png — send as PNG, !me — send quote to "
    "saved messages, !noreply — generate quote without reply",
    "fq [reply]* [!png] [!me] [!noreply] [text]*": "Generate a fake quote",
}