web: core system for queue & queen bee, move remux to new system
it's 3 am and i think i had a divine intervention
This commit is contained in:
36
web/src/lib/queen-bee/queue.ts
Normal file
36
web/src/lib/queen-bee/queue.ts
Normal file
@@ -0,0 +1,36 @@
|
||||
import { addItem } from "$lib/state/queen-bee/queue";
|
||||
import type { CobaltPipelineItem } from "$lib/types/workers";
|
||||
|
||||
export const getMediaType = (type: string) => {
|
||||
const kind = type.split('/')[0];
|
||||
|
||||
// can't use .includes() here for some reason
|
||||
if (kind === "video" || kind === "audio" || kind === "image") {
|
||||
return kind;
|
||||
}
|
||||
}
|
||||
|
||||
export const createRemuxPipeline = (file: File) => {
|
||||
// chopped khia
|
||||
const parentId = crypto.randomUUID();
|
||||
const mediaType = getMediaType(file.type);
|
||||
|
||||
const pipeline: CobaltPipelineItem[] = [{
|
||||
worker: "remux",
|
||||
workerId: crypto.randomUUID(),
|
||||
parentId,
|
||||
workerArgs: {
|
||||
files: [file],
|
||||
},
|
||||
}];
|
||||
|
||||
if (mediaType) {
|
||||
addItem({
|
||||
id: parentId,
|
||||
state: "waiting",
|
||||
pipeline,
|
||||
filename: file.name,
|
||||
mediaType,
|
||||
})
|
||||
}
|
||||
}
|
||||
60
web/src/lib/queen-bee/run-worker.ts
Normal file
60
web/src/lib/queen-bee/run-worker.ts
Normal file
@@ -0,0 +1,60 @@
|
||||
import RemuxWorker from "$lib/workers/remux?worker";
|
||||
//import RemoveBgWorker from "$lib/workers/removebg?worker";
|
||||
|
||||
import type { CobaltPipelineItem } from "$lib/types/workers";
|
||||
import { itemDone, itemError } from "$lib/state/queen-bee/queue";
|
||||
|
||||
const workerError = (parentId: string, workerId: string, worker: Worker, error: string) => {
|
||||
itemError(parentId, workerId, error);
|
||||
worker.terminate();
|
||||
}
|
||||
|
||||
const workerSuccess = (parentId: string, workerId: string, worker: Worker, file: File) => {
|
||||
itemDone(parentId, workerId, file);
|
||||
worker.terminate();
|
||||
}
|
||||
|
||||
export const runRemuxWorker = async (workerId: string, parentId: string, file: File) => {
|
||||
const worker = new RemuxWorker();
|
||||
|
||||
worker.postMessage({ file });
|
||||
|
||||
worker.onerror = (e) => {
|
||||
console.error("remux worker exploded:", e);
|
||||
|
||||
// TODO: proper error code
|
||||
workerError(parentId, workerId, worker, "internal error");
|
||||
};
|
||||
|
||||
worker.onmessage = (event) => {
|
||||
const eventData = event.data.cobaltRemuxWorker;
|
||||
if (!eventData) return;
|
||||
|
||||
console.log(eventData);
|
||||
|
||||
// TODO: calculate & use progress again
|
||||
|
||||
if (eventData.render) {
|
||||
return workerSuccess(
|
||||
parentId,
|
||||
workerId,
|
||||
worker,
|
||||
new File([eventData.render], eventData.filename, {
|
||||
type: eventData.render.type,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
if (eventData.error) {
|
||||
return workerError(parentId, workerId, worker, eventData.error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export const startWorker = async ({ worker, workerId, parentId, workerArgs }: CobaltPipelineItem) => {
|
||||
switch (worker) {
|
||||
case "remux":
|
||||
await runRemuxWorker(workerId, parentId, workerArgs.files[0]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
41
web/src/lib/queen-bee/scheduler.ts
Normal file
41
web/src/lib/queen-bee/scheduler.ts
Normal file
@@ -0,0 +1,41 @@
|
||||
import { get } from "svelte/store";
|
||||
import { itemRunning, queue } from "$lib/state/queen-bee/queue";
|
||||
import { startWorker } from "$lib/queen-bee/run-worker";
|
||||
import { addWorkerToQueue, currentTasks } from "$lib/state/queen-bee/current-tasks";
|
||||
|
||||
export const checkTasks = () => {
|
||||
const queueItems = get(queue);
|
||||
const ongoingTasks = get(currentTasks)
|
||||
|
||||
if (Object.keys(ongoingTasks).length > 0) return;
|
||||
|
||||
for (const item of Object.keys(queueItems)) {
|
||||
const task = queueItems[item];
|
||||
|
||||
if (task.state === "running") {
|
||||
break;
|
||||
}
|
||||
|
||||
if (task.state === "waiting") {
|
||||
for (let i = 0; i < task.pipeline.length; i++) {
|
||||
// TODO: loop here and pass the file between pipelines
|
||||
// or schedule several tasks one after another but within
|
||||
// one parent & pipeline
|
||||
const pipelineItem = task.pipeline[i];
|
||||
|
||||
startWorker(pipelineItem);
|
||||
|
||||
addWorkerToQueue({
|
||||
id: pipelineItem.workerId,
|
||||
parentId: task.id,
|
||||
step: i + 1,
|
||||
totalSteps: task.pipeline.length,
|
||||
});
|
||||
|
||||
itemRunning(task.id, i);
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
40
web/src/lib/state/queen-bee/current-tasks.ts
Normal file
40
web/src/lib/state/queen-bee/current-tasks.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import { readable, type Updater } from "svelte/store";
|
||||
|
||||
import type { CobaltWorkerProgress } from "$lib/types/workers";
|
||||
import type { CobaltCurrentTasks, CobaltCurrentTaskItem } from "$lib/types/queen-bee";
|
||||
|
||||
let update: (_: Updater<CobaltCurrentTasks>) => void;
|
||||
|
||||
const currentTasks = readable<CobaltCurrentTasks>(
|
||||
{},
|
||||
(_, _update) => { update = _update }
|
||||
);
|
||||
|
||||
export function addWorkerToQueue(item: CobaltCurrentTaskItem) {
|
||||
update(tasks => {
|
||||
tasks[item.id] = item;
|
||||
return tasks;
|
||||
});
|
||||
}
|
||||
|
||||
export function removeWorkerFromQueue(id: string) {
|
||||
update(tasks => {
|
||||
delete tasks[id];
|
||||
return tasks;
|
||||
});
|
||||
}
|
||||
|
||||
export function updateWorkerProgress(id: string, progress: CobaltWorkerProgress) {
|
||||
update(tasks => {
|
||||
tasks[id].progress = progress;
|
||||
return tasks;
|
||||
});
|
||||
}
|
||||
|
||||
export function clearQueue() {
|
||||
update(() => {
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
export { currentTasks };
|
||||
84
web/src/lib/state/queen-bee/queue.ts
Normal file
84
web/src/lib/state/queen-bee/queue.ts
Normal file
@@ -0,0 +1,84 @@
|
||||
import { readable, type Updater } from "svelte/store";
|
||||
import type { CobaltQueue, CobaltQueueItem } from "$lib/types/queue";
|
||||
import { checkTasks } from "$lib/queen-bee/scheduler";
|
||||
import { removeWorkerFromQueue } from "./current-tasks";
|
||||
|
||||
let update: (_: Updater<CobaltQueue>) => void;
|
||||
|
||||
const queue = readable<CobaltQueue>(
|
||||
{},
|
||||
(_, _update) => { update = _update }
|
||||
);
|
||||
|
||||
export function addItem(item: CobaltQueueItem) {
|
||||
update(queueData => {
|
||||
queueData[item.id] = item;
|
||||
return queueData;
|
||||
});
|
||||
|
||||
checkTasks();
|
||||
}
|
||||
|
||||
export function itemError(id: string, workerId: string, error: string) {
|
||||
update(queueData => {
|
||||
if (queueData[id]) {
|
||||
queueData[id] = {
|
||||
...queueData[id],
|
||||
state: "error",
|
||||
errorCode: error,
|
||||
}
|
||||
}
|
||||
return queueData;
|
||||
});
|
||||
|
||||
removeWorkerFromQueue(workerId);
|
||||
checkTasks();
|
||||
}
|
||||
|
||||
export function itemDone(id: string, workerId: string, file: File) {
|
||||
update(queueData => {
|
||||
if (queueData[id]) {
|
||||
queueData[id] = {
|
||||
...queueData[id],
|
||||
state: "done",
|
||||
resultFile: file,
|
||||
}
|
||||
}
|
||||
return queueData;
|
||||
});
|
||||
|
||||
removeWorkerFromQueue(workerId);
|
||||
checkTasks();
|
||||
}
|
||||
|
||||
export function itemRunning(id: string, step: number) {
|
||||
update(queueData => {
|
||||
if (queueData[id]) {
|
||||
queueData[id] = {
|
||||
...queueData[id],
|
||||
state: "running",
|
||||
currentStep: step,
|
||||
}
|
||||
}
|
||||
return queueData;
|
||||
});
|
||||
|
||||
checkTasks();
|
||||
}
|
||||
|
||||
export function removeItem(id: string) {
|
||||
update(queueData => {
|
||||
delete queueData[id];
|
||||
return queueData;
|
||||
});
|
||||
|
||||
checkTasks();
|
||||
}
|
||||
|
||||
export function clearQueue() {
|
||||
update(() => {
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
export { queue };
|
||||
@@ -1,70 +0,0 @@
|
||||
import { merge } from "ts-deepmerge";
|
||||
import { get, readable, type Updater } from "svelte/store";
|
||||
import type { OngoingQueueItem, QueueItem } from "$lib/types/queue";
|
||||
|
||||
type Queue = {
|
||||
[id: string]: QueueItem;
|
||||
}
|
||||
|
||||
type OngoingQueue = {
|
||||
[id: string]: OngoingQueueItem;
|
||||
}
|
||||
|
||||
let update: (_: Updater<Queue>) => void;
|
||||
|
||||
const queue = readable<Queue>(
|
||||
{},
|
||||
(_, _update) => { update = _update }
|
||||
);
|
||||
|
||||
export function addToQueue(item: QueueItem) {
|
||||
update(queueData => {
|
||||
queueData[item.id] = item;
|
||||
return queueData;
|
||||
});
|
||||
}
|
||||
|
||||
export function removeFromQueue(id: string) {
|
||||
update(queueData => {
|
||||
delete queueData[id];
|
||||
return queueData;
|
||||
});
|
||||
}
|
||||
|
||||
let updateOngoing: (_: Updater<OngoingQueue>) => void;
|
||||
|
||||
const ongoingQueue = readable<OngoingQueue>(
|
||||
{},
|
||||
(_, _update) => { updateOngoing = _update }
|
||||
);
|
||||
|
||||
export function updateOngoingQueue(id: string, itemInfo: Partial<OngoingQueueItem> = {}) {
|
||||
updateOngoing(queueData => {
|
||||
if (get(queue)?.id) {
|
||||
queueData[id] = merge(queueData[id], {
|
||||
id,
|
||||
...itemInfo,
|
||||
});
|
||||
}
|
||||
|
||||
return queueData;
|
||||
});
|
||||
}
|
||||
|
||||
export function removeFromOngoingQueue(id: string) {
|
||||
updateOngoing(queue => {
|
||||
delete queue[id];
|
||||
return queue;
|
||||
});
|
||||
}
|
||||
|
||||
export function nukeEntireQueue() {
|
||||
update(() => {
|
||||
return {};
|
||||
});
|
||||
updateOngoing(() => {
|
||||
return {};
|
||||
});
|
||||
}
|
||||
|
||||
export { queue, ongoingQueue };
|
||||
13
web/src/lib/types/queen-bee.ts
Normal file
13
web/src/lib/types/queen-bee.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import type { CobaltWorkerProgress } from "$lib/types/workers";
|
||||
|
||||
export type CobaltCurrentTaskItem = {
|
||||
id: string,
|
||||
parentId: string, // parent id is queue id to which this pipeline worker belongs to
|
||||
step: number,
|
||||
totalSteps: number,
|
||||
progress?: CobaltWorkerProgress,
|
||||
}
|
||||
|
||||
export type CobaltCurrentTasks = {
|
||||
[id: string]: CobaltCurrentTaskItem,
|
||||
}
|
||||
@@ -1,34 +1,37 @@
|
||||
type ProcessingStep = "mux" | "mux_hls" | "encode";
|
||||
type ProcessingPreset = "mp4" | "webm" | "copy";
|
||||
type ProcessingState = "completed" | "failed" | "canceled" | "waiting" | "downloading" | "muxing" | "converting";
|
||||
type ProcessingType = "video" | "video_mute" | "audio" | "audio_convert" | "image" | "gif";
|
||||
type QueueFileType = "video" | "audio" | "image" | "gif";
|
||||
import type { CobaltPipelineItem, CobaltPipelineResultFileType } from "$lib/types/workers";
|
||||
|
||||
export type ProcessingStepItem = {
|
||||
type: ProcessingStep,
|
||||
preset?: ProcessingPreset,
|
||||
}
|
||||
export type CobaltQueueItemState = "waiting" | "running" | "done" | "error";
|
||||
|
||||
export type QueueFile = {
|
||||
type: QueueFileType,
|
||||
url: string,
|
||||
}
|
||||
|
||||
export type QueueItem = {
|
||||
export type CobaltQueueBaseItem = {
|
||||
id: string,
|
||||
status: ProcessingState,
|
||||
type: ProcessingType,
|
||||
state: CobaltQueueItemState,
|
||||
pipeline: CobaltPipelineItem[],
|
||||
// TODO: metadata
|
||||
filename: string,
|
||||
files: QueueFile[],
|
||||
processingSteps: ProcessingStepItem[],
|
||||
}
|
||||
mediaType: CobaltPipelineResultFileType,
|
||||
};
|
||||
|
||||
export type OngoingQueueItem = {
|
||||
id: string,
|
||||
currentStep?: ProcessingStep,
|
||||
size?: {
|
||||
expected: number,
|
||||
current: number,
|
||||
},
|
||||
speed?: number,
|
||||
}
|
||||
export type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
|
||||
state: "waiting",
|
||||
};
|
||||
|
||||
export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
|
||||
state: "running",
|
||||
currentStep: number,
|
||||
};
|
||||
|
||||
export type CobaltQueueItemDone = CobaltQueueBaseItem & {
|
||||
state: "done",
|
||||
resultFile: File,
|
||||
};
|
||||
|
||||
export type CobaltQueueItemError = CobaltQueueBaseItem & {
|
||||
state: "error",
|
||||
errorCode: string,
|
||||
};
|
||||
|
||||
export type CobaltQueueItem = CobaltQueueItemWaiting | CobaltQueueItemRunning | CobaltQueueItemDone | CobaltQueueItemError;
|
||||
|
||||
export type CobaltQueue = {
|
||||
[id: string]: CobaltQueueItem,
|
||||
};
|
||||
|
||||
22
web/src/lib/types/workers.ts
Normal file
22
web/src/lib/types/workers.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
export const resultFileTypes = ["video", "audio", "image"] as const;
|
||||
|
||||
export type CobaltWorkerType = "remux" | "removebg";
|
||||
export type CobaltPipelineResultFileType = typeof resultFileTypes[number];
|
||||
|
||||
export type CobaltWorkerProgress = {
|
||||
indeterminate: boolean,
|
||||
speed?: number,
|
||||
percentage: number,
|
||||
}
|
||||
|
||||
export type CobaltWorkerArgs = {
|
||||
files: File[],
|
||||
//TODO: args for libav & etc with unique types
|
||||
}
|
||||
|
||||
export type CobaltPipelineItem = {
|
||||
worker: CobaltWorkerType,
|
||||
workerId: string,
|
||||
parentId: string,
|
||||
workerArgs: CobaltWorkerArgs,
|
||||
}
|
||||
Reference in New Issue
Block a user