Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
454 lines
14 KiB
Python
454 lines
14 KiB
Python
# Ultroid - UserBot
|
|
# Copyright (C) 2021-2025 TeamUltroid
|
|
#
|
|
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
|
|
# PLease read the GNU Affero General Public License in
|
|
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
|
|
|
|
import base64
|
|
import os
|
|
import random
|
|
import re
|
|
import string
|
|
from logging import WARNING
|
|
from random import choice, randrange, shuffle
|
|
from traceback import format_exc
|
|
from catbox import CatboxUploader
|
|
|
|
from pyUltroid.exceptions import DependencyMissingError
|
|
|
|
try:
|
|
from aiohttp import ContentTypeError
|
|
except ImportError:
|
|
ContentTypeError = None
|
|
|
|
from telethon.tl import types
|
|
from telethon.utils import get_display_name, get_peer_id
|
|
|
|
from .. import *
|
|
from .._misc._wrappers import eor
|
|
|
|
if run_as_module:
|
|
from ..dB import DEVLIST
|
|
from ..dB._core import LIST
|
|
|
|
from . import some_random_headers
|
|
from .helper import async_searcher
|
|
from .tools import check_filename, json_parser
|
|
|
|
try:
|
|
import aiohttp
|
|
except ImportError:
|
|
aiohttp = None
|
|
|
|
try:
|
|
from PIL import Image
|
|
except ImportError:
|
|
Image = None
|
|
|
|
try:
|
|
import cv2
|
|
except ImportError:
|
|
cv2 = None
|
|
try:
|
|
import numpy as np
|
|
except ImportError:
|
|
np = None
|
|
|
|
try:
|
|
from bs4 import BeautifulSoup
|
|
except ImportError:
|
|
BeautifulSoup = None
|
|
|
|
uploader = CatboxUploader()
|
|
|
|
|
|
async def randomchannel(
|
|
tochat, channel, range1, range2, caption=None, client=ultroid_bot
|
|
):
|
|
do = randrange(range1, range2)
|
|
async for x in client.iter_messages(channel, add_offset=do, limit=1):
|
|
caption = caption or x.text
|
|
try:
|
|
await client.send_message(tochat, caption, file=x.media)
|
|
except BaseException:
|
|
pass
|
|
|
|
|
|
# --------------------------------------------------
|
|
|
|
|
|
async def YtDataScraper(url: str):
|
|
to_return = {}
|
|
data = json_parser(
|
|
BeautifulSoup(
|
|
await async_searcher(url),
|
|
"html.parser",
|
|
)
|
|
.find_all("script")[41]
|
|
.text[20:-1]
|
|
)["contents"]
|
|
_common_data = data["twoColumnWatchNextResults"]["results"]["results"]["contents"]
|
|
common_data = _common_data[0]["videoPrimaryInfoRenderer"]
|
|
try:
|
|
description_data = _common_data[1]["videoSecondaryInfoRenderer"]["description"][
|
|
"runs"
|
|
]
|
|
except (KeyError, IndexError):
|
|
description_data = [{"text": "U hurrr from here"}]
|
|
description = "".join(
|
|
description_datum["text"] for description_datum in description_data
|
|
)
|
|
to_return["title"] = common_data["title"]["runs"][0]["text"]
|
|
to_return["views"] = (
|
|
common_data["viewCount"]["videoViewCountRenderer"]["shortViewCount"][
|
|
"simpleText"
|
|
]
|
|
or common_data["viewCount"]["videoViewCountRenderer"]["viewCount"]["simpleText"]
|
|
)
|
|
to_return["publish_date"] = common_data["dateText"]["simpleText"]
|
|
to_return["likes"] = (
|
|
common_data["videoActions"]["menuRenderer"]["topLevelButtons"][0][
|
|
"toggleButtonRenderer"
|
|
]["defaultText"]["simpleText"]
|
|
# or like_dislike[0]["toggleButtonRenderer"]["defaultText"]["accessibility"][
|
|
# "accessibilityData"
|
|
# ]["label"]
|
|
)
|
|
to_return["description"] = description
|
|
return to_return
|
|
|
|
|
|
# --------------------------------------------------
|
|
|
|
|
|
async def google_search(query):
|
|
query = query.replace(" ", "+")
|
|
_base = "https://google.com"
|
|
headers = {
|
|
"Cache-Control": "no-cache",
|
|
"Connection": "keep-alive",
|
|
"User-Agent": choice(some_random_headers),
|
|
}
|
|
con = await async_searcher(_base + "/search?q=" + query, headers=headers)
|
|
soup = BeautifulSoup(con, "html.parser")
|
|
result = []
|
|
pdata = soup.find_all("a", href=re.compile("url="))
|
|
for data in pdata:
|
|
if not data.find("div"):
|
|
continue
|
|
try:
|
|
result.append(
|
|
{
|
|
"title": data.find("div").text,
|
|
"link": data["href"].split("&url=")[1].split("&ved=")[0],
|
|
"description": data.find_all("div")[-1].text,
|
|
}
|
|
)
|
|
except BaseException as er:
|
|
LOGS.exception(er)
|
|
return result
|
|
|
|
|
|
# ----------------------------------------------------
|
|
|
|
|
|
async def allcmds(event, telegraph):
|
|
txt = ""
|
|
for z in LIST.keys():
|
|
txt += f"PLUGIN NAME: {z}\n"
|
|
for zz in LIST[z]:
|
|
txt += HNDLR + zz + "\n"
|
|
txt += "\n\n"
|
|
t = telegraph.create_page(title="Ultroid All Cmds", content=[txt])
|
|
await eor(event, f"All Ultroid Cmds : [Click Here]({t['url']})", link_preview=False)
|
|
|
|
|
|
async def ReTrieveFile(input_file_name):
|
|
if not aiohttp:
|
|
raise DependencyMissingError("This function needs 'aiohttp' to be installed.")
|
|
RMBG_API = udB.get_key("RMBG_API")
|
|
headers = {"X-API-Key": RMBG_API}
|
|
files = {"image_file": open(input_file_name, "rb").read()}
|
|
async with aiohttp.ClientSession() as ses:
|
|
async with ses.post(
|
|
"https://api.remove.bg/v1.0/removebg", headers=headers, data=files
|
|
) as out:
|
|
contentType = out.headers.get("content-type")
|
|
if "image" not in contentType:
|
|
return False, (await out.json())
|
|
|
|
name = check_filename("ult-rmbg.png")
|
|
with open(name, "wb") as file:
|
|
file.write(await out.read())
|
|
return True, name
|
|
|
|
|
|
# ---------------- Unsplash Search ----------------
|
|
# @New-Dev0
|
|
|
|
|
|
async def unsplashsearch(query, limit=None, shuf=True):
|
|
query = query.replace(" ", "-")
|
|
link = "https://unsplash.com/s/photos/" + query
|
|
extra = await async_searcher(link, re_content=True)
|
|
res = BeautifulSoup(extra, "html.parser", from_encoding="utf-8")
|
|
all_ = res.find_all("img", srcset=re.compile("images.unsplash.com/photo"))
|
|
if shuf:
|
|
shuffle(all_)
|
|
return list(map(lambda e: e["src"], all_[:limit]))
|
|
|
|
|
|
# ---------------- Random User Gen ----------------
|
|
# @xditya
|
|
|
|
|
|
async def get_random_user_data():
|
|
base_url = "https://randomuser.me/api/"
|
|
cc = await async_searcher(
|
|
"https://random-data-api.com/api/business_credit_card/random_card", re_json=True
|
|
)
|
|
card = (
|
|
"**CARD_ID:** "
|
|
+ str(cc["credit_card_number"])
|
|
+ f" {cc['credit_card_expiry_date']}\n"
|
|
+ f"**C-ID :** {cc['id']}"
|
|
)
|
|
data_ = (await async_searcher(base_url, re_json=True))["results"][0]
|
|
_g = data_["gender"]
|
|
gender = "🤵🏻♂" if _g == "male" else "🤵🏻♀"
|
|
name = data_["name"]
|
|
loc = data_["location"]
|
|
dob = data_["dob"]
|
|
msg = """
|
|
{} **Name:** {}.{} {}
|
|
**Street:** {} {}
|
|
**City:** {}
|
|
**State:** {}
|
|
**Country:** {}
|
|
**Postal Code:** {}
|
|
**Email:** {}
|
|
**Phone:** {}
|
|
**Card:** {}
|
|
**Birthday:** {}
|
|
""".format(
|
|
gender,
|
|
name["title"],
|
|
name["first"],
|
|
name["last"],
|
|
loc["street"]["number"],
|
|
loc["street"]["name"],
|
|
loc["city"],
|
|
loc["state"],
|
|
loc["country"],
|
|
loc["postcode"],
|
|
data_["email"],
|
|
data_["phone"],
|
|
card,
|
|
dob["date"][:10],
|
|
)
|
|
pic = data_["picture"]["large"]
|
|
return msg, pic
|
|
|
|
|
|
# Dictionary (Synonyms and Antonyms)
|
|
|
|
|
|
async def get_synonyms_or_antonyms(word, type_of_words):
|
|
if type_of_words not in ["synonyms", "antonyms"]:
|
|
return "Dude! Please give a corrent type of words you want."
|
|
s = await async_searcher(
|
|
f"https://tuna.thesaurus.com/pageData/{word}", re_json=True
|
|
)
|
|
li_1 = [
|
|
y
|
|
for x in [
|
|
s["data"]["definitionData"]["definitions"][0][type_of_words],
|
|
s["data"]["definitionData"]["definitions"][1][type_of_words],
|
|
]
|
|
for y in x
|
|
]
|
|
return [y["term"] for y in li_1]
|
|
|
|
|
|
# Quotly
|
|
|
|
|
|
class Quotly:
|
|
_API = "https://quoteampi.onrender.com/generate"
|
|
_entities = {
|
|
types.MessageEntityPhone: "phone_number",
|
|
types.MessageEntityMention: "mention",
|
|
types.MessageEntityBold: "bold",
|
|
types.MessageEntityCashtag: "cashtag",
|
|
types.MessageEntityStrike: "strikethrough",
|
|
types.MessageEntityHashtag: "hashtag",
|
|
types.MessageEntityEmail: "email",
|
|
types.MessageEntityMentionName: "text_mention",
|
|
types.MessageEntityUnderline: "underline",
|
|
types.MessageEntityUrl: "url",
|
|
types.MessageEntityTextUrl: "text_link",
|
|
types.MessageEntityBotCommand: "bot_command",
|
|
types.MessageEntityCode: "code",
|
|
types.MessageEntityPre: "pre",
|
|
types.MessageEntitySpoiler: "spoiler",
|
|
}
|
|
|
|
async def _format_quote(self, event, reply=None, sender=None, type_="private"):
|
|
async def telegraph(file_):
|
|
file = file_ + ".png"
|
|
Image.open(file_).save(file, "PNG")
|
|
uri = uploader.upload_file(file)
|
|
os.remove(file)
|
|
os.remove(file_)
|
|
return uri
|
|
|
|
if reply and reply.raw_text:
|
|
reply = {
|
|
"name": get_display_name(reply.sender) or "Deleted Account",
|
|
"text": reply.raw_text,
|
|
"chatId": reply.chat_id,
|
|
}
|
|
else:
|
|
reply = {}
|
|
is_fwd = event.fwd_from
|
|
name = None
|
|
last_name = None
|
|
if sender and sender.id not in DEVLIST:
|
|
id_ = get_peer_id(sender)
|
|
elif not is_fwd:
|
|
id_ = event.sender_id
|
|
sender = await event.get_sender()
|
|
else:
|
|
id_, sender = None, None
|
|
name = is_fwd.from_name
|
|
if is_fwd.from_id:
|
|
id_ = get_peer_id(is_fwd.from_id)
|
|
try:
|
|
sender = await event.client.get_entity(id_)
|
|
except ValueError:
|
|
pass
|
|
if sender:
|
|
name = get_display_name(sender)
|
|
if hasattr(sender, "last_name"):
|
|
last_name = sender.last_name
|
|
entities = []
|
|
if event.entities:
|
|
for entity in event.entities:
|
|
if type(entity) in self._entities:
|
|
enti_ = entity.to_dict()
|
|
del enti_["_"]
|
|
enti_["type"] = self._entities[type(entity)]
|
|
entities.append(enti_)
|
|
text = event.raw_text
|
|
if isinstance(event, types.MessageService):
|
|
if isinstance(event.action, types.MessageActionGameScore):
|
|
text = f"scored {event.action.score}"
|
|
rep = await event.get_reply_message()
|
|
if rep and rep.game:
|
|
text += f" in {rep.game.title}"
|
|
elif isinstance(event.action, types.MessageActionPinMessage):
|
|
text = "pinned a message."
|
|
# TODO: Are there any more events with sender?
|
|
message = {
|
|
"entities": entities,
|
|
"chatId": id_,
|
|
"avatar": True,
|
|
"from": {
|
|
"id": id_,
|
|
"first_name": (name or (sender.first_name if sender else None))
|
|
or "Deleted Account",
|
|
"last_name": last_name,
|
|
"username": sender.username if sender else None,
|
|
"language_code": "en",
|
|
"title": name,
|
|
"name": name or "Deleted Account",
|
|
"type": type_,
|
|
},
|
|
"text": text,
|
|
"replyMessage": reply,
|
|
}
|
|
if event.document and event.document.thumbs:
|
|
file_ = await event.download_media(thumb=-1)
|
|
uri = await telegraph(file_)
|
|
message["media"] = {"url": uri}
|
|
|
|
return message
|
|
|
|
async def create_quotly(
|
|
self,
|
|
event,
|
|
url="https://bot.lyo.su/quote/generate",
|
|
reply={},
|
|
bg=None,
|
|
sender=None,
|
|
file_name="quote.webp",
|
|
):
|
|
"""Create quotely's quote."""
|
|
if not isinstance(event, list):
|
|
event = [event]
|
|
from .. import udB
|
|
|
|
if udB.get_key("OQAPI"):
|
|
url = Quotly._API
|
|
if not bg:
|
|
bg = "#1b1429"
|
|
content = {
|
|
"type": "quote",
|
|
"format": "webp",
|
|
"backgroundColor": bg,
|
|
"width": 512,
|
|
"height": 768,
|
|
"scale": 2,
|
|
"messages": [
|
|
await self._format_quote(message, reply=reply, sender=sender)
|
|
for message in event
|
|
],
|
|
}
|
|
try:
|
|
request = await async_searcher(url, post=True, json=content, re_json=True)
|
|
except ContentTypeError as er:
|
|
if url != self._API:
|
|
return await self.create_quotly(
|
|
event,
|
|
url=self._API,
|
|
bg=bg,
|
|
sender=sender,
|
|
reply=reply,
|
|
file_name=file_name,
|
|
)
|
|
raise er
|
|
if request.get("ok"):
|
|
with open(file_name, "wb") as file:
|
|
image = base64.decodebytes(request["result"]["image"].encode("utf-8"))
|
|
file.write(image)
|
|
return file_name
|
|
raise Exception(str(request))
|
|
|
|
|
|
def split_list(List, index):
|
|
new_ = []
|
|
while List:
|
|
new_.extend([List[:index]])
|
|
List = List[index:]
|
|
return new_
|
|
|
|
|
|
# https://stackoverflow.com/questions/9041681/opencv-python-rotate-image-by-x-degrees-around-specific-point
|
|
|
|
|
|
def rotate_image(image, angle):
|
|
if not cv2:
|
|
raise DependencyMissingError("This function needs 'cv2' to be installed!")
|
|
image_center = tuple(np.array(image.shape[1::-1]) / 2)
|
|
rot_mat = cv2.getRotationMatrix2D(image_center, angle, 1.0)
|
|
return cv2.warpAffine(image, rot_mat, image.shape[1::-1], flags=cv2.INTER_LINEAR)
|
|
|
|
|
|
def random_string(length=3):
|
|
"""Generate random string of 'n' Length"""
|
|
return "".join(random.choices(string.ascii_uppercase, k=length))
|
|
|
|
|
|
setattr(random, "random_string", random_string)
|