song.py: switch to shell yt-dlp
This commit is contained in:
@@ -1,14 +1,14 @@
|
||||
import asyncio
|
||||
import glob
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from time import time
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import yt_dlp
|
||||
from ub_core.utils import aio
|
||||
from ub_core.utils import aio, run_shell_cmd
|
||||
|
||||
from app import Message, bot
|
||||
from app import BOT, Message, bot
|
||||
|
||||
domains = [
|
||||
"www.youtube.com",
|
||||
@@ -20,48 +20,44 @@ domains = [
|
||||
]
|
||||
|
||||
|
||||
class FakeLogger(object):
|
||||
def debug(self, msg):
|
||||
pass
|
||||
|
||||
def warning(self, msg):
|
||||
pass
|
||||
|
||||
def error(self, msg):
|
||||
pass
|
||||
|
||||
|
||||
@bot.add_cmd(cmd="song")
|
||||
async def song_dl(bot: bot, message: Message) -> None | Message:
|
||||
async def song_dl(bot: BOT, message: Message) -> None | Message:
|
||||
reply_query = None
|
||||
|
||||
for link in message.reply_text_list:
|
||||
if urlparse(link).netloc in domains:
|
||||
reply_query = link
|
||||
break
|
||||
|
||||
query = reply_query or message.filtered_input
|
||||
|
||||
if not query:
|
||||
await message.reply("Give a song name or link to download.")
|
||||
return
|
||||
|
||||
response: Message = await message.reply("Searching....")
|
||||
|
||||
download_path: str = os.path.join("downloads", str(time()))
|
||||
|
||||
query_or_search: str = query if query.startswith("http") else f"ytsearch:{query}"
|
||||
audio_format = "mp3" if "-m" in message.flags else "opus"
|
||||
song_info: dict = await get_download_info(
|
||||
query=query_or_search, path=download_path, audio_format=audio_format
|
||||
)
|
||||
if song_info is None:
|
||||
await message.reply("Download Timed Out.")
|
||||
return
|
||||
if not query_or_search.startswith("http"):
|
||||
song_info: str = song_info["entries"][0]
|
||||
duration: int = song_info["duration"]
|
||||
artist: str = song_info["channel"]
|
||||
thumb = await aio.in_memory_dl(song_info["thumbnail"])
|
||||
|
||||
song_info: dict = await get_download_info(query=query_or_search, path=download_path)
|
||||
|
||||
duration, artist, thumb = 0, "", None
|
||||
|
||||
if isinstance(song_info, dict):
|
||||
duration: int = song_info["duration"]
|
||||
artist: str = song_info["channel"]
|
||||
thumb = await aio.in_memory_dl(song_info["thumbnail"])
|
||||
|
||||
down_path: list = glob.glob(os.path.join(download_path, "*"))
|
||||
|
||||
if not down_path:
|
||||
await response.edit("Song Not found.")
|
||||
return
|
||||
|
||||
await response.edit("Uploading....")
|
||||
|
||||
for audio_file in down_path:
|
||||
if audio_file.endswith((".opus", ".mp3")):
|
||||
await message.reply_audio(
|
||||
@@ -70,25 +66,39 @@ async def song_dl(bot: bot, message: Message) -> None | Message:
|
||||
performer=str(artist),
|
||||
thumb=thumb,
|
||||
)
|
||||
|
||||
await response.delete()
|
||||
|
||||
shutil.rmtree(download_path, ignore_errors=True)
|
||||
|
||||
|
||||
async def get_download_info(query: str, path: str, audio_format: str) -> dict | None:
|
||||
yt_opts = {
|
||||
"logger": FakeLogger(),
|
||||
"outtmpl": os.path.join(path, "%(title)s.%(ext)s"),
|
||||
"format": "bestaudio",
|
||||
"postprocessors": [
|
||||
{"key": "FFmpegExtractAudio", "preferredcodec": audio_format},
|
||||
{"key": "FFmpegMetadata"},
|
||||
{"key": "EmbedThumbnail"},
|
||||
],
|
||||
}
|
||||
ytdl: yt_dlp.YoutubeDL = yt_dlp.YoutubeDL(yt_opts)
|
||||
async def get_download_info(query: str, path: str):
|
||||
download_cmd = (
|
||||
f"yt-dlp -o '{os.path.join(path, '%(title)s.%(ext)s')}' "
|
||||
f"-f 'bestaudio' "
|
||||
f"--no-warnings "
|
||||
f"--ignore-errors "
|
||||
f"--ignore-no-formats-error "
|
||||
f"--quiet "
|
||||
f"--no-playlist "
|
||||
f"--audio-quality 0 "
|
||||
f"--extract-audio "
|
||||
f"--embed-thumbnail "
|
||||
f"--embed-metadata "
|
||||
f"--print-json "
|
||||
f"'{query}'"
|
||||
)
|
||||
try:
|
||||
|
||||
async with asyncio.timeout(30):
|
||||
yt_info: dict = await asyncio.to_thread(ytdl.extract_info, query)
|
||||
return yt_info
|
||||
|
||||
song_info = (await run_shell_cmd(download_cmd)).strip()
|
||||
|
||||
serialised_json = json.loads(song_info)
|
||||
return serialised_json
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
shutil.rmtree(path=path, ignore_errors=True)
|
||||
|
||||
except json.JSONDecodeError:
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user