AI & Workers Development Guide

This guide covers patterns for developing background workers and AI-powered analysis in Sapari. Workers handle anything too slow for a request/response cycle: video transcription, LLM analysis, FFmpeg rendering.

Worker Structure

Each worker follows a consistent layout:

flowchart TB
    subgraph workers["workers/"]
        A[analysis/] --> AT[tasks.py]
        A --> AP[pipeline.py]
        A --> AD[deps.py]
        A --> AS[steps/]

        R[render/] --> RT[tasks.py]
        R --> RP[pipeline.py]

        D[download/] --> DT[tasks.py]
    end

Each worker module follows this structure:

workers/{name}/
├── tasks.py          # TaskIQ task definitions
├── pipeline.py       # FastroAI pipeline definition
├── deps.py           # Dependency injection classes
├── schemas.py        # Input/output schemas
├── constants.py      # Configuration constants
└── {subdomain}/      # Step implementations
    ├── step.py
    └── logic.py

Pipeline Pattern

Pipelines are DAGs that orchestrate multi-step workflows. Steps with no dependencies between them run in parallel automatically:

flowchart LR
    A[load_audio] --> B[transcribe]
    B --> C[detect_silences]
    B --> D[detect_false_starts]
    B --> IT[improve_transcript]
    B --> IA[insert_fixed_assets]
    B --> AD[insert_ai_directed_assets]
    IT --> P[censor_profanity]
    C --> E[validate_edits]
    D --> E
    E --> F[create_edits]
    P --> F
    IA --> F
    AD --> F
    F --> G[update_project]
    P --> G
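
The parallelism falls out of the dependency graph: a scheduler repeatedly launches every step whose dependencies have already completed. This is a minimal sketch of that scheduling idea, not FastroAI's actual implementation (all names here are illustrative):

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_dag(
    steps: dict[str, Callable[[], Awaitable[object]]],
    dependencies: dict[str, list[str]],
) -> dict[str, object]:
    """Wave-based DAG executor: each wave runs every step whose deps are done."""
    done: dict[str, object] = {}
    pending = set(steps)
    while pending:
        # Every pending step whose dependencies are all satisfied is ready
        ready = [s for s in pending if all(d in done for d in dependencies.get(s, []))]
        if not ready:
            raise ValueError("cycle or missing dependency in graph")
        # Independent steps in the same wave run concurrently
        results = await asyncio.gather(*(steps[s]() for s in ready))
        done.update(zip(ready, results))
        pending -= set(ready)
    return done
```

In the analysis graph above, this is why detect_silences, detect_false_starts, improve_transcript, and the two asset steps all start together once transcribe finishes.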

Defining a Pipeline

# workers/analysis/pipeline.py
from typing import Any
from fastroai import Pipeline, PipelineConfig, StepConfig

from .deps import AnalysisDeps
from .audio import LoadAudioStep
from .transcription import TranscribeStep
from .silence import DetectSilencesStep
from .false_starts import DetectFalseStartsStep, ValidateEditsStep
from .transcript_improvement import ImproveTranscriptStep
from .assets import InsertFixedAssetsStep
from .ai_director import InsertAIDirectedAssetsStep
from .profanity import CensorProfanityStep
from .edits import CreateEditsStep, UpdateProjectStep

analysis_pipeline: Pipeline[AnalysisDeps, dict[str, Any], None] = Pipeline(
    name="video_analysis",

    # Step instances
    steps={
        "load_audio": LoadAudioStep(),
        "transcribe": TranscribeStep(),
        "detect_silences": DetectSilencesStep(),
        "detect_false_starts": DetectFalseStartsStep(),
        "improve_transcript": ImproveTranscriptStep(),
        "insert_fixed_assets": InsertFixedAssetsStep(),
        "insert_ai_directed_assets": InsertAIDirectedAssetsStep(),
        "censor_profanity": CensorProfanityStep(),
        "validate_edits": ValidateEditsStep(),
        "create_edits": CreateEditsStep(),
        "update_project": UpdateProjectStep(),
    },

    # Dependency graph - steps run in parallel when possible
    dependencies={
        "transcribe": ["load_audio"],
        "detect_silences": ["transcribe"],
        "detect_false_starts": ["transcribe", "load_audio"],
        "improve_transcript": ["transcribe"],
        "insert_fixed_assets": ["transcribe"],
        "insert_ai_directed_assets": ["transcribe"],
        "censor_profanity": ["improve_transcript"],
        "validate_edits": ["detect_silences", "detect_false_starts"],
        "create_edits": ["validate_edits", "censor_profanity", "insert_fixed_assets", "insert_ai_directed_assets"],
        "update_project": ["create_edits", "censor_profanity"],
    },

    # Which step's output is the final result
    output_step="update_project",

    # Global timeout
    config=PipelineConfig(timeout=600.0),

    # Per-step timeouts
    step_configs={
        "load_audio": StepConfig(timeout=120.0),
        "transcribe": StepConfig(timeout=300.0),
        "detect_false_starts": StepConfig(timeout=600.0),
        "insert_ai_directed_assets": StepConfig(timeout=180.0),
    },
)

Pipeline Dependencies

Dependencies are shared resources injected into all steps:

# workers/analysis/deps.py
from dataclasses import dataclass
from openai import AsyncOpenAI
from sqlalchemy.ext.asyncio import AsyncSession
from ...infrastructure.storage import StorageClient

@dataclass
class AnalysisDeps:
    """Dependencies shared across all analysis steps."""
    db: AsyncSession
    storage: StorageClient
    openai_client: AsyncOpenAI

Step Pattern

Steps are the atomic units of work. Each step receives outputs from its dependencies via ctx.get_dependency() and returns a typed result:

flowchart TB
    subgraph Step["BaseStep"]
        I[__init__] --> |Initialize resources| E[execute]
        E --> |Access deps| D[ctx.deps]
        E --> |Get input| IN[ctx.get_input]
        E --> |Get previous output| P[ctx.get_dependency]
        E --> |Return result| R[Output Type]
    end

Creating a Step

# workers/analysis/transcription/step.py
from pathlib import Path
from uuid import UUID

from fastroai import BaseStep, StepContext

from ....infrastructure.events import AnalysisProgressEvent, publish_event
from ..deps import AnalysisDeps
from ..schemas import AnalysisInput, TranscriptionResult
from .whisper import transcribe


class TranscribeStep(BaseStep[AnalysisDeps, TranscriptionResult]):
    """Transcribe audio using OpenAI Whisper API."""

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> TranscriptionResult:
        # Get output from previous step
        audio_path: Path = ctx.get_dependency("load_audio")

        # Get pipeline input data
        input_data: AnalysisInput = ctx.get_input("input")
        project_uuid = UUID(input_data.project_uuid)

        # Publish progress event for UI
        await publish_event(
            AnalysisProgressEvent(
                project_uuid=project_uuid,
                step="transcribe",
                progress=30,
                message="Transcribing speech...",
            )
        )

        # Access shared dependencies
        result = await transcribe(
            client=ctx.deps.openai_client,
            audio_path=audio_path,
            language=input_data.language,
        )

        return result

Step Context Methods

Method                            Purpose
ctx.deps                          Access shared dependencies (db, storage, etc.)
ctx.get_input("input")            Get pipeline input data
ctx.get_dependency("step_name")   Get output from a previous step
ctx.run(agent, message)           Run an LLM agent with cost tracking

Task Pattern

Tasks are the entry points. API routes queue tasks, TaskIQ executes them asynchronously, and the task sets up dependencies before running the pipeline:

# workers/analysis/tasks.py
import shutil
from typing import Annotated
from uuid import UUID

from openai import AsyncOpenAI
from taskiq import TaskiqDepends

from ...infrastructure.storage import StorageClient
from ...infrastructure.taskiq import DBSession, analysis_broker
from ...infrastructure.events import (
    AnalysisStartedEvent,
    AnalysisCompleteEvent,
    AnalysisFailedEvent,
    publish_event,
)
from ...modules.project.crud import crud_projects
from ...modules.project.enums import ProjectStatus
from .deps import AnalysisDeps
from .pipeline import analysis_pipeline
from .schemas import AnalysisInput, AnalysisOutput


# Dependency factories
def get_storage_client() -> StorageClient:
    return StorageClient()

def get_openai_client() -> AsyncOpenAI:
    return AsyncOpenAI()

# Type aliases for injection
StorageClientDep = Annotated[StorageClient, TaskiqDepends(get_storage_client)]
OpenAIClientDep = Annotated[AsyncOpenAI, TaskiqDepends(get_openai_client)]


@analysis_broker.task(task_name="analyze_project")
async def analyze_project(
    project_uuid: str,
    pacing_level: int = 50,
    false_start_sensitivity: int = 50,
    language: str | None = None,
    # Injected dependencies (use None with type: ignore)
    db: DBSession = None,  # type: ignore[assignment]
    storage: StorageClientDep = None,  # type: ignore[assignment]
    openai_client: OpenAIClientDep = None,  # type: ignore[assignment]
) -> AnalysisOutput:
    """Analyze a project: transcribe and detect edits."""
    project_uuid_obj = UUID(project_uuid)
    temp_dir = None

    try:
        # Update status
        await crud_projects.update(
            db,
            object={"status": ProjectStatus.ANALYZING},
            uuid=project_uuid_obj,
        )

        # Publish start event
        await publish_event(
            AnalysisStartedEvent(
                project_uuid=project_uuid_obj,
                pacing_level=pacing_level,
                language=language,
            )
        )

        # Build dependencies and input
        deps = AnalysisDeps(db=db, storage=storage, openai_client=openai_client)
        input_data = AnalysisInput(
            project_uuid=project_uuid,
            pacing_level=pacing_level,
            false_start_sensitivity=false_start_sensitivity,
            language=language,
        )

        # Execute pipeline
        result = await analysis_pipeline.execute(
            input_data={"input": input_data},
            deps=deps,
        )

        # Extract results
        transcript = result.step_outputs.get("transcribe")
        edits_count = result.step_outputs.get("create_edits", 0)

        # Publish completion
        await publish_event(
            AnalysisCompleteEvent(
                project_uuid=project_uuid_obj,
                edit_count=edits_count,
                word_count=len(transcript.words) if transcript else 0,
            )
        )

        return AnalysisOutput(
            project_uuid=project_uuid,
            transcript_text=transcript.text if transcript else "",
            edits_created=edits_count,
        )

    except Exception as e:
        # Publish failure event
        await publish_event(
            AnalysisFailedEvent(
                project_uuid=project_uuid_obj,
                error=str(e)[:500],
            )
        )

        # Update status
        await crud_projects.update(
            db,
            object={
                "status": ProjectStatus.FAILED,
                "error_message": str(e)[:500],
            },
            uuid=project_uuid_obj,
        )
        raise

    finally:
        # Cleanup temp files
        if temp_dir and temp_dir.exists():
            shutil.rmtree(temp_dir)

Triggering Tasks

From API routes:

from ...workers.analysis.tasks import analyze_project

# Queue the task (returns immediately)
await analyze_project.kiq(
    project_uuid=str(project.uuid),
    pacing_level=50,
    false_start_sensitivity=70,
    language="en",
)

LLM Integration

Using FastroAgent with PydanticAI

Agents use pydantic_ai.Agent for structured LLM output, wrapped in FastroAgent for pipeline integration and cost tracking. Initialize the agent once in __init__ so it is reused across pipeline executions, then call it via ctx.run():

from fastroai import BaseStep, FastroAgent, StepContext
from pydantic_ai import Agent
from pydantic_ai.models.fallback import FallbackModel
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.deepseek import DeepSeekProvider

class DetectFalseStartsStep(BaseStep[AnalysisDeps, list[FalseStartRegion]]):
    """Detect false starts using LLM analysis."""

    def __init__(self) -> None:
        # DeepSeek Reasoner primary, GPT-5-mini fallback
        fallback_model = FallbackModel(
            OpenAIChatModel("deepseek-reasoner", provider=DeepSeekProvider()),
            OpenAIChatModel("gpt-5-mini"),
        )

        pydantic_agent: Agent[None, DetectionResult] = Agent(
            model=fallback_model,
            system_prompt=SYSTEM_PROMPT,
            output_type=DetectionResult,  # Pydantic model
        )

        self._agent: FastroAgent[DetectionResult] = FastroAgent(
            agent=pydantic_agent,
            output_type=DetectionResult,
            temperature=0.3,
            timeout=300,
            max_tokens=16000,
        )

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[FalseStartRegion]:
        transcript: TranscriptionResult = ctx.get_dependency("transcribe")
        input_data: AnalysisInput = ctx.get_input("input")

        # Build the user message from the transcript (prompt construction omitted)
        user_message = build_detection_prompt(transcript, input_data.false_start_sensitivity)

        # Use ctx.run() for cost tracking
        response = await ctx.run(self._agent, user_message)
        result: DetectionResult = response.output

        return process_results(result, transcript.words)

Structured Output

LLM responses are Pydantic models:

from pydantic import BaseModel

class Cut(BaseModel):
    start_word_idx: int
    end_word_idx: int
    removed_text: str
    keeper_preview: str
    confidence: float  # 0.0-1.0
    pattern_type: str
    reason: str

class DetectionResult(BaseModel):
    cuts: list[Cut]
    explanation: str
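
Downstream, the confidence field is what the sensitivity threshold (next section) filters on. A sketch with a trimmed-down Cut as a plain dataclass (the real model is the Pydantic one above; applying the threshold this way is an assumption about the validation step):

```python
from dataclasses import dataclass


@dataclass
class Cut:
    start_word_idx: int
    end_word_idx: int
    confidence: float


def filter_cuts(cuts: list[Cut], threshold: float) -> list[Cut]:
    """Drop cuts whose confidence falls below the sensitivity-derived threshold."""
    return [c for c in cuts if c.confidence >= threshold]
```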

Sensitivity Thresholds

Map user sensitivity (0-100) to confidence thresholds:

def get_sensitivity_threshold(sensitivity: int) -> float:
    """Higher sensitivity = lower threshold = more detections."""
    thresholds = {
        25: 0.85,   # Conservative - high confidence only
        50: 0.70,   # Balanced
        75: 0.55,   # Aggressive
        100: 0.40,  # Very aggressive
    }
    # Interpolate for values between
    return interpolate(sensitivity, thresholds)
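
interpolate is not shown above. A plausible implementation is piecewise-linear interpolation between the anchor points, clamped at the ends (the anchors come from the snippet; the interpolation scheme itself is an assumption):

```python
def interpolate(sensitivity: int, thresholds: dict[int, float]) -> float:
    """Piecewise-linear interpolation between anchor sensitivities (assumed scheme)."""
    anchors = sorted(thresholds)
    # Clamp to the anchor range
    if sensitivity <= anchors[0]:
        return thresholds[anchors[0]]
    if sensitivity >= anchors[-1]:
        return thresholds[anchors[-1]]
    # Find the surrounding anchors and interpolate linearly between them
    for lo, hi in zip(anchors, anchors[1:]):
        if lo <= sensitivity <= hi:
            frac = (sensitivity - lo) / (hi - lo)
            return thresholds[lo] + frac * (thresholds[hi] - thresholds[lo])
    raise ValueError("unreachable: anchors cover the clamped range")
```

With the table above, sensitivity 62 lands between the 50 and 75 anchors and yields a threshold of roughly 0.628.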

Whisper Integration

Transcription with Word Timing

# workers/analysis/transcription/whisper.py
async def transcribe(
    client: AsyncOpenAI,
    audio_path: Path,
    language: str | None = None,
) -> TranscriptionResult:
    """Transcribe with word-level timestamps."""

    file_size = audio_path.stat().st_size

    # Auto-chunk large files
    if file_size <= WHISPER_MAX_FILE_SIZE:
        return await _transcribe_single(client, audio_path, language)
    else:
        return await _transcribe_chunked(client, audio_path, language)


async def _transcribe_single(
    client: AsyncOpenAI,
    audio_path: Path,
    language: str | None,
) -> TranscriptionResult:
    with open(audio_path, "rb") as audio_file:
        response = await client.audio.transcriptions.create(
            model="whisper-1",
            file=audio_file,
            response_format="verbose_json",
            timestamp_granularities=["word"],
            language=language,
        )

    # Extract word-level timing
    words = [
        TranscriptWord(
            word=w.word,
            start_ms=int(w.start * 1000),
            end_ms=int(w.end * 1000),
        )
        for w in response.words
    ]

    return TranscriptionResult(
        language=response.language,
        duration_ms=int(response.duration * 1000),
        words=words,
        text=response.text,
    )
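
_transcribe_chunked is not shown. The non-obvious part is the merge: each chunk is transcribed independently, so its word timestamps start at zero and must be shifted by the chunk's offset within the original file. A sketch of that merge step (the audio splitting itself, e.g. via ffmpeg, is omitted; the helper name is an assumption, and TranscriptWord is redeclared here as a plain dataclass so the sketch is self-contained):

```python
from dataclasses import dataclass


@dataclass
class TranscriptWord:
    word: str
    start_ms: int
    end_ms: int


def merge_chunk_words(
    chunks: list[tuple[int, list[TranscriptWord]]],
) -> list[TranscriptWord]:
    """Shift each chunk's word timings by its offset (ms) in the source audio."""
    merged: list[TranscriptWord] = []
    for offset_ms, words in chunks:
        merged.extend(
            TranscriptWord(
                word=w.word,
                start_ms=w.start_ms + offset_ms,
                end_ms=w.end_ms + offset_ms,
            )
            for w in words
        )
    return merged
```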

Event Publishing

SSE Events

Workers publish progress events via Redis pub/sub. The frontend subscribes and updates the UI in real-time:

from ...infrastructure.events import (
    AnalysisProgressEvent,
    AnalysisCompleteEvent,
    AnalysisFailedEvent,
    publish_event,
)

# Progress update
await publish_event(
    AnalysisProgressEvent(
        project_uuid=project_uuid,
        step="transcribe",
        progress=30,  # 0-100
        message="Transcribing speech...",
    )
)

# Completion
await publish_event(
    AnalysisCompleteEvent(
        project_uuid=project_uuid,
        edit_count=42,
        word_count=1500,
        duration_ms=180000,
    )
)

# Failure
await publish_event(
    AnalysisFailedEvent(
        project_uuid=project_uuid,
        error="Transcription failed: invalid audio format",
    )
)

Event Types

Event                   When              Key Fields
AnalysisStartedEvent    Pipeline begins   pacing_level, language
AnalysisProgressEvent   After each step   step, progress, message
AnalysisCompleteEvent   Success           edit_count, word_count
AnalysisFailedEvent     Error             error

Notification Triggers

After publishing SSE events, workers also create persistent in-app notifications via NotificationService().create(). These are best-effort (wrapped in try/except):

# After AnalysisCompleteEvent -- best-effort, must not fail the task
try:
    await NotificationService().create(
        notification=NotificationCreate(
            user_id=user_id,
            title="Analysis complete",
            message=f'"{project_name}" -- {edits_count} edits found',
            type=NotificationType.SUCCESS,
        ),
        db=db,
    )
except Exception:
    # A failed notification should never fail the worker task
    pass

Current triggers: analysis complete (analysis worker), export ready (render worker).

Captions Only Mode

When a client posts analysis_mode="captions_only", the analyze endpoint enforces that all ai_edit settings are at their defaults: pacing_level=0, false_start_sensitivity=0, audio_censorship="none", caption_censorship="none", director_notes="". Any mismatch returns 422 before credit reservation via ProjectService.validate_analysis_mode_consistency. Only AI Director asset fetching is mode-gated inside the worker itself (workers/analysis/tasks.py skips the catalog fetch for captions_only).

The combination of the API-layer reject and the default values means: DetectSilencesStep returns [] (pacing=0), DetectFalseStartsStep returns [] (sensitivity=0), CensorProfanityStep returns unchanged (both censorship fields "none"), InsertAIDirectedAssetsStep returns [] (empty catalog). Only Whisper transcription, transcript improvement (if language set), and caption generation run.

Credits are charged at 0.5x rate via CREDIT_MULTIPLIERS in analysis_run/constants.py. A tampered client that bypasses the frontend can't game this — the backend validates mode-vs-settings consistency on every analyze request.
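
The shape of that consistency check can be sketched as a pure function. The field names and default values come from the description above; the function itself is an assumption, not the actual body of ProjectService.validate_analysis_mode_consistency:

```python
# Defaults that captions_only mode requires (from the description above)
CAPTIONS_ONLY_DEFAULTS: dict[str, object] = {
    "pacing_level": 0,
    "false_start_sensitivity": 0,
    "audio_censorship": "none",
    "caption_censorship": "none",
    "director_notes": "",
}


def validate_captions_only(settings: dict[str, object]) -> list[str]:
    """Return the settings that deviate from captions_only defaults (empty = valid)."""
    return [
        field
        for field, default in CAPTIONS_ONLY_DEFAULTS.items()
        if settings.get(field, default) != default
    ]
```

In this sketch, any non-empty return would map to the 422 response before credit reservation.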

Adding a New Detection Type

Adding a new detection type (e.g., filler words) requires three changes: create the step, wire it into the pipeline, and update validation to include its results.

1. Create the Step

# workers/analysis/filler_words/step.py
class DetectFillerWordsStep(BaseStep[AnalysisDeps, list[FillerWordRegion]]):
    """Detect filler words like 'um', 'uh', 'like'."""

    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[FillerWordRegion]:
        transcript: TranscriptionResult = ctx.get_dependency("transcribe")
        input_data: AnalysisInput = ctx.get_input("input")

        await publish_event(
            AnalysisProgressEvent(
                project_uuid=UUID(input_data.project_uuid),
                step="detect_filler_words",
                progress=50,
                message="Detecting filler words...",
            )
        )

        # Detection logic (a per-word check only matches single-word fillers;
        # multi-word phrases like "you know" need adjacent-word matching)
        filler_patterns = ["um", "uh", "like"]
        regions = []

        for i, word in enumerate(transcript.words):
            if word.word.lower().strip() in filler_patterns:
                regions.append(
                    FillerWordRegion(
                        start_ms=word.start_ms,
                        end_ms=word.end_ms,
                        word=word.word,
                        confidence=0.9,
                    )
                )

        return regions
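
Because the matching is pure, it can be pulled into a helper and unit-tested without running the pipeline. A sketch that also handles two-word fillers like "you know" via bigram matching (the helper name is illustrative):

```python
def find_filler_spans(words: list[str], patterns: set[str]) -> list[tuple[int, int]]:
    """Return (start_idx, end_idx) word-index spans matching one- or two-word fillers."""
    normalized = [w.lower().strip(".,!? ") for w in words]
    spans: list[tuple[int, int]] = []
    i = 0
    while i < len(normalized):
        # Prefer the two-word match so "you know" wins over "you" + "know"
        if i + 1 < len(normalized) and " ".join(normalized[i : i + 2]) in patterns:
            spans.append((i, i + 1))
            i += 2
        elif normalized[i] in patterns:
            spans.append((i, i))
            i += 1
        else:
            i += 1
    return spans
```

Note that purely lexical matching will flag legitimate uses ("I like it"), which is one reason the other detectors lean on LLM context instead.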

2. Add to Pipeline

# workers/analysis/pipeline.py
analysis_pipeline = Pipeline(
    steps={
        # ... existing steps
        "detect_filler_words": DetectFillerWordsStep(),  # Add
    },
    dependencies={
        # ... existing deps
        "detect_filler_words": ["transcribe"],  # After transcribe
        "validate_edits": ["detect_silences", "detect_false_starts", "detect_filler_words"],  # Update
    },
)

3. Update Validation

# workers/analysis/false_starts/validation/step.py
class ValidateEditsStep(BaseStep[AnalysisDeps, list[ValidatedEdit]]):
    async def execute(self, ctx: StepContext[AnalysisDeps]) -> list[ValidatedEdit]:
        silences = ctx.get_dependency("detect_silences")
        false_starts = ctx.get_dependency("detect_false_starts")
        filler_words = ctx.get_dependency("detect_filler_words")  # Add

        all_edits = silences + false_starts + filler_words  # Include
        # ... validation logic

Key Conventions

  1. Async everything - All I/O is async
  2. Event-driven - Publish progress events for UI
  3. Typed outputs - Steps return typed Pydantic models
  4. Cost tracking - Use ctx.run() for LLM calls
  5. Cleanup - Always cleanup temp files in finally
  6. Chunking - Handle large files by chunking

Key Files

Purpose                 Location
Analysis pipeline       workers/analysis/pipeline.py
Analysis task           workers/analysis/tasks.py
Dependencies            workers/analysis/deps.py
Transcription           workers/analysis/transcription/
False start detection   workers/analysis/false_starts/
Silence detection       workers/analysis/silence/
Event schemas           infrastructure/events/schemas.py
Event publisher         infrastructure/events/publisher.py
