diff --git a/app/plugins/misc/song.py b/app/plugins/misc/song.py index b74502c..466bdd7 100644 --- a/app/plugins/misc/song.py +++ b/app/plugins/misc/song.py @@ -1,14 +1,14 @@ import asyncio import glob +import json import os import shutil from time import time from urllib.parse import urlparse -import yt_dlp -from ub_core.utils import aio +from ub_core.utils import aio, run_shell_cmd -from app import Message, bot +from app import BOT, Message, bot domains = [ "www.youtube.com", @@ -20,48 +20,44 @@ domains = [ ] -class FakeLogger(object): - def debug(self, msg): - pass - - def warning(self, msg): - pass - - def error(self, msg): - pass - - @bot.add_cmd(cmd="song") -async def song_dl(bot: bot, message: Message) -> None | Message: +async def song_dl(bot: BOT, message: Message) -> None | Message: reply_query = None + for link in message.reply_text_list: if urlparse(link).netloc in domains: reply_query = link break + query = reply_query or message.filtered_input + if not query: await message.reply("Give a song name or link to download.") return + response: Message = await message.reply("Searching....") + download_path: str = os.path.join("downloads", str(time())) + query_or_search: str = query if query.startswith("http") else f"ytsearch:{query}" - audio_format = "mp3" if "-m" in message.flags else "opus" - song_info: dict = await get_download_info( - query=query_or_search, path=download_path, audio_format=audio_format - ) - if song_info is None: - await message.reply("Download Timed Out.") - return - if not query_or_search.startswith("http"): - song_info: str = song_info["entries"][0] - duration: int = song_info["duration"] - artist: str = song_info["channel"] - thumb = await aio.in_memory_dl(song_info["thumbnail"]) + + song_info: dict = await get_download_info(query=query_or_search, path=download_path) + + duration, artist, thumb = 0, "", None + + if isinstance(song_info, dict): + duration: int = song_info["duration"] + artist: str = song_info["channel"] + thumb = await aio.in_memory_dl(song_info["thumbnail"]) + down_path: list = glob.glob(os.path.join(download_path, "*")) + if not down_path: await response.edit("Song Not found.") return + await response.edit("Uploading....") + for audio_file in down_path: if audio_file.endswith((".opus", ".mp3")): await message.reply_audio( @@ -70,25 +66,39 @@ async def song_dl(bot: bot, message: Message) -> None | Message: performer=str(artist), thumb=thumb, ) + await response.delete() + shutil.rmtree(download_path, ignore_errors=True) -async def get_download_info(query: str, path: str, audio_format: str) -> dict | None: - yt_opts = { - "logger": FakeLogger(), - "outtmpl": os.path.join(path, "%(title)s.%(ext)s"), - "format": "bestaudio", - "postprocessors": [ - {"key": "FFmpegExtractAudio", "preferredcodec": audio_format}, - {"key": "FFmpegMetadata"}, - {"key": "EmbedThumbnail"}, - ], - } - ytdl: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(yt_opts) +async def get_download_info(query: str, path: str): + download_cmd = ( + f"yt-dlp -o '{os.path.join(path, '%(title)s.%(ext)s')}' " + f"-f 'bestaudio' " + f"--no-warnings " + f"--ignore-errors " + f"--ignore-no-formats-error " + f"--quiet " + f"--no-playlist " + f"--audio-quality 0 " + f"--extract-audio " + f"--embed-thumbnail " + f"--embed-metadata " + f"--print-json " + f"'{query}'" + ) try: + async with asyncio.timeout(30): - yt_info: dict = await asyncio.to_thread(ytdl.extract_info, query) - return yt_info + + song_info = (await run_shell_cmd(download_cmd)).strip() + + serialised_json = json.loads(song_info) + return serialised_json + except asyncio.TimeoutError: shutil.rmtree(path=path, ignore_errors=True) + + except json.JSONDecodeError: + return