# Copyright (C) 2019 The Raphielscape Company LLC. # # Licensed under the Raphielscape Public License, Version 1.d (the "License"); # you may not use this file except in compliance with the License. # """ Userbot module containing various sites direct links generators""" import json import re import urllib.parse from random import choice from subprocess import PIPE, Popen import requests from bs4 import BeautifulSoup from humanize import naturalsize from pyrogram import Client, enums, filters from pyrogram.types import Message from utils.misc import modules_help, prefix def subprocess_run(cmd): reply = "" cmd_args = cmd.split() subproc = Popen( cmd_args, stdout=PIPE, stderr=PIPE, universal_newlines=True, executable="bash", ) talk = subproc.communicate() exitCode = subproc.returncode if exitCode != 0: reply += ( "```An error was detected while running the subprocess:\n" f"exit code: {exitCode}\n" f"stdout: {talk[0]}\n" f"stderr: {talk[1]}```" ) return reply return talk @Client.on_message(filters.command("direct", prefix) & filters.me) async def direct_link_generator(_, m: Message): if len(m.command) > 1: message = m.text.split(maxsplit=1)[1] elif m.reply_to_message: message = m.reply_to_message.text else: await m.edit(f"Usage: {prefix}direct [url]") return reply = "" links = re.findall(r"\bhttps?://.*\.\S+", message) if not links: reply = "`No links found!`" await m.edit(reply, parse_mode=enums.ParseMode.MARKDOWN) for link in links: if "drive.google.com" in link: reply += gdrive(link) elif "yadi.sk" in link: reply += yandex_disk(link) elif "cloud.mail.ru" in link: reply += cm_ru(link) elif "mediafire.com" in link: reply += mediafire(link) elif "sourceforge.net" in link: reply += sourceforge(link) elif "osdn.net" in link: reply += osdn(link) elif "androidfilehost.com" in link: reply += androidfilehost(link) else: reply += re.findall(r"\bhttps?://(.*?[^/]+)", link)[0] + " is not supported" await m.edit(reply, parse_mode=enums.ParseMode.MARKDOWN) def gdrive(url: str) -> str: """GDrive direct links generator""" drive = "https://drive.google.com" try: link = re.findall(r"\bhttps?://drive\.google\.com\S+", url)[0] except IndexError: reply = "`No Google drive links found`\n" return reply file_id = "" reply = "" if link.find("view") != -1: file_id = link.split("/")[-2] elif link.find("open?id=") != -1: file_id = link.split("open?id=")[1].strip() elif link.find("uc?id=") != -1: file_id = link.split("uc?id=")[1].strip() url = f"{drive}/uc?export=download&id={file_id}" download = requests.get(url, stream=True, allow_redirects=False) cookies = download.cookies try: # In case of small file size, Google downloads directly dl_url = download.headers["location"] page = BeautifulSoup(download.content, "html.parser") if "accounts.google.com" in dl_url: # non-public file reply += "`Link is not public!`\n" return reply name = "Direct Download Link" except KeyError: # In case of download warning page page = BeautifulSoup(download.content, "html.parser") if download.headers is not None: dl_url = download.headers.get("location") page_element = page.find("a", {"id": "uc-download-link"}) if page_element is not None: export = drive + page_element.get("href") name = page.find("span", {"class": "uc-name-size"}).text response = requests.get( export, stream=True, allow_redirects=False, cookies=cookies ) dl_url = response.headers["location"] if "accounts.google.com" in dl_url: name = page.find("span", {"class": "uc-name-size"}).text reply += "Link is not public!" return reply if "=sharing" in dl_url: name = page.find("span", {"class": "uc-name-size"}).text reply += "```Provide GDrive Link not directc sharing of GDrive!```" return reply reply += f"[{name}]({dl_url})\n" return reply def yandex_disk(url: str) -> str: """Yandex.Disk direct links generator Based on https://github.com/wldhx/yadisk-direct""" reply = "" try: link = re.findall(r"\bhttps?://.*yadi\.sk\S+", url)[0] except IndexError: reply = "`No Yandex.Disk links found`\n" return reply api = "https://cloud-api.yandex.net/v1/disk/public/resources/download?public_key={}" try: dl_url = requests.get(api.format(link)).json()["href"] name = dl_url.split("filename=")[1].split("&disposition")[0] reply += f"[{name}]({dl_url})\n" except KeyError: reply += "`Error: File not found / Download limit reached`\n" return reply return reply def cm_ru(url: str) -> str: """cloud.mail.ru direct links generator Using https://github.com/JrMasterModelBuilder/cmrudl.py""" reply = "" try: link = re.findall(r"\bhttps?://.*cloud\.mail\.ru\S+", url)[0] except IndexError: reply = "`No cloud.mail.ru links found`\n" return reply cmd = f"bin/cmrudl -s {link}" result = subprocess_run(cmd) try: result = result[0].splitlines()[-1] data = json.loads(result) except json.decoder.JSONDecodeError: reply += "`Error: Can't extract the link`\n" return reply except IndexError: return reply dl_url = data["download"] name = data["file_name"] size = naturalsize(int(data["file_size"])) reply += f"[{name} ({size})]({dl_url})\n" return reply def mediafire(url: str) -> str: """MediaFire direct links generator""" try: link = re.findall(r"\bhttps?://.*mediafire\.com\S+", url)[0] except IndexError: reply = "`No MediaFire links found`\n" return reply reply = "" page = BeautifulSoup(requests.get(link).content, "lxml") info = page.find("a", {"aria-label": "Download file"}) dl_url = info.get("href") size = re.findall(r"\(.*\)", info.text)[0] name = page.find("div", {"class": "filename"}).text reply += f"[{name} {size}]({dl_url})\n" return reply def sourceforge(url: str) -> str: """SourceForge direct links generator""" try: link = re.findall(r"\bhttps?://.*sourceforge\.net\S+", url)[0] except IndexError: reply = "`No SourceForge links found`\n" return reply file_path = re.findall(r"files(.*)/download", link)[0] reply = f"Mirrors for __{file_path.split('/')[-1]}__\n" project = re.findall(r"projects?/(.*?)/files", link)[0] mirrors = ( f"https://sourceforge.net/settings/mirror_choices?" f"projectname={project}&filename={file_path}" ) page = BeautifulSoup(requests.get(mirrors).content, "html.parser") info = page.find("ul", {"id": "mirrorList"}).findAll("li") for mirror in info[1:]: name = re.findall(r"\((.*)\)", mirror.text.strip())[0] dl_url = ( f'https://{mirror["id"]}.dl.sourceforge.net/project/{project}/{file_path}' ) reply += f"[{name}]({dl_url}) " return reply def osdn(url: str) -> str: """OSDN direct links generator""" osdn_link = "https://osdn.net" try: link = re.findall(r"\bhttps?://.*osdn\.net\S+", url)[0] except IndexError: reply = "`No OSDN links found`\n" return reply page = BeautifulSoup(requests.get(link, allow_redirects=True).content, "lxml") info = page.find("a", {"class": "mirror_link"}) link = urllib.parse.unquote(osdn_link + info["href"]) reply = f"Mirrors for __{link.split('/')[-1]}__\n" mirrors = page.find("form", {"id": "mirror-select-form"}).findAll("tr") for data in mirrors[1:]: mirror = data.find("input")["value"] name = re.findall(r"\((.*)\)", data.findAll("td")[-1].text.strip())[0] dl_url = re.sub(r"m=(.*)&f", f"m={mirror}&f", link) reply += f"[{name}]({dl_url}) " return reply def androidfilehost(url: str) -> str: """AFH direct links generator""" try: link = re.findall(r"\bhttps?://.*androidfilehost.*fid.*\S+", url)[0] except IndexError: reply = "`No AFH links found`\n" return reply fid = re.findall(r"\?fid=(.*)", link)[0] session = requests.Session() user_agent = useragent() headers = {"user-agent": user_agent} res = session.get(link, headers=headers, allow_redirects=True) headers = { "origin": "https://androidfilehost.com", "accept-encoding": "gzip, deflate, br", "accept-language": "en-US,en;q=0.9", "user-agent": user_agent, "content-type": "application/x-www-form-urlencoded; charset=UTF-8", "x-mod-sbb-ctype": "xhr", "accept": "*/*", "referer": f"https://androidfilehost.com/?fid={fid}", "authority": "androidfilehost.com", "x-requested-with": "XMLHttpRequest", } data = {"submit": "submit", "action": "getdownloadmirrors", "fid": f"{fid}"} mirrors = None reply = "" error = "`Error: Can't find Mirrors for the link`\n" try: req = session.post( "https://androidfilehost.com/libs/otf/mirrors.otf.php", headers=headers, data=data, cookies=res.cookies, ) mirrors = req.json()["MIRRORS"] except (json.decoder.JSONDecodeError, TypeError): reply += error if not mirrors: reply += error return reply for item in mirrors: name = item["name"] dl_url = item["url"] reply += f"[{name}]({dl_url}) " return reply def useragent(): """ useragent random setter """ useragents = BeautifulSoup( requests.get( "https://developers.whatismybrowser.com/" "useragents/explore/operating_system_name/android/" ).content, "lxml", ).findAll("td", {"class": "useragent"}) if not useragents: return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" user_agent = choice(useragents) return user_agent.text modules_help["direct"] = { "direct": "Url/reply to Url\ \n\nSyntax : .direct [url/reply] \ \nUsage : Generates direct download link from supported URL(s)\ \n\nSupported websites : Google Drive - MEGA.nz - Cloud Mail - Yandex.Disk - AFH - MediaFire - SourceForge - OSDN" }