Files
Moon-UB-overver/modules/direct.py
sweep-ai[bot] cc64213952 feat: Updated modules/direct.py (#32)
Co-authored-by: sweep-ai[bot] <128439645+sweep-ai[bot]@users.noreply.github.com>
2023-12-16 10:52:56 +05:30

314 lines
11 KiB
Python

# Copyright (C) 2019 The Raphielscape Company LLC.
#
# Licensed under the Raphielscape Public License, Version 1.d (the "License");
# you may not use this file except in compliance with the License.
#
""" Userbot module containing various sites direct links generators"""
import json
import re
import urllib.parse
from random import choice
from subprocess import PIPE, Popen
import requests
from bs4 import BeautifulSoup
from humanize import naturalsize
from pyrogram import Client, enums, filters
from pyrogram.types import Message
from utils.misc import modules_help, prefix
def subprocess_run(cmd):
    """Run *cmd* and return its captured output.

    Args:
        cmd: Either a command string, which is executed through ``bash``
            (the historical behaviour), or a sequence of arguments, which is
            executed directly without a shell so that untrusted arguments
            cannot be interpreted as shell syntax.

    Returns:
        On success, the ``(stdout, stderr)`` tuple produced by
        ``Popen.communicate()``. On a non-zero exit status, a
        Markdown-formatted error string containing the exit code and both
        output streams.
    """
    use_shell = isinstance(cmd, str)
    subproc = Popen(
        cmd,
        stdout=PIPE,
        stderr=PIPE,
        shell=use_shell,
        universal_newlines=True,
        # Without a shell, ``executable`` would replace the program itself,
        # so it is only passed for the string form.
        executable="bash" if use_shell else None,
    )
    talk = subproc.communicate()
    exit_code = subproc.returncode
    if exit_code != 0:
        return (
            "```An error was detected while running the subprocess:\n"
            f"exit code: {exit_code}\n"
            f"stdout: {talk[0]}\n"
            f"stderr: {talk[1]}```"
        )
    return talk
@Client.on_message(filters.command("direct", prefix) & filters.me)
async def direct_link_generator(_, m: Message):
    """Handle the ``direct`` command.

    The text to scan comes from the command arguments or, when there are
    none, from the replied-to message. Every link found is dispatched to the
    generator for its host and the combined result replaces the command
    message.

    Args:
        _: The client instance (unused).
        m: The command message; it is edited in place with the result.
    """
    if len(m.command) > 1:
        message = m.text.split(maxsplit=1)[1]
    elif m.reply_to_message:
        replied = m.reply_to_message
        # Media messages carry their text in ``caption``; ``text`` is None.
        message = replied.text or replied.caption or ""
    else:
        await m.edit(f"<b>Usage: </b><code>{prefix}direct [url]</code>", parse_mode=enums.ParseMode.HTML)
        return

    # ``\S+`` (rather than a greedy ``.*``) keeps several links on the same
    # line as separate matches.
    links = re.findall(r"\bhttps?://\S+\.\S+", message)
    if not links:
        await m.edit("`No links found!`", parse_mode=enums.ParseMode.MARKDOWN)
        # Stop here: editing again with identical text is rejected.
        return

    # Checked in order; the first host found in the link wins.
    handlers = (
        ("drive.google.com", gdrive),
        ("yadi.sk", yandex_disk),
        ("cloud.mail.ru", cm_ru),
        ("mediafire.com", mediafire),
        ("sourceforge.net", sourceforge),
        ("osdn.net", osdn),
        ("androidfilehost.com", androidfilehost),
    )
    parts = []
    for link in links:
        for host, handler in handlers:
            if host in link:
                parts.append(handler(link))
                break
        else:
            unsupported = re.findall(r"\bhttps?://(.*?[^/]+)", link)[0]
            # Trailing newline keeps consecutive entries on separate lines.
            parts.append(unsupported + " is not supported\n")
    await m.edit("".join(parts), parse_mode=enums.ParseMode.MARKDOWN)
def gdrive(url: str) -> str:
    """GDrive direct links generator.

    Args:
        url: Text containing a ``drive.google.com`` link.

    Returns:
        A Markdown link ``[name](direct_url)`` followed by a newline, or a
        Markdown-formatted message when no link is found, the file id or the
        download link cannot be extracted, or the file is not public.
    """
    drive = "https://drive.google.com"
    error = "`Error: Can't extract the link`\n"
    try:
        link = re.findall(r"\bhttps?://drive\.google\.com\S+", url)[0]
    except IndexError:
        return "`No Google drive links found`\n"

    file_id = ""
    if "view" in link:
        file_id = link.split("/")[-2]
    elif "open?id=" in link:
        file_id = link.split("open?id=")[1].strip()
    elif "uc?id=" in link:
        file_id = link.split("uc?id=")[1].strip()
    # Drop any further query parameters that followed the id.
    file_id = file_id.split("&")[0]
    if not file_id:
        return error

    download = requests.get(
        f"{drive}/uc?export=download&id={file_id}",
        stream=True,
        allow_redirects=False,
    )
    dl_url = download.headers.get("location")
    if dl_url is not None:
        # In case of small file size, Google redirects to the file directly.
        name = "Direct Download Link"
    else:
        # In case of download warning page.
        page = BeautifulSoup(download.content, "html.parser")
        page_element = page.find("a", {"id": "uc-download-link"})
        if page_element is None or page_element.get("href") is None:
            return error
        name_element = page.find("span", {"class": "uc-name-size"})
        name = (
            name_element.text if name_element is not None else "Direct Download Link"
        )
        response = requests.get(
            drive + page_element.get("href"),
            stream=True,
            allow_redirects=False,
            cookies=download.cookies,
        )
        dl_url = response.headers.get("location")
        if dl_url is None:
            return error

    if "accounts.google.com" in dl_url:  # redirected to login: non-public file
        return "`Link is not public!`\n"
    if "=sharing" in dl_url:
        return "```Provide GDrive Link not directc sharing of GDrive!```\n"
    return f"[{name}]({dl_url})\n"
def yandex_disk(url: str) -> str:
    """Yandex.Disk direct links generator.

    Based on https://github.com/wldhx/yadisk-direct

    Args:
        url: Text containing a ``yadi.sk`` link.

    Returns:
        A Markdown link ``[name](direct_url)`` followed by a newline, or a
        Markdown-formatted error message.
    """
    try:
        link = re.findall(r"\bhttps?://.*yadi\.sk\S+", url)[0]
    except IndexError:
        return "`No Yandex.Disk links found`\n"

    api = "https://cloud-api.yandex.net/v1/disk/public/resources/download"
    try:
        # ``params`` percent-encodes the public link so characters such as
        # ``?`` and ``&`` inside it cannot corrupt the API query string.
        dl_url = requests.get(api, params={"public_key": link}).json()["href"]
    except (KeyError, ValueError):
        # KeyError: the API answered without ``href``; ValueError: the body
        # was not valid JSON.
        return "`Error: File not found / Download limit reached`\n"

    if "filename=" in dl_url:
        name = dl_url.split("filename=")[1].split("&disposition")[0]
    else:
        name = "Direct Download Link"
    return f"[{name}]({dl_url})\n"
def cm_ru(url: str) -> str:
    """cloud.mail.ru direct links generator.

    Using https://github.com/JrMasterModelBuilder/cmrudl.py

    Args:
        url: Text containing a ``cloud.mail.ru`` link.

    Returns:
        A Markdown link ``[name (size)](direct_url)`` followed by a newline,
        or a Markdown-formatted error message.
    """
    import shlex

    error = "`Error: Can't extract the link`\n"
    try:
        link = re.findall(r"\bhttps?://.*cloud\.mail\.ru\S+", url)[0]
    except IndexError:
        return "`No cloud.mail.ru links found`\n"

    # The link comes from message text and the command runs through a shell,
    # so it must be quoted to stop shell metacharacters being executed.
    cmd = f"bin/cmrudl -s {shlex.quote(link)}"
    result = subprocess_run(cmd)
    if isinstance(result, str):
        # subprocess_run returns a string only for a failed command.
        return error

    try:
        # The JSON description is expected on the last line of stdout.
        data = json.loads(result[0].splitlines()[-1])
        dl_url = data["download"]
        name = data["file_name"]
        size = naturalsize(int(data["file_size"]))
    except (IndexError, KeyError, TypeError, ValueError):
        # Empty output, invalid JSON, or JSON without the expected fields.
        return error
    return f"[{name} ({size})]({dl_url})\n"
def mediafire(url: str) -> str:
    """MediaFire direct links generator.

    Args:
        url: Text containing a ``mediafire.com`` link.

    Returns:
        A Markdown link ``[name (size)](direct_url)`` followed by a newline,
        or a Markdown-formatted error message.
    """
    try:
        link = re.findall(r"\bhttps?://.*mediafire\.com\S+", url)[0]
    except IndexError:
        return "`No MediaFire links found`\n"

    page = BeautifulSoup(requests.get(link).content, "lxml")
    info = page.find("a", {"aria-label": "Download file"})
    name_tag = page.find("div", {"class": "filename"})
    if info is None or name_tag is None:
        # The page has no download button or file name.
        return "`Error: Can't extract the link`\n"

    dl_url = info.get("href")
    sizes = re.findall(r"\(.*\)", info.text)
    name = name_tag.text
    # The size is optional decoration; omit it when the button has none.
    label = f"{name} {sizes[0]}" if sizes else name
    return f"[{label}]({dl_url})\n"
def sourceforge(url: str) -> str:
    """SourceForge direct links generator.

    Args:
        url: Text containing a ``sourceforge.net`` link of the form
            ``.../projects/<project>/files/<path>/download``.

    Returns:
        A ``Mirrors for __<file>__`` header followed by one Markdown link
        per mirror, or a Markdown-formatted error message.
    """
    try:
        link = re.findall(r"\bhttps?://.*sourceforge\.net\S+", url)[0]
    except IndexError:
        return "`No SourceForge links found`\n"

    file_paths = re.findall(r"files(.*)/download", link)
    projects = re.findall(r"projects?/(.*?)/files", link)
    if not file_paths or not projects:
        # Not a ".../projects/<name>/files/<path>/download" link.
        return "`Error: Can't extract the link`\n"
    file_path = file_paths[0]
    project = projects[0]

    reply = f"Mirrors for __{file_path.split('/')[-1]}__\n"
    mirrors = (
        f"https://sourceforge.net/settings/mirror_choices?"
        f"projectname={project}&filename={file_path}"
    )
    page = BeautifulSoup(requests.get(mirrors).content, "html.parser")
    mirror_list = page.find("ul", {"id": "mirrorList"})
    if mirror_list is None:
        return reply + "`Error: Can't find Mirrors for the link`\n"

    # The first list item is skipped, as in the original implementation.
    for mirror in mirror_list.findAll("li")[1:]:
        names = re.findall(r"\((.*)\)", mirror.text.strip())
        mirror_id = mirror.get("id")
        if not names or mirror_id is None:
            # Skip entries that do not look like a mirror.
            continue
        dl_url = (
            f"https://{mirror_id}.dl.sourceforge.net/project/{project}/{file_path}"
        )
        reply += f"[{names[0]}]({dl_url}) "
    return reply
def osdn(url: str) -> str:
    """OSDN direct links generator.

    Args:
        url: Text containing an ``osdn.net`` link.

    Returns:
        A ``Mirrors for __<file>__`` header followed by one Markdown link
        per mirror, or a Markdown-formatted error message.
    """
    osdn_link = "https://osdn.net"
    try:
        link = re.findall(r"\bhttps?://.*osdn\.net\S+", url)[0]
    except IndexError:
        return "`No OSDN links found`\n"

    page = BeautifulSoup(requests.get(link, allow_redirects=True).content, "lxml")
    info = page.find("a", {"class": "mirror_link"})
    form = page.find("form", {"id": "mirror-select-form"})
    if info is None or info.get("href") is None or form is None:
        # The page has no mirror link or mirror selection form.
        return "`Error: Can't find Mirrors for the link`\n"

    link = urllib.parse.unquote(osdn_link + info["href"])
    reply = f"Mirrors for __{link.split('/')[-1]}__\n"
    # The first table row is skipped, as in the original implementation.
    for data in form.findAll("tr")[1:]:
        field = data.find("input")
        cells = data.findAll("td")
        if field is None or field.get("value") is None or not cells:
            continue
        names = re.findall(r"\((.*)\)", cells[-1].text.strip())
        if not names:
            continue
        mirror = field["value"]
        # Swap the mirror code inside the ``m=<code>&f`` part of the link.
        dl_url = re.sub(r"m=(.*)&f", f"m={mirror}&f", link)
        reply += f"[{names[0]}]({dl_url}) "
    return reply
def androidfilehost(url: str) -> str:
    """AFH direct links generator.

    Args:
        url: Text containing an ``androidfilehost.com`` link with a ``fid``
            query parameter.

    Returns:
        One Markdown link per mirror, each followed by a space, or a
        Markdown-formatted error message.
    """
    not_found = "`No AFH links found`\n"
    error = "`Error: Can't find Mirrors for the link`\n"
    try:
        link = re.findall(r"\bhttps?://.*androidfilehost.*fid.*\S+", url)[0]
    except IndexError:
        return not_found
    # Accept ``fid`` anywhere in the query string and stop at the next
    # parameter, so links with other parameters still work.
    fid_match = re.search(r"[?&]fid=([^&#\s]+)", link)
    if fid_match is None:
        return not_found
    fid = fid_match.group(1)

    user_agent = useragent()
    # The context manager closes the session's pooled connections.
    with requests.Session() as session:
        res = session.get(
            link, headers={"user-agent": user_agent}, allow_redirects=True
        )
        headers = {
            "origin": "https://androidfilehost.com",
            "accept-encoding": "gzip, deflate, br",
            "accept-language": "en-US,en;q=0.9",
            "user-agent": user_agent,
            "content-type": "application/x-www-form-urlencoded; charset=UTF-8",
            "x-mod-sbb-ctype": "xhr",
            "accept": "*/*",
            "referer": f"https://androidfilehost.com/?fid={fid}",
            "authority": "androidfilehost.com",
            "x-requested-with": "XMLHttpRequest",
        }
        data = {"submit": "submit", "action": "getdownloadmirrors", "fid": f"{fid}"}
        try:
            req = session.post(
                "https://androidfilehost.com/libs/otf/mirrors.otf.php",
                headers=headers,
                data=data,
                cookies=res.cookies,
            )
            mirrors = req.json()["MIRRORS"]
        except (ValueError, KeyError, TypeError):
            # Invalid JSON (a ValueError subclass) or an unexpected payload.
            mirrors = None

    if not mirrors:
        # Reported exactly once, whether parsing failed or the list is empty.
        return error
    return "".join(f"[{item['name']}]({item['url']}) " for item in mirrors)
def useragent():
    """Return a user-agent string picked at random.

    Candidates are scraped from developers.whatismybrowser.com. A fixed
    desktop Chrome user agent is returned when the site cannot be reached
    or lists no entries.
    """
    fallback = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    try:
        content = requests.get(
            "https://developers.whatismybrowser.com/"
            "useragents/explore/operating_system_name/android/"
        ).content
    except requests.RequestException:
        # The listing site is only a convenience; fall back if it is down.
        return fallback
    useragents = BeautifulSoup(content, "lxml").findAll(
        "td", {"class": "useragent"}
    )
    if not useragents:
        return fallback
    return choice(useragents).text
# Help entry for this module. The supported-websites list names only the
# hosts that direct_link_generator actually dispatches to.
modules_help["direct"] = {
    "direct": (
        "Url/reply to Url"
        "\n\n<b>Syntax : </b><code>.direct [url/reply] </code>"
        "\n<b>Usage :</b> Generates direct download link from supported URL(s)"
        "\n\n<b>Supported websites : </b><code>Google Drive - Cloud Mail - "
        "Yandex.Disk - AFH - MediaFire - SourceForge - OSDN</code>"
    )
}