Merge pull request #11 from Ahoum-Dev/feat/worker-and-ui

feat: background worker, prompt template, section switcher, home hub
This commit is contained in:
Anurag Pappula 2026-04-23 09:43:32 +05:30 committed by GitHub
commit ac6f4efd6a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 633 additions and 38 deletions

View file

@ -32,6 +32,7 @@ COPY --from=builder /app/node_modules ./node_modules
COPY --from=builder /app/packages ./packages
COPY --from=builder /app/prisma ./prisma
COPY --from=builder /app/lib ./lib
COPY --from=builder /app/worker ./worker
COPY --from=builder /app/package.json ./
COPY --from=builder /app/next.config.mjs ./

View file

@ -1,5 +1,10 @@
import { redirect } from 'next/navigation';
import HomeHub from '@/components/HomeHub';
export const metadata = {
title: 'Open Generative AI',
description: 'Pick a workspace — Studio for one-off creations, Batch for CSV-driven automation.',
};
export default function Home() {
redirect('/studio');
return <HomeHub />;
}

138
components/HomeHub.jsx Normal file
View file

@ -0,0 +1,138 @@
'use client';
import { useEffect, useState } from 'react';
const CARDS = [
{
id: 'batch',
title: 'Batch',
tagline: 'CSV in. 255 videos out.',
description:
'Upload a CSV, map trainers and studios once, hit Start, and a background worker submits every row to MuAPI seedance-v2.0-i2v with retries, pause/resume, and per-row results.',
href: '/batch',
accent: 'bg-[#d9ff00] text-black hover:bg-[#e5ff33]',
badge: 'CSV-driven automation',
icon: (
<svg width="28" height="28" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<rect x="3" y="3" width="7" height="7" rx="1" />
<rect x="14" y="3" width="7" height="7" rx="1" />
<rect x="3" y="14" width="7" height="7" rx="1" />
<rect x="14" y="14" width="7" height="7" rx="1" />
</svg>
),
bullets: ['CSV upload + auto-mapping', 'Live progress + retry', 'Results CSV export'],
},
{
id: 'studio',
title: 'Studio',
tagline: 'One generation at a time.',
description:
'200+ MuAPI models for ad-hoc image, video, lip-sync, and cinema generations. Use this when you want to experiment with a single prompt rather than run a CSV.',
href: '/studio',
accent: 'bg-white/10 text-white hover:bg-white/20 border border-white/20',
badge: 'For one-offs and exploration',
icon: (
<svg width="28" height="28" viewBox="0 0 24 24" fill="none" stroke="currentColor" strokeWidth="2" strokeLinecap="round" strokeLinejoin="round">
<polygon points="13 2 3 14 12 14 11 22 21 10 12 10 13 2" />
</svg>
),
bullets: ['Image / Video / Lip Sync / Cinema', 'Bring-your-own MuAPI key', 'Live preview + history'],
},
];
const LAST_KEY = 'lastSection';
export default function HomeHub() {
const [lastSection, setLastSection] = useState(null);
useEffect(() => {
try {
setLastSection(localStorage.getItem(LAST_KEY));
} catch {}
}, []);
return (
<div className="min-h-screen bg-[#030303] text-white flex flex-col">
<header className="flex-shrink-0 h-14 border-b border-white/[0.03] flex items-center px-6 bg-black/20 backdrop-blur-md">
<div className="flex items-center gap-2">
<div className="w-8 h-8 bg-white rounded-lg flex items-center justify-center">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="black" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 2L2 7l10 5 10-5-10-5zM2 17l10 5 10-5M2 12l10 5 10-5" />
</svg>
</div>
<span className="text-sm font-bold tracking-tight">OpenGenerativeAI</span>
</div>
</header>
<main className="flex-1 flex items-center justify-center px-6 py-12">
<div className="max-w-5xl w-full">
<div className="text-center mb-10">
<h1 className="text-3xl md:text-4xl font-bold tracking-tight">Pick a workspace</h1>
<p className="text-white/50 text-sm mt-2">
Use Batch for the CSV pipeline. Use Studio for ad-hoc generations.
</p>
</div>
<div className="grid grid-cols-1 md:grid-cols-2 gap-5">
{CARDS.map((c) => {
const isLast = lastSection === c.id;
return (
<a
key={c.id}
href={c.href}
onClick={() => {
try { localStorage.setItem(LAST_KEY, c.id); } catch {}
}}
className={`group relative bg-[#0a0a0a] border rounded-xl p-7 flex flex-col transition-all hover:translate-y-[-2px] hover:border-white/15 ${
isLast ? 'border-[#d9ff00]/40 ring-1 ring-[#d9ff00]/20' : 'border-white/[0.05]'
}`}
>
{isLast && (
<span className="absolute top-3 right-3 text-[10px] uppercase tracking-wide font-bold text-[#d9ff00]">
Last used
</span>
)}
<div className="flex items-start gap-4 mb-4">
<div className={`w-12 h-12 rounded-lg flex items-center justify-center ${
c.id === 'batch' ? 'bg-[#d9ff00]/10 text-[#d9ff00]' : 'bg-white/5 text-white/80'
}`}>
{c.icon}
</div>
<div>
<p className="text-[10px] uppercase tracking-wide text-white/40 font-semibold">{c.badge}</p>
<h2 className="text-2xl font-bold mt-0.5">{c.title}</h2>
<p className="text-white/60 text-sm">{c.tagline}</p>
</div>
</div>
<p className="text-white/50 text-[13px] leading-relaxed mb-5">{c.description}</p>
<ul className="space-y-1.5 mb-6">
{c.bullets.map((b) => (
<li key={b} className="text-[12px] text-white/60 flex items-start gap-2">
<span className="text-[#d9ff00] mt-0.5">·</span>
<span>{b}</span>
</li>
))}
</ul>
<div className="mt-auto">
<span className={`inline-flex items-center gap-2 px-4 py-2 rounded-md text-xs font-semibold transition-all ${c.accent}`}>
Open {c.title}
</span>
</div>
</a>
);
})}
</div>
<p className="text-center text-[11px] text-white/30 mt-8">
<a href="/batch" className="hover:text-white/60">/batch</a>{' '}·{' '}
<a href="/studio" className="hover:text-white/60">/studio</a>
</p>
</div>
</main>
</div>
);
}

View file

@ -0,0 +1,46 @@
'use client';
// Two-pill switcher between the Studio (image/video/etc) and the Batch
// (CSV-driven automation) sections. Rendered in the header of every
// shell so wherever you are, you can jump to the other side.
const SECTIONS = [
{ id: 'studio', label: 'Studio', href: '/studio' },
{ id: 'batch', label: 'Batch', href: '/batch' },
];
export default function SectionSwitcher({ active }) {
return (
<nav
role="tablist"
aria-label="Section"
className="flex items-center gap-0.5 bg-white/5 border border-white/[0.04] rounded-md p-0.5"
>
{SECTIONS.map((s) => {
const isActive = active === s.id;
return (
<a
key={s.id}
href={s.href}
aria-selected={isActive}
role="tab"
className={`px-3 py-1 rounded text-[11px] font-semibold uppercase tracking-wide transition-colors ${
isActive
? 'bg-[#d9ff00] text-black'
: 'text-white/60 hover:text-white hover:bg-white/5'
}`}
onClick={(e) => {
try {
if (typeof window !== 'undefined') {
localStorage.setItem('lastSection', s.id);
}
} catch {}
}}
>
{s.label}
</a>
);
})}
</nav>
);
}

View file

@ -5,6 +5,7 @@ import { useParams, useRouter } from 'next/navigation';
import { ImageStudio, VideoStudio, LipSyncStudio, CinemaStudio, WorkflowStudio, AgentStudio, getUserBalance } from 'studio';
import axios from 'axios';
import ApiKeyModal from './ApiKeyModal';
import SectionSwitcher from './SectionSwitcher';
const TABS = [
{ id: 'image', label: 'Image Studio' },
@ -240,14 +241,17 @@ export default function StandaloneShell() {
{/* Header */}
{isHeaderVisible && (
<header className="flex-shrink-0 h-14 border-b border-white/[0.03] flex items-center justify-between px-6 bg-black/20 backdrop-blur-md z-40">
{/* Left: Logo */}
<div className="flex items-center gap-2">
<div className="w-8 h-8 bg-white rounded-lg flex items-center justify-center">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="black" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 2L2 7l10 5 10-5-10-5zM2 17l10 5 10-5M2 12l10 5 10-5"/>
</svg>
</div>
<span className="text-sm font-bold tracking-tight hidden sm:block">OpenGenerativeAI</span>
{/* Left: Logo + section switcher */}
<div className="flex items-center gap-3">
<a href="/" className="flex items-center gap-2 hover:opacity-80 transition-opacity" title="Home">
<div className="w-8 h-8 bg-white rounded-lg flex items-center justify-center">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="black" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 2L2 7l10 5 10-5-10-5zM2 17l10 5 10-5M2 12l10 5 10-5"/>
</svg>
</div>
<span className="text-sm font-bold tracking-tight hidden sm:block">OpenGenerativeAI</span>
</a>
<SectionSwitcher active="studio" />
</div>
{/* Center: Navigation */}

View file

@ -60,7 +60,7 @@ export default function AddAssetModal({ apiKey, endpoint, label, onClose, onCrea
type="text"
value={name}
onChange={(e) => setName(e.target.value)}
placeholder={label === 'Trainer' ? 'e.g. Raj' : 'e.g. Ahoum studio'}
placeholder={label === 'Trainer' ? 'e.g. Raj' : 'e.g. Main studio'}
className="w-full bg-white/5 border border-white/[0.03] rounded-md px-4 py-2 text-white placeholder-white/20 focus:outline-none focus:ring-1 focus:ring-[#d9ff00]/30"
required
/>

View file

@ -1,6 +1,7 @@
'use client';
import { useEffect, useState, useCallback, useMemo } from 'react';
import SectionSwitcher from '@/components/SectionSwitcher';
const STATUS_STYLES = {
draft: 'bg-white/5 text-white/60 border-white/10',
@ -115,6 +116,8 @@ export default function BatchDetail({ batchId, apiKey }) {
{/* Top header */}
<header className="flex-shrink-0 h-14 border-b border-white/[0.03] flex items-center justify-between px-6 bg-black/20 backdrop-blur-md">
<div className="flex items-center gap-3">
<SectionSwitcher active="batch" />
<span className="text-white/20">·</span>
<a href="/batch" className="text-white/40 hover:text-white text-[12px]"> Back to batches</a>
<span className="text-white/20">/</span>
<span className="text-sm font-bold tracking-tight">{batch.name}</span>

View file

@ -2,6 +2,7 @@
import { useEffect, useState } from 'react';
import ApiKeyModal from '@/components/ApiKeyModal';
import SectionSwitcher from '@/components/SectionSwitcher';
import TrainersTab from './TrainersTab';
import StudiosTab from './StudiosTab';
import BatchesTab from './BatchesTab';
@ -56,13 +57,15 @@ export default function BatchShell() {
<div className="min-h-screen bg-[#030303] text-white flex flex-col">
<header className="flex-shrink-0 h-14 border-b border-white/[0.03] flex items-center justify-between px-6 bg-black/20 backdrop-blur-md">
<div className="flex items-center gap-3">
<div className="w-8 h-8 bg-white rounded-lg flex items-center justify-center">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="black" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 2L2 7l10 5 10-5-10-5zM2 17l10 5 10-5M2 12l10 5 10-5" />
</svg>
</div>
<span className="text-sm font-bold tracking-tight">Batch</span>
<span className="text-white/30 text-sm">/ Open Generative AI</span>
<a href="/" className="flex items-center gap-3 hover:opacity-80 transition-opacity" title="Home">
<div className="w-8 h-8 bg-white rounded-lg flex items-center justify-center">
<svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="black" strokeWidth="2.5" strokeLinecap="round" strokeLinejoin="round">
<path d="M12 2L2 7l10 5 10-5-10-5zM2 17l10 5 10-5M2 12l10 5 10-5" />
</svg>
</div>
<span className="text-sm font-bold tracking-tight hidden sm:block">OpenGenerativeAI</span>
</a>
<SectionSwitcher active="batch" />
</div>
<nav className="flex items-center gap-6">
@ -83,12 +86,6 @@ export default function BatchShell() {
</nav>
<div className="flex items-center gap-3">
<a
href="/studio"
className="text-[12px] text-white/50 hover:text-white/80 transition-colors"
>
Studio
</a>
<button
onClick={handleKeyChange}
className="text-[11px] text-white/40 hover:text-red-400 transition-colors"

View file

@ -18,7 +18,7 @@ services:
worker:
user: root
command: ["sh", "-c", "echo 'worker placeholder — replaced in slice 4'; tail -f /dev/null"]
command: ["sh", "-c", "npx prisma generate && node worker/index.js"]
volumes:
- ./:/app
- worker_node_modules:/app/node_modules

View file

@ -54,7 +54,11 @@ function normaliseRow(raw, idx) {
practiceName,
characterLabel: character,
studioLabel: studio || null,
prompt: composePrompt({ description, startPosition, cameraAngle }),
// Worker renders the full template (lib/promptTemplate.js) at submit
// time using trainer, studio, and the structured fields below. We just
// store the raw practice description here so the prompt-builder has
// clean inputs to work with.
prompt: description,
rawDescription: description,
startPosition: startPosition || null,
cameraAngle: cameraAngle || null,
@ -63,14 +67,6 @@ function normaliseRow(raw, idx) {
};
}
function composePrompt({ description, startPosition, cameraAngle }) {
const parts = [];
if (description) parts.push(description);
if (startPosition) parts.push(`Start position: ${startPosition}.`);
if (cameraAngle) parts.push(`Camera: ${cameraAngle}.`);
return parts.join(' ');
}
function parseDuration(s) {
if (!s) return 15;
const m = s.match(/(\d+)/);

78
lib/promptTemplate.js Normal file
View file

@ -0,0 +1,78 @@
// Hard-coded prompt template for the seedance-v2.0-i2v batch generations.
// Duration, aspect_ratio, and quality are intentionally NOT in the prompt —
// they're separate fields in the MuAPI request body.
//
// Inputs:
// trainer — { name, csvLabel, ... } from the trainers table
// studio — { name, csvLabel, ... } from the studios table (may be null)
// job — { practiceName, prompt (raw description),
// startPosition, cameraAngle }
//
// Returns a single multi-line string ready to drop into payload.prompt.
export function renderPrompt({ trainer, studio, job }) {
const trainerName = (trainer?.name || trainer?.csvLabel || 'the trainer').trim();
const studioName = (studio?.name || studio?.csvLabel || 'the studio').trim();
const practice = job?.practiceName?.trim() || 'the practice';
const camera = job?.cameraAngle?.trim() || 'eye-level tripod shot';
const startPosition = job?.startPosition?.trim() || 'a relaxed neutral standing position';
const description = job?.prompt?.trim() || '';
return [
`Single continuous take of @${trainerName} performing ${practice} inside @${studioName}.`,
`Camera: ${camera}, static tripod shot, no camera movement.`,
`@${trainerName} starts in ${startPosition}.`,
``,
`Step-by-step movement:`,
description,
``,
`Movement style:`,
`- Controlled and instructional`,
`- clearly demonstrates correct form`,
`- no abrupt or unnatural motion`,
``,
`Biomechanics constraints:`,
`- correct joint alignment at all times`,
`- natural range of motion (no overextension)`,
`- stable base and grounded contact with floor`,
`- balanced weight distribution`,
`- spine remains neutral unless specified`,
`Movement must follow real human biomechanics, no artificial motion.`,
``,
`Identity constraints:`,
`- @${trainerName} must remain 100% consistent (face, body, proportions)`,
`- no identity drift`,
``,
`Environment constraints:`,
`- keep studio layout exactly unchanged`,
`- maintain plant positions, lighting, textures`,
`- yoga mat remains centered and stable`,
``,
`Lighting:`,
`- soft natural daylight (from studio windows)`,
`- stable shadows`,
`- no flicker or exposure shifts`,
``,
`Clothing:`,
`- off-white / beige relaxed yoga wear`,
`- natural fabric, no logos`,
``,
`Expression:`,
`- calm, focused, slightly warm and approachable`,
``,
`Video style:`,
`- real-world recording`,
`- no stylization`,
`- no cinematic effects`,
`- no artificial smoothing`,
``,
`Constraints:`,
`- no camera movement`,
`- no cuts or transitions`,
`- no limb warping`,
`- no pose distortion`,
`- no speed ramping`,
`- no background changes`,
`- no added props`,
].join('\n');
}

View file

@ -1,7 +1,7 @@
{
"name": "ai-agent",
"version": "0.0.0-stub",
"description": "Local stub. Upstream submodule (jaiprasad04/ai-agent) is unavailable. Provides no-op exports so the build succeeds; the /agents routes will render a 'feature unavailable' notice at runtime.",
"description": "Local stub. Upstream submodule is unavailable. Provides no-op exports so the build succeeds; the /agents routes render a 'feature unavailable' notice at runtime.",
"main": "src/index.js",
"module": "src/index.js",
"exports": {

View file

@ -21,7 +21,7 @@ const Unavailable = ({ feature }) =>
React.createElement(
'p',
{ style: { color: '#ffffff80', maxWidth: 480 } },
`The ai-agent package is not bundled with this Ahoum-Dev fork. The /agents/${feature} route is a no-op. See packages/ai-agent/package.json for context.`
`The ai-agent package is not bundled in this fork. The /agents/${feature} route is a no-op. See packages/ai-agent/package.json for context.`
)
);

View file

@ -1,7 +1,7 @@
{
"name": "workflow-builder",
"version": "0.0.0-stub",
"description": "Local stub. Upstream submodule (Anil-matcha/workflow-ui) is unavailable. Provides no-op exports so the build succeeds; the Workflows tab will render a 'feature unavailable' notice at runtime.",
"description": "Local stub. Upstream submodule is unavailable. Provides no-op exports so the build succeeds; the Workflows tab renders a 'feature unavailable' notice at runtime.",
"main": "src/index.js",
"module": "src/index.js",
"exports": {

View file

@ -21,6 +21,6 @@ export const WorkflowBuilder = () =>
React.createElement(
'p',
{ style: { color: '#ffffff80', maxWidth: 480 } },
'The workflow-builder package is not bundled with this Ahoum-Dev fork. The Workflows tab is a no-op. See packages/workflow-ui/package.json for context.'
'The workflow-builder package is not bundled in this fork. The Workflows tab is a no-op. See packages/workflow-ui/package.json for context.'
)
);

324
worker/index.js Normal file
View file

@ -0,0 +1,324 @@
// Long-lived worker process. Polls Postgres for queued jobs, submits
// them to MuAPI's seedance-v2.0-i2v endpoint, polls for results, applies
// retry/backoff on failure, and updates batch counters.
//
// Run inside the `worker` docker-compose service. One process per
// deployment is enough for typical batch volumes; the claim query
// uses FOR UPDATE SKIP LOCKED so scaling to N workers is a one-line
// change later.
import { PrismaClient } from '@prisma/client';
import { readFile, writeFile, mkdir } from 'node:fs/promises';
import path from 'node:path';
import { renderPrompt } from '../lib/promptTemplate.js';
const prisma = new PrismaClient({ log: ['error'] });
const MUAPI_BASE = process.env.MUAPI_BASE_URL || 'https://api.muapi.ai';
const API_KEY = process.env.MUAPI_API_KEY || '';
const TICK_MS = parseInt(process.env.WORKER_TICK_MS || '2000', 10);
const UPLOAD_DIR = process.env.UPLOAD_DIR || '/data/uploads';
const TERMINAL_OK = new Set(['completed', 'succeeded', 'success']);
const TERMINAL_FAIL = new Set(['failed', 'error']);
let stopping = false;
process.on('SIGINT', () => { stopping = true; });
process.on('SIGTERM', () => { stopping = true; });
log('starting', {
base: MUAPI_BASE,
hasApiKey: !!API_KEY,
tickMs: TICK_MS,
uploadDir: UPLOAD_DIR,
});
await recoverOrphans();
while (!stopping) {
try {
await tick();
} catch (err) {
log('tick.error', { err: err.message });
}
await sleep(TICK_MS);
}
log('stopping');
await prisma.$disconnect();
// ────────────────────────────────────────────────────────────────────────────
async function tick() {
if (!API_KEY) return; // can't do anything useful without a key
const batches = await prisma.batch.findMany({
where: { status: 'running' },
orderBy: { createdAt: 'asc' },
});
for (const batch of batches) {
await advanceBatch(batch);
}
// Polling jobs may belong to running OR paused batches (we let in-flight
// calls finish even on pause to avoid wasting credits).
await pollPending();
}
async function advanceBatch(batch) {
// Count in-flight slots
const inflight = await prisma.job.count({
where: { batchId: batch.id, status: { in: ['submitting', 'polling'] } },
});
const slots = batch.concurrency - inflight;
if (slots <= 0) return;
// Claim queued jobs atomically with SKIP LOCKED
const ids = await claimJobs(batch.id, slots);
if (ids.length === 0) {
await maybeMarkCompleted(batch);
return;
}
await Promise.all(ids.map(submitJob));
}
async function claimJobs(batchId, slots) {
return prisma.$transaction(async (tx) => {
const rows = await tx.$queryRaw`
SELECT id FROM jobs
WHERE batch_id = ${batchId}
AND status = 'queued'
AND (next_attempt_at IS NULL OR next_attempt_at <= NOW())
ORDER BY row_index ASC
LIMIT ${slots}
FOR UPDATE SKIP LOCKED
`;
const ids = rows.map((r) => r.id);
if (ids.length === 0) return [];
await tx.job.updateMany({
where: { id: { in: ids } },
data: { status: 'submitting', startedAt: new Date(), error: null },
});
return ids;
});
}
async function submitJob(jobId) {
const job = await prisma.job.findUnique({
where: { id: jobId },
include: {
trainer: true,
studio: true,
batch: { select: { model: true } },
},
});
if (!job) return;
try {
if (!job.trainer) throw new Error('Job has no trainer');
const trainerCdnUrl = await ensureMuapiUrl('trainer', job.trainer);
const fullPrompt = renderPrompt({
trainer: job.trainer,
studio: job.studio,
job,
});
const payload = {
prompt: fullPrompt,
images_list: [trainerCdnUrl],
aspect_ratio: job.aspectRatio,
duration: job.duration,
quality: job.quality,
};
const submitRes = await fetch(`${MUAPI_BASE}/api/v1/${job.batch.model}`, {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'x-api-key': API_KEY },
body: JSON.stringify(payload),
});
if (!submitRes.ok) {
const text = await submitRes.text().catch(() => '');
throw new Error(`MuAPI ${submitRes.status}: ${text.slice(0, 200)}`);
}
const data = await submitRes.json();
const requestId = data.request_id || data.id;
if (!requestId) throw new Error(`No request_id in response: ${JSON.stringify(data).slice(0, 200)}`);
await prisma.job.update({
where: { id: job.id },
data: { status: 'polling', muapiRequestId: requestId },
});
log('submit.ok', { jobId: job.id, requestId });
} catch (err) {
log('submit.fail', { jobId: job.id, err: err.message });
await markFailureWithBackoff(job.id, err.message);
}
}
async function pollPending() {
const polling = await prisma.job.findMany({
where: { status: 'polling', muapiRequestId: { not: null } },
take: 50,
});
await Promise.all(polling.map(pollJob));
}
async function pollJob(job) {
try {
const res = await fetch(`${MUAPI_BASE}/api/v1/predictions/${job.muapiRequestId}/result`, {
headers: { 'x-api-key': API_KEY },
});
if (!res.ok) {
// 5xx — transient, leave for next tick. 4xx — fail with backoff.
if (res.status >= 500) return;
const text = await res.text().catch(() => '');
await markFailureWithBackoff(job.id, `Poll ${res.status}: ${text.slice(0, 200)}`);
return;
}
const data = await res.json();
const status = data.status?.toLowerCase();
if (TERMINAL_OK.has(status)) {
const videoUrl = data.outputs?.[0] || data.url || data.output?.url || data.video_url || null;
await prisma.$transaction([
prisma.job.update({
where: { id: job.id },
data: { status: 'done', videoUrl, completedAt: new Date() },
}),
prisma.batch.update({
where: { id: job.batchId },
data: { done: { increment: 1 } },
}),
]);
log('poll.done', { jobId: job.id, videoUrl });
return;
}
if (TERMINAL_FAIL.has(status)) {
await markFailureWithBackoff(job.id, data.error || `MuAPI status: ${status}`);
return;
}
// pending / in_progress — leave it
} catch (err) {
log('poll.error', { jobId: job.id, err: err.message });
}
}
async function markFailureWithBackoff(jobId, errorMsg) {
const job = await prisma.job.findUnique({ where: { id: jobId } });
if (!job) return;
const nextRetries = job.retries + 1;
if (nextRetries > 3) {
await prisma.$transaction([
prisma.job.update({
where: { id: jobId },
data: {
status: 'failed',
retries: nextRetries,
error: errorMsg.slice(0, 1000),
completedAt: new Date(),
},
}),
prisma.batch.update({
where: { id: job.batchId },
data: { failed: { increment: 1 } },
}),
]);
log('job.failed', { jobId, retries: nextRetries });
return;
}
const backoffMs = Math.min(10 * Math.pow(3, job.retries), 300) * 1000;
await prisma.job.update({
where: { id: jobId },
data: {
status: 'queued',
retries: nextRetries,
error: errorMsg.slice(0, 1000),
nextAttemptAt: new Date(Date.now() + backoffMs),
muapiRequestId: null,
},
});
log('job.retry', { jobId, retries: nextRetries, backoffMs });
}
async function maybeMarkCompleted(batch) {
const counts = await prisma.job.groupBy({
by: ['status'],
where: { batchId: batch.id },
_count: { _all: true },
});
const map = Object.fromEntries(counts.map((c) => [c.status, c._count._all]));
const finished = (map.done || 0) + (map.failed || 0) + (map.cancelled || 0);
if (finished >= batch.total) {
await prisma.batch.update({
where: { id: batch.id },
data: { status: 'completed' },
});
log('batch.completed', { batchId: batch.id });
}
}
// If the imageUrl is local (/api/uploads/...), upload the file to MuAPI
// and persist the resulting CDN URL so we only pay the round-trip once.
async function ensureMuapiUrl(kind, asset) {
const url = asset.imageUrl || '';
if (url.startsWith('http://') || url.startsWith('https://')) return url;
// Local URL — re-upload from disk to MuAPI.
const fileName = url.split('/').pop();
if (!fileName) throw new Error(`Invalid asset url: ${url}`);
const folder = kind === 'trainer' ? 'trainers' : 'studios';
const filePath = path.join(UPLOAD_DIR, folder, decodeURIComponent(fileName));
const buf = await readFile(filePath);
const form = new FormData();
form.append('file', new Blob([buf]), fileName);
const res = await fetch(`${MUAPI_BASE}/api/v1/upload_file`, {
method: 'POST',
headers: { 'x-api-key': API_KEY },
body: form,
});
if (!res.ok) {
const text = await res.text().catch(() => '');
throw new Error(`MuAPI upload_file ${res.status}: ${text.slice(0, 200)}`);
}
const data = await res.json();
const cdnUrl = data.url || data.file_url || data?.data?.url;
if (!cdnUrl) throw new Error('MuAPI upload returned no URL');
// Persist back to DB so we don't re-upload for future jobs.
if (kind === 'trainer') {
await prisma.trainer.update({ where: { id: asset.id }, data: { imageUrl: cdnUrl } });
} else {
await prisma.studio.update({ where: { id: asset.id }, data: { imageUrl: cdnUrl } });
}
log('reupload.ok', { kind, assetId: asset.id });
return cdnUrl;
}
async function recoverOrphans() {
// Re-queue anything left in 'submitting' from a crashed previous run.
const submitting = await prisma.job.updateMany({
where: { status: 'submitting' },
data: { status: 'queued', muapiRequestId: null },
});
if (submitting.count > 0) log('recover.submitting', { count: submitting.count });
// 'polling' jobs can keep going — we have their muapiRequestId.
const stillPolling = await prisma.job.count({ where: { status: 'polling' } });
if (stillPolling > 0) log('recover.polling', { count: stillPolling });
}
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
function log(event, fields = {}) {
const ts = new Date().toISOString();
process.stdout.write(`[${ts}] ${event} ${JSON.stringify(fields)}\n`);
}

3
worker/package.json Normal file
View file

@ -0,0 +1,3 @@
{
"type": "module"
}