Files
Ultroid-fork/assistant/inlinestuff.py
Devesh Pal 0df53caf4c Ultroid v0.3 Updates
Co-authored-by: Aditya <xditya@ultroid.tech>
Co-authored-by: Danish <danish@ultroid.tech>
Co-authored-by: Amit Sharma <48654350+buddhhu@users.noreply.github.com>
Co-authored-by: TechiError <error@notavailable.live>
Co-authored-by: Avish Kumar <85635883+aviskumar@users.noreply.github.com>
Co-authored-by: Vɪɴᴀʏᴀᴋ Pᴀɴᴅᴇʏ <87496159+harpia-vieillot@users.noreply.github.com>
Co-authored-by: Shrimadhav U K <6317196+spechide@users.noreply.github.com>
Co-authored-by: Dark <darkbeamer.official@gmail.com>
Co-authored-by: Muhamad Risman Aziz <62795826+mrismanaziz@users.noreply.github.com>
Co-authored-by: Ashik Muhammed <84127769+MR-JINN-OF-TG@users.noreply.github.com>
Co-authored-by: MMETMA <79155572+MMETMA@users.noreply.github.com>
Co-authored-by: amirmehdinzri <94852182+amirmehdinzri@users.noreply.github.com>
2021-12-31 23:48:53 +05:30

686 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Ultroid - UserBot
# Copyright (C) 2021-2022 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# PLease read the GNU Affero General Public License in
# <https://www.github.com/TeamUltroid/Ultroid/blob/main/LICENSE/>.
import base64
from datetime import datetime
from random import choice
from re import compile as re_compile
from bs4 import BeautifulSoup as bs
from pyUltroid.functions.misc import google_search
from pyUltroid.functions.tools import (
_webupload_cache,
async_searcher,
get_ofox,
saavn_search,
webuploader,
)
from telethon import Button
from telethon.tl.types import DocumentAttributeAudio as Audio
from telethon.tl.types import InputWebDocument as wb
from . import *
from . import _ult_cache
SUP_BUTTONS = [
[
Button.url("• Repo •", url="https://github.com/TeamUltroid/Ultroid"),
Button.url("• Support •", url="t.me/UltroidSupport"),
],
]
ofox = "https://telegra.ph/file/231f0049fcd722824f13b.jpg"
gugirl = "https://telegra.ph/file/0df54ae4541abca96aa11.jpg"
ultpic = "https://telegra.ph/file/4136aa1650bc9d4109cc5.jpg"
apis = [
"QUl6YVN5QXlEQnNZM1dSdEI1WVBDNmFCX3c4SkF5NlpkWE5jNkZV",
"QUl6YVN5QkYwenhMbFlsUE1wOXh3TVFxVktDUVJxOERnZHJMWHNn",
"QUl6YVN5RGRPS253blB3VklRX2xiSDVzWUU0Rm9YakFLSVFWMERR",
]
@in_pattern("ofox", owner=True)
async def _(e):
try:
match = e.text.split(" ", maxsplit=1)[1]
except IndexError:
kkkk = e.builder.article(
title="Enter Device Codename",
thumb=wb(ofox, 0, "image/jpeg", []),
text="**OFx🦊Rᴇᴇʀʏ**\n\nYou didn't search anything",
buttons=Button.switch_inline("Sᴇᴀʀʜ Aɢᴀɪɴ", query="ofox ", same_peer=True),
)
return await e.answer([kkkk])
device, releases = await get_ofox(match)
if device.get("detail") is None:
fox = []
fullname = device["full_name"]
codename = device["codename"]
str(device["supported"])
maintainer = device["maintainer"]["name"]
link = f"https://orangefox.download/device/{codename}"
for data in releases["data"]:
release = data["type"]
version = data["version"]
size = humanbytes(data["size"])
release_date = datetime.utcfromtimestamp(data["date"]).strftime("%Y-%m-%d")
text = f"[\xad]({ofox})**OʀᴀɴɢᴇFx Rᴇᴇʀʏ Fʀ**\n\n"
text += f"` Fʟʟ Nᴀᴍᴇ: {fullname}`\n"
text += f"` Cᴅᴇɴᴀᴍᴇ: {codename}`\n"
text += f"` Mᴀɪɴᴛᴀɪɴᴇʀ: {maintainer}`\n"
text += f"` Bɪʟᴅ Tʏᴘᴇ: {release}`\n"
text += f"` Vᴇʀsɪɴ: {version}`\n"
text += f"` Sɪᴇ: {size}`\n"
text += f"` Bɪʟᴅ Dᴀᴛᴇ: {release_date}`"
fox.append(
await e.builder.article(
title=f"{fullname}",
description=f"{version}\n{release_date}",
text=text,
thumb=wb(ofox, 0, "image/jpeg", []),
link_preview=True,
buttons=[
Button.url("Dɴʟᴀᴅ", url=f"{link}"),
Button.switch_inline(
"Sᴇᴀʀʜ Aɢᴀɪɴ", query="ofox ", same_peer=True
),
],
)
)
await e.answer(
fox, switch_pm="OrangeFox Recovery Search.", switch_pm_param="start"
)
else:
await e.answer(
[], switch_pm="OrangeFox Recovery Search.", switch_pm_param="start"
)
@in_pattern("fl2lnk ?(.*)", owner=True)
async def _(e):
match = e.pattern_match.group(1)
chat_id, msg_id = match.split(":")
filename = _webupload_cache[int(chat_id)][int(msg_id)]
if "/" in filename:
filename = filename.split("/")[-1]
__cache = f"{chat_id}:{msg_id}"
buttons = [
[
Button.inline("anonfiles", data=f"flanonfiles//{__cache}"),
Button.inline("transfer", data=f"fltransfer//{__cache}"),
],
[
Button.inline("bayfiles", data=f"flbayfiles//{__cache}"),
Button.inline("x0.at", data=f"flx0.at//{__cache}"),
],
[
Button.inline("file.io", data=f"flfile.io//{__cache}"),
Button.inline("siasky", data=f"flsiasky//{__cache}"),
],
]
try:
lnk = e.builder.article(
title=f"Upload {filename}",
text=f"**File:**\n{filename}",
buttons=buttons,
)
except BaseException:
lnk = e.builder.article(
title="fl2lnk",
text="File not found",
)
await e.answer([lnk], switch_pm="File to Link.", switch_pm_param="start")
@callback(
re_compile(
"fl(.*)",
),
owner=True,
)
async def _(e):
t = (e.data).decode("UTF-8")
data = t[2:]
host = data.split("//")[0]
chat_id, msg_id = data.split("//")[1].split(":")
filename = _webupload_cache[int(chat_id)][int(msg_id)]
if "/" in filename:
filename = filename.split("/")[-1]
await e.edit(f"Uploading `{filename}` on {host}")
link = (await webuploader(chat_id, msg_id, host)).strip().replace("\n", "")
await e.edit(f"Uploaded [{filename}]({link}) on {host}.")
@in_pattern("repo", owner=True)
async def repo(e):
res = [
await e.builder.article(
title="Ultroid Userbot",
description="Userbot | Telethon",
thumb=wb(ultpic, 0, "image/jpeg", []),
text="• **ULTROID USERBOT** •",
buttons=SUP_BUTTONS,
),
]
await e.answer(res, switch_pm="Ultroid Repo.", switch_pm_param="start")
@in_pattern("go", owner=True)
async def gsearch(q_event):
try:
match = q_event.text.split(" ", maxsplit=1)[1]
except IndexError:
return await q_event.answer(
[], switch_pm="Google Search. Enter a query!", switch_pm_param="start"
)
searcher = []
gresults = await google_search(match)
for i in gresults:
try:
title = i["title"]
link = i["link"]
desc = i["description"]
searcher.append(
await q_event.builder.article(
title=title,
description=desc,
thumb=wb(gugirl, 0, "image/jpeg", []),
text=f"**Gɢʟᴇ Sᴇᴀʀʜ**\n\n**••Tɪᴛʟᴇ••**\n`{title}`\n\n**••Dᴇsʀɪᴘᴛɪɴ••**\n`{desc}`",
link_preview=False,
buttons=[
[Button.url("Lɪɴᴋ", url=f"{link}")],
[
Button.switch_inline(
"Sᴇᴀʀʜ Aɢᴀɪɴ",
query="go ",
same_peer=True,
),
Button.switch_inline(
"Sʜᴀʀᴇ",
query=f"go {match}",
same_peer=False,
),
],
],
),
)
except IndexError:
break
await q_event.answer(searcher, switch_pm="Google Search.", switch_pm_param="start")
@in_pattern("mods", owner=True)
async def _(e):
try:
quer = e.text.split(" ", maxsplit=1)[1]
except IndexError:
return await e.answer(
[], switch_pm="Mod Apps Search. Enter app name!", switch_pm_param="start"
)
page = 1
start = (page - 1) * 3 + 1
da = base64.b64decode(choice(apis)).decode("ascii")
url = f"https://www.googleapis.com/customsearch/v1?key={da}&cx=25b3b50edb928435b&q={quer}&start={start}"
data = await async_searcher(url, re_json=True)
search_items = data.get("items")
modss = []
for a in search_items:
title = a.get("title")
desc = a.get("snippet")
link = a.get("link")
text = f"**••Tɪᴛʟᴇ••** `{title}`\n\n"
text += f"**Dᴇsʀɪᴘᴛɪɴ** `{desc}`"
modss.append(
await e.builder.article(
title=title,
description=desc,
text=text,
link_preview=True,
buttons=[
[Button.url("Dɴʟᴀᴅ", url=f"{link}")],
[
Button.switch_inline(
"Mʀᴇ Mᴅs",
query="mods ",
same_peer=True,
),
Button.switch_inline(
"Sʜᴀʀᴇ",
query=f"mods {quer}",
same_peer=False,
),
],
],
),
)
await e.answer(modss, switch_pm="Search Mod Applications.", switch_pm_param="start")
# Inspired by @FindXDaBot
@in_pattern("xda", owner=True)
async def xda_dev(event):
QUERY = event.text.split(" ", maxsplit=1)
try:
query = QUERY[1]
except IndexError:
return await event.answer(
[], switch_pm=get_string("instu_3"), switch_pm_param="start"
)
le = "https://www.xda-developers.com/search/" + query.replace(" ", "+")
ct = await async_searcher(le, re_content=True)
ml = bs(ct, "html.parser", from_encoding="utf-8")
ml = ml.find_all("div", re_compile("layout_post_"), id=re_compile("post-"))
out = []
for on in ml:
data = on.find_all("img", "xda_image")[0]
title = data["alt"]
thumb = data["src"]
hre = on.find_all("div", "item_content")[0].find("h4").find("a")["href"]
desc = on.find_all("div", "item_meta clearfix")[0].text
thumb = wb(thumb, 0, "image/jpeg", [])
text = f"[{title}]({hre})"
out.append(
await event.builder.article(
title=title, description=desc, url=hre, thumb=thumb, text=text
)
)
uppar = "No Results Found :(" if not out else "|| XDA Search Results ||"
await event.answer(out, switch_pm=uppar, switch_pm_param="start")
APP_CACHE = {}
RECENTS = {}
@in_pattern("app", owner=True)
async def _(e):
try:
f = e.text.split(" ", maxsplit=1)[1].lower()
except IndexError:
get_string("instu_1")
res = []
if APP_CACHE and RECENTS.get(e.sender_id):
for a in RECENTS[e.sender_id]:
if APP_CACHE.get(a):
res.append(APP_CACHE[a][0])
return await e.answer(
res, switch_pm=get_string("instu_2"), switch_pm_param="start"
)
try:
return await e.answer(
APP_CACHE[f], switch_pm="Application Searcher.", switch_pm_param="start"
)
except KeyError:
pass
foles = []
base_uri = "https://play.google.com"
url = f"{base_uri}/store/search?q={f.replace(' ', '%20')}&c=apps"
aap = await async_searcher(url, re_content=True)
b_ = bs(aap, "html.parser", from_encoding="utf-8")
aap = b_.find_all("div", "Vpfmgd")
for z in aap[:10]:
url = base_uri + z.find("a")["href"]
scra = await async_searcher(url, re_content=True)
bp = bs(scra, "html.parser", from_encoding="utf-8")
name = z.find("div", "WsMG1c nnK0zc")["title"]
desc = (
str(bp.find("div", jsname="sngebd"))
.replace('<div jsname="sngebd">', "")
.replace("<br/>", "\n")
.replace("</div>", "")[:300]
+ "..."
)
dev = z.find("div", "KoLSrc").text
icon = z.find("img", "T75of QNCnCf")["data-src"]
text = f"**••Aᴘᴘ Nᴀᴍᴇ••** [{name}]({icon})\n"
text += f"**••Dᴇᴇʟᴘᴇʀ••** `{dev}`\n"
text += f"**••Dᴇsʀɪᴘᴛɪɴ••**\n`{desc}`"
foles.append(
await e.builder.article(
title=name,
description=dev,
thumb=wb(icon, 0, "image/jpeg", []),
text=text,
link_preview=True,
buttons=[
[Button.url("Lɪɴᴋ", url=url)],
[
Button.switch_inline(
"Mʀᴇ Aᴘᴘs",
query="app ",
same_peer=True,
),
Button.switch_inline(
"Sʜᴀʀᴇ",
query=f"app {f}",
same_peer=False,
),
],
],
),
)
APP_CACHE.update({f: foles})
if RECENTS.get(e.sender_id):
RECENTS[e.sender_id].append(f)
else:
RECENTS.update({e.sender_id: [f]})
await e.answer(foles, switch_pm="Application Searcher.", switch_pm_param="start")
PISTON_URI = "https://emkc.org/api/v2/piston/"
PISTON_LANGS = {}
@in_pattern("run", owner=True)
async def piston_run(event):
try:
lang = event.text.split()[1]
code = event.text.split(maxsplit=2)[2]
except IndexError:
result = await event.builder.article(
title="Bad Query",
description="Usage: [Language] [code]",
thumb=wb(
"https://telegra.ph/file/e33c57fc5f1044547e4d8.jpg", 0, "image/jpeg", []
),
text=f'**Inline Usage**\n\n`@{asst.me.username} run python print("hello world")`\n\n[Language List](https://telegra.ph/Ultroid-09-01-6)',
)
return await event.answer([result])
if not PISTON_LANGS:
se = await async_searcher(PISTON_URI + "runtimes", re_json=True)
PISTON_LANGS.update({lang.pop("language"): lang for lang in se})
if lang in PISTON_LANGS.keys():
version = PISTON_LANGS[lang]["version"]
else:
result = await event.builder.article(
title="Unsupported Language",
description="Usage: [Language] [code]",
thumb=wb(
"https://telegra.ph/file/e33c57fc5f1044547e4d8.jpg", 0, "image/jpeg", []
),
text=f'**Inline Usage**\n\n`@{asst.me.username} run python print("hello world")`\n\n[Language List](https://telegra.ph/Ultroid-09-01-6)',
)
return await event.answer([result])
output = await async_searcher(
PISTON_URI + "execute",
post=True,
json={"language": lang, "version": version, "files": [{"content": code}]},
re_json=True,
)
output = output["run"]["output"] or get_string("instu_4")
if len(output) > 3000:
output = output[:3000] + "..."
result = await event.builder.article(
title="Result",
description=output,
text=f"• **Language:**\n`{lang}`\n\n• **Code:**\n`{code}`\n\n• **Result:**\n`{output}`",
thumb=wb(
"https://telegra.ph/file/871ee4a481f58117dccc4.jpg", 0, "image/jpeg", []
),
buttons=Button.switch_inline("Fork", query=event.text, same_peer=True),
)
await event.answer([result], switch_pm="• Piston •", switch_pm_param="start")
FDROID_ = {}
@in_pattern("fdroid", owner=True)
async def do_magic(event):
try:
match = event.text.split(" ", maxsplit=1)[1].lower()
except IndexError:
return await event.answer(
[], switch_pm="Enter Query to Search", switch_pm_param="start"
)
if FDROID_.get(match):
return await event.answer(
FDROID_[match], switch_pm=f"• Results for {match}", switch_pm_param="start"
)
link = "https://search.f-droid.org/?q=" + match.replace(" ", "+")
content = await async_searcher(link, re_content=True)
BSC = bs(content, "html.parser", from_encoding="utf-8")
ress = []
for dat in BSC.find_all("a", "package-header")[:10]:
image = dat.find("img", "package-icon")["src"]
if image.endswith("/"):
image = "https://telegra.ph/file/a8dd4a92c5a53a89d0eff.jpg"
title = dat.find("h4", "package-name").text.strip()
desc = dat.find("span", "package-summary").text.strip()
text = f"• **Name :** `{title}`\n\n"
text += f"• **Description :** `{desc}`\n"
text += f"• **License :** `{dat.find('span', 'package-license').text.strip()}`"
imga = wb(image, 0, "image/jpeg", [])
ress.append(
await event.builder.article(
title=title,
type="photo",
description=desc,
text=text,
content=imga,
thumb=imga,
include_media=True,
buttons=[
Button.inline(
"• Download •", "fd" + dat["href"].split("packages/")[-1]
),
Button.switch_inline("• Share •", query=event.text),
],
)
)
msg = f"Showing {len(ress)} Results!" if ress else "No Results Found"
FDROID_.update({match: ress})
await event.answer(ress, switch_pm=msg, switch_pm_param="start")
_koo_ = {}
@in_pattern("koo", owner=True)
async def koo_search(ult):
"""Search Users on koo with API"""
try:
match = ult.text.split(maxsplit=1)[1].lower()
match_ = match
except IndexError:
return await ult.answer(
[], switch_pm="Enter Query to Search..", switch_pm_param="start"
)
if _koo_.get(match):
return await ult.answer(
_koo_[match], switch_pm="• Koo Search •", switch_pm_param="start"
)
res = []
se_ = None
key_count = None
if " | " in match:
match = match.split(" | ", maxsplit=1)
try:
key_count = int(match[1])
except ValueError:
pass
match = match[0]
match = match.replace(" ", "+")
req = await async_searcher(
f"https://www.kooapp.com/apiV1/search?query={match}&searchType=EXPLORE",
re_json=True,
)
if key_count:
try:
se_ = [req["feed"][key_count - 1]]
except KeyError:
pass
if not se_:
se_ = req["feed"]
for count, feed in enumerate(se_[:10]):
if feed["uiItemType"] == "search_profile":
count += 1
item = feed["items"][0]
profileImage = (
item["profileImageBaseUrl"]
if item.get("profileImageBaseUrl")
else "https://telegra.ph/file/dc28e69bd7ea2c0f25329.jpg"
)
extra = await async_searcher(
"https://www.kooapp.com/apiV1/users/handle/" + item["userHandle"],
re_json=True,
)
img = wb(profileImage, 0, "image/jpeg", [])
text = f"‣ **Name :** `{item['name']}`"
if extra.get("title"):
text += f"\n‣ **Title :** `{extra['title']}`"
text += f"\n‣ **Username :** `@{item['userHandle']}`"
if extra.get("description"):
text += f"\n‣ **Description :** `{extra['description']}`"
text += f"\n‣ **Followers :** `{extra['followerCount']}` ‣ **Following :** {extra['followingCount']}"
if extra.get("socialProfile") and extra["socialProfile"].get("website"):
text += f"\n‣ **Website :** {extra['socialProfile']['website']}"
res.append(
await ult.builder.article(
title=item["name"],
description=item.get("title") or f"@{item['userHandle']}",
type="photo",
content=img,
thumb=img,
include_media=True,
text=text,
buttons=[
Button.url(
"View", "https://kooapp.com/profile/" + item["userHandle"]
),
Button.switch_inline(
"• Share •",
query=ult.text if key_count else ult.text + f" | {count}",
),
],
)
)
if not res:
switch = "No Results Found :("
else:
_koo_.update({match_: res})
switch = f"Showing {len(res)} Results!"
await ult.answer(res, switch_pm=switch, switch_pm_param="start")
# Thanks to OpenSource
_bearer_collected = [
"AAAAAAAAAAAAAAAAAAAAALIKKgEAAAAA1DRuS%2BI7ZRKiagD6KHYmreaXomo%3DP5Vaje4UTtEkODg0fX7nCh5laSrchhtLxeyEqxXpv0w9ZKspLD",
"AAAAAAAAAAAAAAAAAAAAAL5iUAEAAAAAmo6FYRjqdKlI3cNziIm%2BHUQB9Xs%3DS31pj0mxARMTOk2g9dvQ1yP9wknvY4FPBPUlE00smJcncw4dPR",
"AAAAAAAAAAAAAAAAAAAAAN6sVgEAAAAAMMjMMWrwgGyv7YQOWN%2FSAsO5SGM%3Dg8MG9Jq93Rlllaok6eht7HvRCruN4Vpzp4NaVsZaaHHWSTzKI8",
]
@in_pattern("twitter", owner=True)
async def twitter_search(event):
try:
match = event.text.split(maxsplit=1)[1].lower()
except IndexError:
return await event.answer(
[], switch_pm="Enter Query to Search", switch_pm_param="start"
)
try:
return await event.answer(
_ult_cache["twitter"][match],
switch_pm="• Twitter Search •",
switch_pm_param="start",
)
except KeyError:
pass
headers = {"Authorization": "bearer " + choice(_bearer_collected)}
res = await async_searcher(
f"https://api.twitter.com/1.1/users/search.json?q={match}",
headers=headers,
re_json=True,
)
reso = []
for user in res:
thumb = wb(user["profile_image_url_https"], 0, "image/jpeg", [])
if user.get("profile_banner_url"):
url = user["profile_banner_url"]
text = f"[\xad]({url})• **Name :** `{user['name']}`\n"
else:
text = f"• **Name :** `{user['name']}`\n"
text += f"• **Description :** `{user['description']}`\n"
text += f"• **Username :** `@{user['screen_name']}`\n"
text += f"• **Followers :** `{user['followers_count']}` • **Following :** `{user['friends_count']}`\n"
pro_ = "https://twitter.com/" + user["screen_name"]
text += f"• **Link :** [Click Here]({pro_})\n_"
reso.append(
await event.builder.article(
title=user["name"],
description=user["description"],
url=pro_,
text=text,
thumb=thumb,
)
)
swi_ = "No User Found :(" if not reso else f"🐦 Showing {len(reso)} Results!"
await event.answer(reso, switch_pm=swi_, switch_pm_param="start")
if _ult_cache.get("twitter"):
_ult_cache["twitter"].update({match: reso})
else:
_ult_cache.update({"twitter": {match: reso}})
_savn_cache = {}
@in_pattern("saavn", owner=True)
async def savn_s(event):
try:
query = event.text.split(maxsplit=1)[1].lower()
except IndexError:
return await event.answer(
[], switch_pm="Enter Query to search 🔍", switch_pm_param="start"
)
if query in _savn_cache:
return await event.answer(
_savn_cache[query],
switch_pm=f"Showing Results for {query}",
switch_pm_param="start",
)
results = await saavn_search(query)
swi = "No Results Found!" if not results else "🎵 Saavn Search"
res = []
for song in results:
thumb = wb(song["image"], 0, "image/jpeg", [])
text = f"• **Title :** {song['song']}"
text += f"\n• **Year :** {song['year']}"
text += f"\n• **Lang :** {song['language']}"
text += f"\n• **Artist :** {song['primary_artists']}"
text += f"\n• **Release Date :** {song['release_date']}"
res.append(
await event.builder.article(
title=song["song"],
type="audio",
text=text,
include_media=True,
buttons=Button.switch_inline(
"Search Again 🔍", query="saavn", same_peer=True
),
thumb=thumb,
content=wb(
song["media_url"],
0,
"audio/mp4",
[
Audio(
title=song["song"],
duration=int(song["duration"]),
performer=song["primary_artists"],
)
],
),
)
)
await event.answer(res, switch_pm=swi, switch_pm_param="start")
_savn_cache.update({query: res})