web/queue: add remuxing progress & general improvements

and a bunch of other stuff:
- size and percentage in queue
- indeterminate progress bar
- if libav wasm freezes, the worker kill itself
- cleaner states
- cleaner props
This commit is contained in:
wukko
2025-01-25 01:25:53 +06:00
parent c4c47bdc27
commit 44a99bdb3a
12 changed files with 117 additions and 53 deletions

View File

@@ -38,7 +38,7 @@ export default class LibAVWrapper {
try {
await libav.ffprobe([
//'-v', 'quiet',
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',

View File

@@ -1,8 +1,8 @@
import RemuxWorker from "$lib/workers/remux?worker";
//import RemoveBgWorker from "$lib/workers/removebg?worker";
import type { CobaltPipelineItem } from "$lib/types/workers";
import { itemDone, itemError } from "$lib/state/queen-bee/queue";
import { updateWorkerProgress } from "$lib/state/queen-bee/current-tasks";
const workerError = (parentId: string, workerId: string, worker: Worker, error: string) => {
itemError(parentId, workerId, error);
@@ -26,13 +26,43 @@ export const runRemuxWorker = async (workerId: string, parentId: string, file: F
workerError(parentId, workerId, worker, "internal error");
};
// sometimes chrome refuses to start libav wasm,
// so we check the health and kill self if it doesn't spawn
let bumpAttempts = 0;
const startCheck = setInterval(() => {
bumpAttempts++;
if (bumpAttempts === 8) {
worker.terminate();
console.error("worker didn't start after 4 seconds, so it was killed");
// TODO: proper error code
return workerError(parentId, workerId, worker, "worker didn't start");
}
}, 500);
let totalDuration: number | null = null;
worker.onmessage = (event) => {
const eventData = event.data.cobaltRemuxWorker;
if (!eventData) return;
console.log(eventData);
// temporary debug logging
console.log(JSON.stringify(eventData, null, 2));
// TODO: calculate & use progress again
clearInterval(startCheck);
if (eventData.progress) {
if (eventData.progress.duration) {
totalDuration = eventData.progress.duration;
}
updateWorkerProgress(workerId, {
percentage: totalDuration ? (eventData.progress.durationProcessed / totalDuration) * 100 : 0,
size: eventData.progress.size,
})
}
if (eventData.render) {
return workerSuccess(

View File

@@ -23,16 +23,16 @@ export const checkTasks = () => {
// one parent & pipeline
const pipelineItem = task.pipeline[i];
startWorker(pipelineItem);
addWorkerToQueue({
id: pipelineItem.workerId,
addWorkerToQueue(pipelineItem.workerId, {
parentId: task.id,
step: i + 1,
totalSteps: task.pipeline.length,
});
itemRunning(task.id, i);
itemRunning(
task.id,
pipelineItem.workerId
);
startWorker(pipelineItem);
break;
}
break;

View File

@@ -10,9 +10,9 @@ const currentTasks = readable<CobaltCurrentTasks>(
(_, _update) => { update = _update }
);
export function addWorkerToQueue(item: CobaltCurrentTaskItem) {
export function addWorkerToQueue(workerId: string, item: CobaltCurrentTaskItem) {
update(tasks => {
tasks[item.id] = item;
tasks[workerId] = item;
return tasks;
});
}
@@ -24,10 +24,10 @@ export function removeWorkerFromQueue(id: string) {
});
}
export function updateWorkerProgress(id: string, progress: CobaltWorkerProgress) {
update(tasks => {
tasks[id].progress = progress;
return tasks;
export function updateWorkerProgress(workerId: string, progress: CobaltWorkerProgress) {
update(allTasks => {
allTasks[workerId].progress = progress;
return allTasks;
});
}

View File

@@ -51,13 +51,13 @@ export function itemDone(id: string, workerId: string, file: File) {
checkTasks();
}
export function itemRunning(id: string, step: number) {
export function itemRunning(id: string, workerId: string) {
update(queueData => {
if (queueData[id]) {
queueData[id] = {
...queueData[id],
state: "running",
currentStep: step,
runningWorker: workerId,
}
}
return queueData;
@@ -68,6 +68,10 @@ export function itemRunning(id: string, step: number) {
export function removeItem(id: string) {
update(queueData => {
for (const worker in queueData[id].pipeline) {
removeWorkerFromQueue(queueData[id].pipeline[worker].workerId);
}
delete queueData[id];
return queueData;
});

View File

@@ -1,10 +1,7 @@
import type { CobaltWorkerProgress } from "$lib/types/workers";
export type CobaltCurrentTaskItem = {
id: string,
parentId: string, // parent id is queue id to which this pipeline worker belongs to
step: number,
totalSteps: number,
parentId: string,
progress?: CobaltWorkerProgress,
}

View File

@@ -17,7 +17,7 @@ export type CobaltQueueItemWaiting = CobaltQueueBaseItem & {
export type CobaltQueueItemRunning = CobaltQueueBaseItem & {
state: "running",
currentStep: number,
runningWorker: string,
};
export type CobaltQueueItemDone = CobaltQueueBaseItem & {

View File

@@ -4,9 +4,9 @@ export type CobaltWorkerType = "remux" | "removebg";
export type CobaltPipelineResultFileType = typeof resultFileTypes[number];
export type CobaltWorkerProgress = {
indeterminate: boolean,
percentage?: number,
speed?: number,
percentage: number,
size: number,
}
export type CobaltWorkerArgs = {

View File

@@ -15,6 +15,9 @@ const ff = new LibAVWrapper((progress) => {
progress: {
durationProcessed: progress.out_time_sec,
speed: progress.speed,
size: progress.total_size,
currentFrame: progress.frame,
fps: progress.fps,
}
}
})
@@ -45,7 +48,7 @@ const remux = async (file: File) => {
self.postMessage({
cobaltRemuxWorker: {
progressInfo: {
progress: {
duration: Number(file_info.format.duration),
}
}