Workers

Background tasks run in separate worker processes using TaskIQ with RabbitMQ as the message broker. This keeps the API responsive while heavy operations like transcription and video rendering happen asynchronously. Redis is still used for SSE pub/sub, caching, sessions, and task result storage.

How It Works

sequenceDiagram
    participant API
    participant RMQ as RabbitMQ
    participant Worker
    participant DB

    API->>RMQ: .kicker().with_labels(priority=N).kiq()
    API->>API: Return immediately
    Worker->>RMQ: Consume (AMQP)
    RMQ->>Worker: Deliver task (highest priority first)
    Worker->>Worker: Execute task
    Worker->>DB: Update records
    Worker->>RMQ: Ack message

You define tasks as async functions decorated with @broker.task. When you call .kicker().with_labels(priority=N).kiq() on a task, it serializes the arguments and publishes a message to the RabbitMQ queue with the specified priority. Workers consume messages in priority order.

The API and workers are separate processes. You can scale workers independently by running more of them.

Brokers

We have specialized brokers for different workloads. Each uses a dedicated RabbitMQ queue:

flowchart TB
    subgraph API["FastAPI"]
        A[Queue Tasks]
    end

    subgraph RMQ["RabbitMQ Queues"]
        Q1[(download)]
        Q2[(analysis<br/>priority 0-3)]
        Q3[(render<br/>priority 0-3)]
        Q4[(asset<br/>priority 0-3)]
        Q5[(email)]
        Q6[(proxy)]
    end

    subgraph Workers["Worker Processes"]
        W1[Download Workers]
        W2[Analysis Workers]
        W3[Render Workers]
        W4[Asset Workers]
        W5[Email Workers]
        W6[Proxy Workers]
    end

    A --> Q1
    A --> Q2
    A --> Q3
    A --> Q4
    A --> Q5
    A --> Q6

    Q1 --> W1
    Q2 --> W2
    Q3 --> W3
    Q4 --> W4
    Q5 --> W5
    Q6 --> W6
| Broker | Queue | Priority | Purpose |
|---|---|---|---|
| analysis_broker | analysis | 0-3 | Whisper transcription; silence, false-start, and profanity detection |
| render_broker | render | 0-3 | FFmpeg video processing, audio mute/bleep |
| asset_broker | asset | 0-3 | Asset editing (trim, extract audio) |
| download_broker | download | FIFO | yt-dlp (clips and assets), audio extraction, waveform, thumbnail |
| proxy_broker | proxy | FIFO | FFmpeg HEVC/4K → H.264 480p re-encode for non-web-compatible clips (CPU-heavy; split from download so audio extraction isn't blocked) |
| email_broker | email | FIFO | Postmark email sending, scheduled cron tasks |

The brokers are defined in backend/src/infrastructure/taskiq/brokers.py.

Priority Queue

User-facing brokers (analysis, render, asset) use RabbitMQ native priority queues (max_priority=3, QueueType.CLASSIC). Higher-tier users get their tasks consumed first.

| Tier | is_paid | Priority | RabbitMQ Behavior |
|---|---|---|---|
| viral | true | 3 (HIGH) | Consumed first |
| creator | true | 2 (NORMAL) | Consumed second |
| hobby | true | 1 (LOW) | Consumed third |
| trial / free | false | 0 (BACKGROUND) | Consumed last |

Priority is resolved at enqueue time via resolve_task_priority(tier_name, is_paid) in infrastructure/taskiq/priority.py. The tier context comes from resolve_tier_context(user_id, db).
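The tier-to-priority mapping from the table above can be sketched as follows. This is a hypothetical reimplementation for illustration; the actual `resolve_task_priority` lives in `infrastructure/taskiq/priority.py` and may differ in detail:

```python
from enum import IntEnum


class TaskPriority(IntEnum):
    """RabbitMQ message priority: higher values are consumed first."""
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


# Illustrative tier -> priority mapping, matching the table above.
_TIER_PRIORITIES = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}


def resolve_task_priority(tier_name: str, is_paid: bool) -> TaskPriority:
    """Unpaid users (trial/free) always get BACKGROUND, regardless of tier."""
    if not is_paid:
        return TaskPriority.BACKGROUND
    return _TIER_PRIORITIES.get(tier_name, TaskPriority.BACKGROUND)
```

Using an `IntEnum` keeps the value directly usable as the integer priority label passed to `.with_labels(priority=...)`.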

Analysis and render workers use --ack-type when_executed so messages are only acknowledged after task completion. If a worker crashes mid-task, RabbitMQ requeues the message automatically.

Prefetch count (qos=2) is kept low to ensure strict priority ordering — workers won't buffer many low-priority messages ahead of high-priority ones arriving later.
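The delivery order a priority queue produces can be sketched with a toy in-memory model (pure Python, not the actual RabbitMQ client). With a deep prefetch, a worker would buffer low-priority messages before a later high-priority one arrives; keeping `qos` low means each delivery decision sees the current head of the queue:

```python
import heapq


class PriorityQueueSim:
    """Toy model of a RabbitMQ classic priority queue: highest
    priority first, FIFO within the same priority level."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._seq = 0  # tie-breaker preserves FIFO order per priority

    def publish(self, priority: int, task: str) -> None:
        # heapq is a min-heap, so negate the priority.
        heapq.heappush(self._heap, (-priority, self._seq, task))
        self._seq += 1

    def consume(self) -> str:
        return heapq.heappop(self._heap)[2]


q = PriorityQueueSim()
q.publish(0, "free-user-render")
q.publish(3, "viral-user-render")
q.publish(1, "hobby-user-render")

# Delivered highest priority first: viral, then hobby, then free.
order = [q.consume() for _ in range(3)]
```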

Task Lifecycle

stateDiagram-v2
    [*] --> Queued: .kiq()
    Queued --> Running: Worker picks up
    Running --> Success: Task completes
    Running --> Failed: Error thrown
    Failed --> Queued: Retry (if enabled)
    Failed --> Dead: Max retries exceeded
    Success --> [*]
    Dead --> [*]

Defining Tasks

Tasks live in their module's tasks.py file. Here's the pattern:

# backend/src/workers/download/tasks.py

from sqlalchemy.ext.asyncio import AsyncSession
from taskiq import TaskiqDepends

from src.infrastructure.taskiq import download_broker
from src.infrastructure.taskiq.deps import get_db_session, get_storage_client
# StorageClient and DownloadResult come from their own modules (imports omitted here)

@download_broker.task(
    task_name="download_youtube_video",
    retry_on_error=True,
    max_retries=3,
)
async def download_youtube_video(
    clip_file_uuid: str,
    db: AsyncSession = TaskiqDepends(get_db_session),
    storage: StorageClient = TaskiqDepends(get_storage_client),
) -> DownloadResult:
    """Download a video from YouTube and process it."""
    # ... implementation

Key points:

  1. Task names must be unique across all brokers
  2. All arguments must be JSON-serializable (except TaskiqDepends injections)
  3. Use TaskiqDepends for dependencies like database sessions - they're created fresh for each task
  4. Return values should be serializable for result storage

Queuing Tasks

For priority-routed tasks (analysis, render, asset), resolve the user's tier and enqueue with priority:

tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
    pacing_level=50,
)

For non-priority tasks (download, email), use plain .kiq():

await download_youtube_video.kiq(clip_file_uuid=str(clip.uuid))

Retry Policy

flowchart LR
    A[Task Fails] --> B{Retries left?}
    B -->|Yes| C[Wait with backoff]
    C --> D[Retry task]
    D --> E{Success?}
    E -->|No| B
    E -->|Yes| F[Done]
    B -->|No| G[Mark as dead]

The download and asset brokers use SmartRetryMiddleware:

  • Max retries: 3
  • Initial delay: 5 seconds
  • Exponential backoff with jitter
  • Max delay: 60 seconds

This handles transient failures like network issues. Permanent failures (like invalid YouTube URLs) fail fast.
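The resulting delay schedule can be sketched in a few lines. The parameter names here are illustrative, not the actual SmartRetryMiddleware API:

```python
import random

INITIAL_DELAY = 5.0   # seconds before the first retry
MAX_DELAY = 60.0      # cap on the backoff
MAX_RETRIES = 3


def retry_delay(attempt: int, jitter: float = 0.2) -> float:
    """Exponential backoff capped at MAX_DELAY, with +/-20% jitter.

    attempt is 1-based: 1 -> ~5s, 2 -> ~10s, 3 -> ~20s.
    Jitter spreads out retries so failing tasks don't all
    hammer a recovering dependency at the same instant.
    """
    base = min(INITIAL_DELAY * 2 ** (attempt - 1), MAX_DELAY)
    return base * random.uniform(1 - jitter, 1 + jitter)
```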

Running Workers

Start specific brokers for scaling:

# Scale download workers
uv run taskiq worker src.infrastructure.taskiq:download_broker --workers 4

# Scale analysis workers (CPU-heavy)
uv run taskiq worker src.infrastructure.taskiq:analysis_broker --workers 2

Scaling Strategy

flowchart TB
    subgraph Download["Download Workers (I/O bound)"]
        D1[Worker 1]
        D2[Worker 2]
        D3[Worker 3]
        D4[Worker 4]
    end

    subgraph Analysis["Analysis Workers (CPU bound)"]
        A1[Worker 1]
        A2[Worker 2]
    end

    subgraph Render["Render Workers (CPU bound)"]
        R1[Worker 1]
        R2[Worker 2]
    end

In Docker Compose, the worker service runs all brokers. For production, you'd typically split them into separate deployments.

Database Access

Tasks run in separate processes, so they need their own database connections. The get_db_session dependency creates a session per task:

@download_broker.task(task_name="my_task")
async def my_task(
    item_id: int,
    db: AsyncSession = TaskiqDepends(get_db_session),
):
    item = await crud_items.get(db=db, id=item_id)
    # ...

The connection pool uses NullPool to avoid issues with connections persisting across tasks.

Key Files

| Component | Location |
|---|---|
| Broker setup | backend/src/infrastructure/taskiq/brokers.py |
| DB dependency | backend/src/infrastructure/taskiq/deps.py |
| Worker entry | backend/src/infrastructure/taskiq/worker.py |
| Download tasks (yt-dlp, audio extraction, waveform, thumbnail) | backend/src/workers/download/tasks.py |
| Proxy task (generate_clip_proxy, registered on proxy_broker) | backend/src/workers/download/tasks.py |
| Analysis tasks | backend/src/workers/analysis/tasks.py |
| Render tasks | backend/src/workers/render/tasks.py |
| Asset edit tasks | backend/src/workers/asset_edit/tasks.py |
