api: flatten code directories, better filenames, remove old files

This commit is contained in:
wukko
2024-08-03 14:47:13 +06:00
parent 5ce208f1a5
commit dd831e13e8
53 changed files with 83 additions and 249 deletions

View File

@@ -0,0 +1,71 @@
import { createInternalStream } from './manage.js';
import HLS from 'hls-parser';
function getURL(url) {
try {
return new URL(url);
} catch {
return null;
}
}
function transformObject(streamInfo, hlsObject) {
if (hlsObject === undefined) {
return (object) => transformObject(streamInfo, object);
}
let fullUrl;
if (getURL(hlsObject.uri)) {
fullUrl = hlsObject.uri;
} else {
fullUrl = new URL(hlsObject.uri, streamInfo.url);
}
hlsObject.uri = createInternalStream(fullUrl.toString(), streamInfo);
if (hlsObject.map) {
hlsObject.map = transformObject(streamInfo, hlsObject.map);
}
return hlsObject;
}
function transformMasterPlaylist(streamInfo, hlsPlaylist) {
const makeInternalStream = transformObject(streamInfo);
const makeInternalVariants = (variant) => {
variant = transformObject(streamInfo, variant);
variant.video = variant.video.map(makeInternalStream);
variant.audio = variant.audio.map(makeInternalStream);
return variant;
};
hlsPlaylist.variants = hlsPlaylist.variants.map(makeInternalVariants);
return hlsPlaylist;
}
function transformMediaPlaylist(streamInfo, hlsPlaylist) {
const makeInternalSegments = transformObject(streamInfo);
hlsPlaylist.segments = hlsPlaylist.segments.map(makeInternalSegments);
hlsPlaylist.prefetchSegments = hlsPlaylist.prefetchSegments.map(makeInternalSegments);
return hlsPlaylist;
}
const HLS_MIME_TYPES = ["application/vnd.apple.mpegurl", "audio/mpegurl", "application/x-mpegURL"];
export function isHlsRequest (req) {
return HLS_MIME_TYPES.includes(req.headers['content-type']);
}
export async function handleHlsPlaylist(streamInfo, req, res) {
let hlsPlaylist = await req.body.text();
hlsPlaylist = HLS.parse(hlsPlaylist);
hlsPlaylist = hlsPlaylist.isMasterPlaylist
? transformMasterPlaylist(streamInfo, hlsPlaylist)
: transformMediaPlaylist(streamInfo, hlsPlaylist);
hlsPlaylist = HLS.stringify(hlsPlaylist);
res.send(hlsPlaylist);
}

122
api/src/stream/internal.js Normal file
View File

@@ -0,0 +1,122 @@
import { request } from 'undici';
import { Readable } from 'node:stream';
import { closeRequest, getHeaders, pipe } from './shared.js';
import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js';
const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;
async function* readChunks(streamInfo, size) {
let read = 0n;
while (read < size) {
if (streamInfo.controller.signal.aborted) {
throw new Error("controller aborted");
}
const chunk = await request(streamInfo.url, {
headers: {
...getHeaders('youtube'),
Range: `bytes=${read}-${read + CHUNK_SIZE}`
},
dispatcher: streamInfo.dispatcher,
signal: streamInfo.controller.signal
});
const expected = min(CHUNK_SIZE, size - read);
const received = BigInt(chunk.headers['content-length']);
if (received < expected / 2n) {
closeRequest(streamInfo.controller);
}
for await (const data of chunk.body) {
yield data;
}
read += received;
}
}
async function handleYoutubeStream(streamInfo, res) {
const { signal } = streamInfo.controller;
const cleanup = () => (res.end(), closeRequest(streamInfo.controller));
try {
const req = await fetch(streamInfo.url, {
headers: getHeaders('youtube'),
method: 'HEAD',
dispatcher: streamInfo.dispatcher,
signal
});
streamInfo.url = req.url;
const size = BigInt(req.headers.get('content-length'));
if (req.status !== 200 || !size) {
return cleanup();
}
const generator = readChunks(streamInfo, size);
const abortGenerator = () => {
generator.return();
signal.removeEventListener('abort', abortGenerator);
}
signal.addEventListener('abort', abortGenerator);
const stream = Readable.from(generator);
for (const headerName of ['content-type', 'content-length']) {
const headerValue = req.headers.get(headerName);
if (headerValue) res.setHeader(headerName, headerValue);
}
pipe(stream, res, cleanup);
} catch {
cleanup();
}
}
async function handleGenericStream(streamInfo, res) {
const { signal } = streamInfo.controller;
const cleanup = () => res.end();
try {
const req = await request(streamInfo.url, {
headers: {
...Object.fromEntries(streamInfo.headers),
host: undefined
},
dispatcher: streamInfo.dispatcher,
signal,
maxRedirections: 16
});
res.status(req.statusCode);
req.body.on('error', () => {});
for (const [ name, value ] of Object.entries(req.headers))
res.setHeader(name, value)
if (req.statusCode < 200 || req.statusCode > 299)
return cleanup();
if (isHlsRequest(req)) {
await handleHlsPlaylist(streamInfo, req, res);
} else {
pipe(req.body, res, cleanup);
}
} catch {
closeRequest(streamInfo.controller);
cleanup();
}
}
export function internalStream(streamInfo, res) {
if (streamInfo.service === 'youtube') {
return handleYoutubeStream(streamInfo, res);
}
return handleGenericStream(streamInfo, res);
}

166
api/src/stream/manage.js Normal file
View File

@@ -0,0 +1,166 @@
import NodeCache from "node-cache";
import { randomBytes } from "crypto";
import { nanoid } from "nanoid";
import { setMaxListeners } from "node:events";
import { decryptStream, encryptStream, generateHmac } from "../misc/crypto.js";
import { env } from "../config.js";
import { strict as assert } from "assert";
import { closeRequest } from "./shared.js";
// optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
const streamCache = new NodeCache({
stdTTL: env.streamLifespan,
checkperiod: 10,
deleteOnExpire: true
})
streamCache.on("expired", (key) => {
streamCache.del(key);
})
const internalStreamCache = {};
const hmacSalt = randomBytes(64).toString('hex');
export function createStream(obj) {
const streamID = nanoid(),
iv = randomBytes(16).toString('base64url'),
secret = randomBytes(32).toString('base64url'),
exp = new Date().getTime() + env.streamLifespan * 1000,
hmac = generateHmac(`${streamID},${exp},${iv},${secret}`, hmacSalt),
streamData = {
exp: exp,
type: obj.type,
urls: obj.u,
service: obj.service,
filename: obj.filename,
audioFormat: obj.audioFormat,
isAudioOnly: !!obj.isAudioOnly,
headers: obj.headers,
copy: !!obj.copy,
mute: !!obj.mute,
metadata: obj.fileMetadata || false,
requestIP: obj.requestIP
};
streamCache.set(
streamID,
encryptStream(streamData, iv, secret)
)
let streamLink = new URL('/api/stream', env.apiURL);
const params = {
'id': streamID,
'exp': exp,
'sig': hmac,
'sec': secret,
'iv': iv
}
for (const [key, value] of Object.entries(params)) {
streamLink.searchParams.append(key, value);
}
return streamLink.toString();
}
export function getInternalStream(id) {
return internalStreamCache[id];
}
export function createInternalStream(url, obj = {}) {
assert(typeof url === 'string');
let dispatcher;
if (obj.requestIP) {
dispatcher = freebind?.dispatcherFromIP(obj.requestIP, { strict: false })
}
const streamID = nanoid();
let controller = obj.controller;
if (!controller) {
controller = new AbortController();
setMaxListeners(Infinity, controller.signal);
}
let headers;
if (obj.headers) {
headers = new Map(Object.entries(obj.headers));
}
internalStreamCache[streamID] = {
url,
service: obj.service,
headers,
controller,
dispatcher
};
let streamLink = new URL('/api/istream', `http://127.0.0.1:${env.apiPort}`);
streamLink.searchParams.set('id', streamID);
const cleanup = () => {
destroyInternalStream(streamLink);
controller.signal.removeEventListener('abort', cleanup);
}
controller.signal.addEventListener('abort', cleanup);
return streamLink.toString();
}
export function destroyInternalStream(url) {
url = new URL(url);
if (url.hostname !== '127.0.0.1') {
return;
}
const id = url.searchParams.get('id');
if (internalStreamCache[id]) {
closeRequest(internalStreamCache[id].controller);
delete internalStreamCache[id];
}
}
function wrapStream(streamInfo) {
const url = streamInfo.urls;
if (typeof url === 'string') {
streamInfo.urls = createInternalStream(url, streamInfo);
} else if (Array.isArray(url)) {
for (const idx in streamInfo.urls) {
streamInfo.urls[idx] = createInternalStream(
streamInfo.urls[idx], streamInfo
);
}
} else throw 'invalid urls';
return streamInfo;
}
export function verifyStream(id, hmac, exp, secret, iv) {
try {
const ghmac = generateHmac(`${id},${exp},${iv},${secret}`, hmacSalt);
const cache = streamCache.get(id.toString());
if (ghmac !== String(hmac)) return { status: 401 };
if (!cache) return { status: 404 };
const streamInfo = JSON.parse(decryptStream(cache, iv, secret));
if (!streamInfo) return { status: 404 };
if (Number(exp) <= new Date().getTime())
return { status: 404 };
return wrapStream(streamInfo);
}
catch {
return { status: 500 };
}
}

45
api/src/stream/shared.js Normal file
View File

@@ -0,0 +1,45 @@
import { genericUserAgent } from "../config.js";
const defaultHeaders = {
'user-agent': genericUserAgent
}
const serviceHeaders = {
bilibili: {
referer: 'https://www.bilibili.com/'
},
youtube: {
accept: '*/*',
origin: 'https://www.youtube.com',
referer: 'https://www.youtube.com',
DNT: '?1'
}
}
export function closeRequest(controller) {
try { controller.abort() } catch {}
}
export function closeResponse(res) {
if (!res.headersSent) {
res.sendStatus(500);
}
return res.end();
}
export function getHeaders(service) {
// Converting all header values to strings
return Object.entries({ ...defaultHeaders, ...serviceHeaders[service] })
.reduce((p, [key, val]) => ({ ...p, [key]: String(val) }), {})
}
export function pipe(from, to, done) {
from.on('error', done)
.on('close', done);
to.on('error', done)
.on('close', done);
from.pipe(to);
}

31
api/src/stream/stream.js Normal file
View File

@@ -0,0 +1,31 @@
import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js";
import { internalStream } from './internal.js';
import { closeResponse } from "./shared.js";
export default async function(res, streamInfo) {
try {
if (streamInfo.isAudioOnly && streamInfo.type !== "bridge") {
streamAudioOnly(streamInfo, res);
return;
}
switch (streamInfo.type) {
case "internal":
return await internalStream(streamInfo, res);
case "render":
await streamLiveRender(streamInfo, res);
break;
case "gif":
convertToGif(streamInfo, res);
break;
case "remux":
case "mute":
streamVideoOnly(streamInfo, res);
break;
default:
await streamDefault(streamInfo, res);
break;
}
} catch {
closeResponse(res)
}
}

278
api/src/stream/types.js Normal file
View File

@@ -0,0 +1,278 @@
import { request } from "undici";
import ffmpeg from "ffmpeg-static";
import { spawn } from "child_process";
import { create as contentDisposition } from "content-disposition-header";
import { metadataManager } from "../misc/utils.js";
import { destroyInternalStream } from "./manage.js";
import { env, ffmpegArgs, hlsExceptions } from "../config.js";
import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js";
function toRawHeaders(headers) {
return Object.entries(headers)
.map(([key, value]) => `${key}: ${value}\r\n`)
.join('');
}
function killProcess(p) {
// ask the process to terminate itself gracefully
p?.kill('SIGTERM');
setTimeout(() => {
if (p?.exitCode === null)
// brutally murder the process if it didn't quit
p?.kill('SIGKILL');
}, 5000);
}
function getCommand(args) {
if (typeof env.processingPriority === 'number' && !isNaN(env.processingPriority)) {
return ['nice', ['-n', env.processingPriority.toString(), ffmpeg, ...args]]
}
return [ffmpeg, args]
}
export async function streamDefault(streamInfo, res) {
const abortController = new AbortController();
const shutdown = () => (
closeRequest(abortController),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try {
let filename = streamInfo.filename;
if (streamInfo.isAudioOnly) {
filename = `${streamInfo.filename}.${streamInfo.audioFormat}`
}
res.setHeader('Content-disposition', contentDisposition(filename));
const { body: stream, headers } = await request(streamInfo.urls, {
headers: getHeaders(streamInfo.service),
signal: abortController.signal,
maxRedirections: 16
});
for (const headerName of ['content-type', 'content-length']) {
if (headers[headerName]) {
res.setHeader(headerName, headers[headerName]);
}
}
pipe(stream, res, shutdown);
} catch {
shutdown();
}
}
export function streamLiveRender(streamInfo, res) {
let process;
const shutdown = () => (
killProcess(process),
closeResponse(res),
streamInfo.urls.map(destroyInternalStream)
);
const headers = getHeaders(streamInfo.service);
const rawHeaders = toRawHeaders(headers);
try {
if (streamInfo.urls.length !== 2) return shutdown();
const format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
let args = [
'-loglevel', '-8',
'-headers', rawHeaders,
'-i', streamInfo.urls[0],
'-headers', rawHeaders,
'-i', streamInfo.urls[1],
'-map', '0:v',
'-map', '1:a',
]
args = args.concat(ffmpegArgs[format]);
if (hlsExceptions.includes(streamInfo.service)) {
args.push('-bsf:a', 'aac_adtstoasc')
}
if (streamInfo.metadata) {
args = args.concat(metadataManager(streamInfo.metadata))
}
args.push('-f', format, 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
pipe(muxOutput, res, shutdown);
process.on('close', shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}
export function streamAudioOnly(streamInfo, res) {
let process;
const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try {
let args = [
'-loglevel', '-8',
'-headers', toRawHeaders(getHeaders(streamInfo.service)),
]
if (streamInfo.service === "twitter") {
args.push('-seekable', '0');
}
args.push(
'-i', streamInfo.urls,
'-vn'
)
if (streamInfo.metadata) {
args = args.concat(metadataManager(streamInfo.metadata))
}
args = args.concat(ffmpegArgs[streamInfo.copy ? 'copy' : 'audio']);
if (ffmpegArgs[streamInfo.audioFormat]) {
args = args.concat(ffmpegArgs[streamInfo.audioFormat])
}
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
pipe(muxOutput, res, shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}
export function streamVideoOnly(streamInfo, res) {
let process;
const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try {
let args = [
'-loglevel', '-8',
'-headers', toRawHeaders(getHeaders(streamInfo.service)),
]
if (streamInfo.service === "twitter") {
args.push('-seekable', '0')
}
args.push(
'-i', streamInfo.urls,
'-c', 'copy'
)
if (streamInfo.mute) {
args.push('-an')
}
if (hlsExceptions.includes(streamInfo.service)) {
args.push('-bsf:a', 'aac_adtstoasc')
}
let format = streamInfo.filename.split('.')[streamInfo.filename.split('.').length - 1];
if (format === "mp4") {
args.push('-movflags', 'faststart+frag_keyframe+empty_moov')
}
args.push('-f', format, 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename));
pipe(muxOutput, res, shutdown);
process.on('close', shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}
export function convertToGif(streamInfo, res) {
let process;
const shutdown = () => (killProcess(process), closeResponse(res));
try {
let args = [
'-loglevel', '-8'
]
if (streamInfo.service === "twitter") {
args.push('-seekable', '0')
}
args.push('-i', streamInfo.urls);
args = args.concat(ffmpegArgs["gif"]);
args.push('-f', "gif", 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(streamInfo.filename.split('.')[0] + ".gif"));
pipe(muxOutput, res, shutdown);
process.on('close', shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}