api/stream: standardize stream types & clean up related functions

This commit is contained in:
wukko
2024-08-22 17:37:31 +06:00
parent 1064be6a7a
commit facf7741ce
11 changed files with 134 additions and 126 deletions

View File

@@ -1,5 +1,5 @@
import { createInternalStream } from './manage.js';
import HLS from 'hls-parser';
import HLS from "hls-parser";
import { createInternalStream } from "./manage.js";
function getURL(url) {
try {

View File

@@ -1,7 +1,7 @@
import { request } from 'undici';
import { Readable } from 'node:stream';
import { closeRequest, getHeaders, pipe } from './shared.js';
import { handleHlsPlaylist, isHlsRequest } from './internal-hls.js';
import { request } from "undici";
import { Readable } from "node:stream";
import { closeRequest, getHeaders, pipe } from "./shared.js";
import { handleHlsPlaylist, isHlsRequest } from "./internal-hls.js";
const CHUNK_SIZE = BigInt(8e6); // 8 MB
const min = (a, b) => a < b ? a : b;

View File

@@ -1,12 +1,13 @@
import NodeCache from "node-cache";
import { randomBytes } from "crypto";
import { nanoid } from "nanoid";
import { randomBytes } from "crypto";
import { strict as assert } from "assert";
import { setMaxListeners } from "node:events";
import { decryptStream, encryptStream, generateHmac } from "../misc/crypto.js";
import { env } from "../config.js";
import { strict as assert } from "assert";
import { closeRequest } from "./shared.js";
import { decryptStream, encryptStream, generateHmac } from "../misc/crypto.js";
// optional dependency
const freebind = env.freebindCIDR && await import('freebind').catch(() => {});
@@ -37,10 +38,8 @@ export function createStream(obj) {
service: obj.service,
filename: obj.filename,
audioFormat: obj.audioFormat,
isAudioOnly: !!obj.isAudioOnly,
headers: obj.headers,
copy: !!obj.copy,
mute: !!obj.mute,
metadata: obj.fileMetadata || false,
requestIP: obj.requestIP
};

View File

@@ -1,31 +1,33 @@
import { streamAudioOnly, streamDefault, streamLiveRender, streamVideoOnly, convertToGif } from "./types.js";
import { internalStream } from './internal.js';
import stream from "./types.js";
import { closeResponse } from "./shared.js";
import { internalStream } from "./internal.js";
export default async function(res, streamInfo) {
try {
if (streamInfo.isAudioOnly && streamInfo.type !== "proxy") {
streamAudioOnly(streamInfo, res);
return;
}
switch (streamInfo.type) {
case "proxy":
return await stream.proxy(streamInfo, res);
case "internal":
return await internalStream(streamInfo, res);
case "render":
await streamLiveRender(streamInfo, res);
break;
case "gif":
convertToGif(streamInfo, res);
break;
return internalStream(streamInfo, res);
case "merge":
return stream.merge(streamInfo, res);
case "remux":
case "mute":
streamVideoOnly(streamInfo, res);
break;
default:
await streamDefault(streamInfo, res);
break;
return stream.remux(streamInfo, res);
case "audio":
return stream.convertAudio(streamInfo, res);
case "gif":
return stream.convertGif(streamInfo, res);
}
closeResponse(res);
} catch {
closeResponse(res)
closeResponse(res);
}
}

View File

@@ -9,30 +9,29 @@ import { destroyInternalStream } from "./manage.js";
import { hlsExceptions } from "../processing/service-config.js";
import { getHeaders, closeRequest, closeResponse, pipe } from "./shared.js";
function toRawHeaders(headers) {
const toRawHeaders = (headers) => {
return Object.entries(headers)
.map(([key, value]) => `${key}: ${value}\r\n`)
.join('');
}
function killProcess(p) {
// ask the process to terminate itself gracefully
p?.kill('SIGTERM');
const killProcess = (p) => {
p?.kill('SIGTERM'); // ask the process to terminate itself gracefully
setTimeout(() => {
if (p?.exitCode === null)
// brutally murder the process if it didn't quit
p?.kill('SIGKILL');
p?.kill('SIGKILL'); // brutally murder the process if it didn't quit
}, 5000);
}
function getCommand(args) {
const getCommand = (args) => {
if (typeof env.processingPriority === 'number' && !isNaN(env.processingPriority)) {
return ['nice', ['-n', env.processingPriority.toString(), ffmpeg, ...args]]
}
return [ffmpeg, args]
}
export async function streamDefault(streamInfo, res) {
const proxy = async (streamInfo, res) => {
const abortController = new AbortController();
const shutdown = () => (
closeRequest(abortController),
@@ -42,7 +41,7 @@ export async function streamDefault(streamInfo, res) {
try {
let filename = streamInfo.filename;
if (streamInfo.isAudioOnly) {
if (streamInfo.audioFormat) {
filename = `${streamInfo.filename}.${streamInfo.audioFormat}`
}
@@ -67,7 +66,7 @@ export async function streamDefault(streamInfo, res) {
}
}
export function streamLiveRender(streamInfo, res) {
const merge = (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
@@ -127,61 +126,7 @@ export function streamLiveRender(streamInfo, res) {
}
}
export function streamAudioOnly(streamInfo, res) {
let process;
const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try {
let args = [
'-loglevel', '-8',
'-headers', toRawHeaders(getHeaders(streamInfo.service)),
]
if (streamInfo.service === "twitter") {
args.push('-seekable', '0');
}
args.push(
'-i', streamInfo.urls,
'-vn'
)
if (streamInfo.metadata) {
args = args.concat(metadataManager(streamInfo.metadata))
}
args = args.concat(ffmpegArgs[streamInfo.copy ? 'copy' : 'audio']);
if (ffmpegArgs[streamInfo.audioFormat]) {
args = args.concat(ffmpegArgs[streamInfo.audioFormat])
}
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
pipe(muxOutput, res, shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}
export function streamVideoOnly(streamInfo, res) {
const remux = (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
@@ -204,7 +149,7 @@ export function streamVideoOnly(streamInfo, res) {
'-c', 'copy'
)
if (streamInfo.mute) {
if (streamInfo.type === "mute") {
args.push('-an')
}
@@ -241,7 +186,64 @@ export function streamVideoOnly(streamInfo, res) {
}
}
export function convertToGif(streamInfo, res) {
const convertAudio = (streamInfo, res) => {
let process;
const shutdown = () => (
killProcess(process),
closeResponse(res),
destroyInternalStream(streamInfo.urls)
);
try {
let args = [
'-loglevel', '-8',
'-headers', toRawHeaders(getHeaders(streamInfo.service)),
]
if (streamInfo.service === "twitter") {
args.push('-seekable', '0');
}
args.push(
'-i', streamInfo.urls,
'-vn'
)
if (streamInfo.metadata) {
args = args.concat(metadataManager(streamInfo.metadata))
}
args = args.concat(
streamInfo.copy ? ["-c:a", "copy"] : ffmpegArgs.audio
);
if (ffmpegArgs[streamInfo.audioFormat]) {
args = args.concat(ffmpegArgs[streamInfo.audioFormat])
}
args.push('-f', streamInfo.audioFormat === "m4a" ? "ipod" : streamInfo.audioFormat, 'pipe:3');
process = spawn(...getCommand(args), {
windowsHide: true,
stdio: [
'inherit', 'inherit', 'inherit',
'pipe'
],
});
const [,,, muxOutput] = process.stdio;
res.setHeader('Connection', 'keep-alive');
res.setHeader('Content-Disposition', contentDisposition(`${streamInfo.filename}.${streamInfo.audioFormat}`));
pipe(muxOutput, res, shutdown);
res.on('finish', shutdown);
} catch {
shutdown();
}
}
const convertGif = (streamInfo, res) => {
let process;
const shutdown = () => (killProcess(process), closeResponse(res));
@@ -279,3 +281,11 @@ export function convertToGif(streamInfo, res) {
shutdown();
}
}
export default {
proxy,
merge,
remux,
convertAudio,
convertGif,
}