Workers¶
Background tasks run on separate worker processes using TaskIQ with RabbitMQ as the message broker. This keeps the API responsive while heavy operations like transcription and video rendering happen asynchronously. Redis is still used for SSE pub/sub, caching, sessions, and task result storage.
How It Works¶
sequenceDiagram
participant API
participant RMQ as RabbitMQ
participant Worker
participant DB
API->>RMQ: .kicker().with_labels(priority=N).kiq()
API->>API: Return immediately
Worker->>RMQ: Consume (AMQP)
RMQ->>Worker: Deliver task (highest priority first)
Worker->>Worker: Execute task
Worker->>DB: Update records
Worker->>RMQ: Ack message
You define tasks as async functions decorated with @broker.task. When you call .kicker().with_labels(priority=N).kiq() on a task, it serializes the arguments and publishes a message to the RabbitMQ queue with the specified priority. Workers consume messages in priority order.
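Priority-ordered consumption can be illustrated with a plain min-heap as a stand-in for RabbitMQ's priority queue (this is not TaskIQ code, just a sketch of the semantics): higher-priority messages pop first, and ties fall back to publish (FIFO) order.

```python
import heapq
from itertools import count

# Stand-in for a RabbitMQ priority queue. Priority is negated so the
# highest priority pops first from the min-heap; the sequence number
# breaks ties in publish (FIFO) order.
seq = count()
queue: list[tuple[int, int, str]] = []

def publish(priority: int, payload: str) -> None:
    heapq.heappush(queue, (-priority, next(seq), payload))

def consume() -> str:
    return heapq.heappop(queue)[2]

publish(0, "free-tier render")
publish(3, "viral-tier render")
publish(2, "creator-tier render")

assert [consume(), consume(), consume()] == [
    "viral-tier render", "creator-tier render", "free-tier render"
]
```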
The API and workers are separate processes. You can scale workers independently by running more of them.
Brokers¶
We have specialized brokers for different workloads. Each uses a dedicated RabbitMQ queue:
flowchart TB
subgraph API["FastAPI"]
A[Queue Tasks]
end
subgraph RMQ["RabbitMQ Queues"]
Q1[(download)]
Q2[(analysis<br/>priority 0-3)]
Q3[(render<br/>priority 0-3)]
Q4[(asset<br/>priority 0-3)]
Q5[(email)]
Q6[(proxy)]
end
subgraph Workers["Worker Processes"]
W1[Download Workers]
W2[Analysis Workers]
W3[Render Workers]
W4[Asset Workers]
W5[Email Workers]
W6[Proxy Workers]
end
A --> Q1
A --> Q2
A --> Q3
A --> Q4
A --> Q5
A --> Q6
Q1 --> W1
Q2 --> W2
Q3 --> W3
Q4 --> W4
Q5 --> W5
Q6 --> W6
| Broker | Queue | Priority | Purpose |
|---|---|---|---|
| analysis_broker | analysis | 0-3 | Whisper transcription, silence/false-start/profanity detection |
| render_broker | render | 0-3 | FFmpeg video processing, audio mute/bleep |
| asset_broker | asset | 0-3 | Asset editing (trim, extract audio) |
| download_broker | download | FIFO | yt-dlp (clips and assets), audio extraction, waveform, thumbnail |
| proxy_broker | proxy | FIFO | FFmpeg HEVC/4K → H.264 480p re-encode for non-web-compatible clips (CPU-heavy, split from download so audio extraction isn't blocked) |
| email_broker | email | FIFO | Postmark email sending, scheduled cron tasks |
The brokers are defined in backend/src/infrastructure/taskiq/brokers.py.
Priority Queue¶
User-facing brokers (analysis, render, asset) use RabbitMQ native priority queues (max_priority=3, QueueType.CLASSIC). Higher-tier users get their tasks consumed first.
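At the AMQP level, max_priority corresponds to the x-max-priority queue argument, and each message carries a priority property. A hedged illustration using the pika client directly (the project itself declares queues through TaskIQ, and durable=True here is an assumption):

```python
import pika

# Illustration only: what a priority queue looks like at the AMQP level.
conn = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = conn.channel()
channel.queue_declare(
    queue="render",
    durable=True,                        # assumption, not from the project
    arguments={"x-max-priority": 3},     # what max_priority=3 maps to
)
channel.basic_publish(
    exchange="",
    routing_key="render",
    body=b'{"project_uuid": "..."}',
    properties=pika.BasicProperties(priority=3),  # e.g. viral tier
)
```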
| Tier | is_paid | Priority | RabbitMQ Behavior |
|---|---|---|---|
| viral | true | 3 (HIGH) | Consumed first |
| creator | true | 2 (NORMAL) | Consumed second |
| hobby | true | 1 (LOW) | Consumed third |
| trial / free | false | 0 (BACKGROUND) | Consumed last |
Priority is resolved at enqueue time via resolve_task_priority(tier_name, is_paid) in infrastructure/taskiq/priority.py. The tier context comes from resolve_tier_context(user_id, db).
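The mapping itself is simple enough to sketch. This is a hypothetical re-implementation of resolve_task_priority that matches the tier table above, not the project's actual code:

```python
from enum import IntEnum

class TaskPriority(IntEnum):
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3

def resolve_task_priority(tier_name: str, is_paid: bool) -> TaskPriority:
    """Hypothetical re-implementation of the tier table above."""
    if not is_paid:
        return TaskPriority.BACKGROUND  # trial / free
    return {
        "viral": TaskPriority.HIGH,
        "creator": TaskPriority.NORMAL,
        "hobby": TaskPriority.LOW,
    }.get(tier_name, TaskPriority.BACKGROUND)
```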
Analysis and render workers use --ack-type when_executed so messages are only acknowledged after task completion. If a worker crashes mid-task, RabbitMQ requeues the message automatically.
Prefetch count (qos=2) is kept low to ensure strict priority ordering — workers won't buffer many low-priority messages ahead of high-priority ones arriving later.
Task Lifecycle¶
stateDiagram-v2
[*] --> Queued: .kiq()
Queued --> Running: Worker picks up
Running --> Success: Task completes
Running --> Failed: Error thrown
Failed --> Queued: Retry (if enabled)
Failed --> Dead: Max retries exceeded
Success --> [*]
Dead --> [*]
Defining Tasks¶
Tasks live in their module's tasks.py file. Here's the pattern:
# backend/src/workers/download/tasks.py
from sqlalchemy.ext.asyncio import AsyncSession
from taskiq import TaskiqDepends

from src.infrastructure.taskiq import download_broker
from src.infrastructure.taskiq.deps import get_db_session, get_storage_client

# StorageClient and DownloadResult are project-specific types
# (their imports are omitted here for brevity).

@download_broker.task(
    task_name="download_youtube_video",
    retry_on_error=True,
    max_retries=3,
)
async def download_youtube_video(
    clip_file_uuid: str,
    db: AsyncSession = TaskiqDepends(get_db_session),
    storage: StorageClient = TaskiqDepends(get_storage_client),
) -> DownloadResult:
    """Download video from YouTube and process it."""
    # ... implementation
Key points:
- Task names must be unique across all brokers
- All arguments must be JSON-serializable (except TaskiqDepends injections)
- Use TaskiqDepends for dependencies like database sessions; they're created fresh for each task
- Return values should be serializable for result storage
Queuing Tasks¶
For priority-routed tasks (analysis, render, asset), resolve the user's tier and enqueue with priority:
tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
    pacing_level=50,
)
For non-priority tasks (download, email), use plain .kiq():
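A minimal example, reusing the download_youtube_video task defined above (clip_file_uuid assumed in scope):

```python
await download_youtube_video.kiq(clip_file_uuid=str(clip_file_uuid))
```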
Retry Policy¶
flowchart LR
A[Task Fails] --> B{Retries left?}
B -->|Yes| C[Wait with backoff]
C --> D[Retry task]
D --> E{Success?}
E -->|No| B
E -->|Yes| F[Done]
B -->|No| G[Mark as dead]
The download and asset brokers use SmartRetryMiddleware:
- Max retries: 3
- Initial delay: 5 seconds
- Exponential backoff with jitter
- Max delay: 60 seconds
This handles transient failures like network issues. Permanent failures (like invalid YouTube URLs) fail fast.
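The stated schedule can be sketched as a pure delay function. This is an illustration of the policy above, not SmartRetryMiddleware's actual implementation, and the full-jitter strategy is an assumption:

```python
import random

def retry_delay(attempt: int, initial: float = 5.0, max_delay: float = 60.0) -> float:
    """Exponential backoff with jitter: bases of 5s, 10s, 20s, ..., capped at 60s."""
    base = min(initial * (2 ** attempt), max_delay)
    # Full jitter (an assumption): draw uniformly so retries spread out.
    return random.uniform(0, base)
```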
Running Workers¶
Start specific brokers for scaling:
# Scale download workers
uv run taskiq worker src.infrastructure.taskiq:download_broker --workers 4
# Scale analysis workers (CPU-heavy)
uv run taskiq worker src.infrastructure.taskiq:analysis_broker --workers 2
Scaling Strategy¶
flowchart TB
subgraph Download["Download Workers (I/O bound)"]
D1[Worker 1]
D2[Worker 2]
D3[Worker 3]
D4[Worker 4]
end
subgraph Analysis["Analysis Workers (CPU bound)"]
A1[Worker 1]
A2[Worker 2]
end
subgraph Render["Render Workers (CPU bound)"]
R1[Worker 1]
R2[Worker 2]
end
In Docker Compose, the worker service runs all brokers. For production, you'd typically split them into separate deployments.
Database Access¶
Tasks run in separate processes, so they need their own database connections. The get_db_session dependency creates a session per task:
@download_broker.task(task_name="my_task")
async def my_task(
    item_id: int,
    db: AsyncSession = TaskiqDepends(get_db_session),
):
    item = await crud_items.get(db=db, id=item_id)
    # ...
The connection pool uses NullPool to avoid issues with connections persisting across tasks.
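A minimal sketch of that engine wiring (DATABASE_URL is a placeholder; the real setup lives alongside get_db_session in the deps module):

```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.pool import NullPool

# NullPool hands back a fresh connection per checkout and closes it on
# release, so no connection outlives the task that used it.
engine = create_async_engine(DATABASE_URL, poolclass=NullPool)  # DATABASE_URL: placeholder
session_factory = async_sessionmaker(engine)
```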
Key Files¶
| Component | Location |
|---|---|
| Broker setup | backend/src/infrastructure/taskiq/brokers.py |
| DB dependency | backend/src/infrastructure/taskiq/deps.py |
| Worker entry | backend/src/infrastructure/taskiq/worker.py |
| Download tasks (yt-dlp, audio extraction, waveform, thumbnail) | backend/src/workers/download/tasks.py |
| Proxy task (generate_clip_proxy, registered on proxy_broker) | backend/src/workers/download/tasks.py |
| Analysis tasks | backend/src/workers/analysis/tasks.py |
| Render tasks | backend/src/workers/render/tasks.py |
| Asset edit tasks | backend/src/workers/asset_edit/tasks.py |