Backend Development Guide

This guide covers patterns and conventions for developing in the Sapari backend.

Project Structure

The backend follows a layered architecture: business logic in modules, API routes in interfaces, shared infrastructure, and background workers.

flowchart TB
    subgraph src["backend/src/"]
        M[modules/] --> |Business Logic| MC[user/, clip/, project/, ...]
        I[interfaces/] --> |API Layer| IA[api/v1/]
        IN[infrastructure/] --> |Cross-Cutting| IC[database/, auth/, storage/, events/]
        W[workers/] --> |Background Jobs| WC[analysis/, render/, download/]
    end
  • modules/ - Domain business logic. Each module has its own models, schemas, CRUD, and services.
  • interfaces/ - External interfaces (REST API, could add GraphQL/gRPC).
  • infrastructure/ - Database, auth, storage, events. Shared plumbing.
  • workers/ - Background tasks: transcription, rendering, downloads.
backend/src/
├── modules/              # Domain business logic
│   ├── user/            # Each module has: model, schemas, crud, service
│   ├── project/
│   ├── clip/
│   ├── edit/
│   ├── asset/
│   ├── caption_line/
│   ├── credits/         # Credit balance, transactions, metering
│   ├── entitlement/     # Flexible access control, trial grants
│   ├── payment/         # Stripe checkout, subscription lifecycle
│   ├── subscription/    # Subscription records
│   ├── product/         # Products with entitlement configs
│   ├── price/           # Stripe prices
│   ├── tier/            # Subscription tiers (hobby, creator, viral)
│   ├── discount/        # Coupons, promo codes
│   ├── email/           # Email service, templates, tasks
│   ├── export/
│   ├── feature_flag/     # Feature flags with rollout + allowlist
│   ├── draft/
│   ├── preset/
│   ├── notification/    # In-app notifications (user + system-wide), SSE push
│   ├── analysis_run/
│   ├── auth/            # Auth service, schemas, domain exceptions
│   ├── admin_audit/     # Admin event log + audit trail (who/what/when + before/after diffs)
│   ├── api_keys/        # API key management
│   ├── preview_preset/  # Preview preset configurations
│   ├── rate_limit/      # Rate limit management (per-endpoint, tier-based)
│   ├── support/         # Support conversations (bidirectional, priority auto-assignment)
│   └── common/          # Shared exceptions, utilities, constants
├── interfaces/          # External interfaces
│   ├── api/v1/         # REST API routes
│   └── main.py         # FastAPI app factory
├── infrastructure/      # Cross-cutting concerns
│   ├── database/       # Session, models, migrations
│   ├── auth/           # Authentication, OAuth, sessions, JWT
│   ├── stripe/         # Stripe client, webhooks
│   ├── storage/        # R2/S3 client
│   ├── events/         # SSE pub/sub (project-scoped + user-scoped notification channels)
│   ├── config/         # Settings
│   ├── ai/             # LLM providers (OpenAI, Anthropic)
│   ├── cache/          # Redis/Memcached
│   ├── email/          # Email settings
│   ├── taskiq/         # Background task broker, scheduler
│   ├── rate_limit/     # Redis-backed rate limiting
│   └── logging/        # Structured logging, Logfire
└── workers/            # Background task processing
    ├── analysis/       # Transcription, silence/false-start detection
    ├── render/         # FFmpeg processing
    ├── download/       # yt-dlp, audio extraction
    ├── asset_edit/     # Asset processing
    └── shared/         # Shared worker utilities

Adding a New Module

Follow this order - each step builds on the previous:

flowchart LR
    A[1. Model] --> B[2. Schemas]
    B --> C[3. CRUD]
    C --> D[4. Service]
    D --> E[5. Routes]
    E --> F[6. Tests]

Step 1: Create the Model

Models live in modules/{name}/models.py. Use mixins for common fields:

# modules/widget/models.py
from sqlalchemy import String, Integer, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column

from ...infrastructure.database.models import UUIDMixin, TimestampMixin, SoftDeleteMixin
from ...infrastructure.database.session import Base


class Widget(Base, UUIDMixin, TimestampMixin, SoftDeleteMixin):
    """Widget model with UUID primary key and soft deletion."""

    __tablename__ = "widget"

    # Required fields
    name: Mapped[str] = mapped_column(String(100))

    # Optional fields with defaults
    count: Mapped[int] = mapped_column(Integer, default=0)

    # Foreign keys (always indexed)
    user_id: Mapped[int] = mapped_column(
        ForeignKey("user.id", ondelete="CASCADE"),
        index=True,
    )

Mixin Reference:

Mixin            Fields Added               When to Use
UUIDMixin        uuid (PK)                  Public-facing entities
TimestampMixin   created_at, updated_at     Almost always
SoftDeleteMixin  deleted_at, is_deleted     When you need audit trails

Step 2: Create Schemas

Schemas validate input, serialize output, and generate OpenAPI docs. Create variants for different operations:

modules/{name}/schemas.py:

# modules/widget/schemas.py
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from typing import Annotated

from ..common.schemas import TimestampSchema


# Base with shared fields
class WidgetBase(BaseModel):
    name: Annotated[str, Field(min_length=1, max_length=100)]


# For API responses (excludes sensitive data)
class WidgetRead(TimestampSchema, WidgetBase):
    uuid: UUID
    count: int
    user_id: int


# For creating (only user-provided fields)
class WidgetCreate(WidgetBase):
    model_config = ConfigDict(extra="forbid")  # Reject unknown fields


# For updating (all fields optional)
class WidgetUpdate(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: Annotated[str | None, Field(min_length=1, max_length=100, default=None)]
    count: int | None = None

Schema Naming Convention:

Schema                 Purpose
WidgetBase             Shared fields for inheritance
WidgetRead             API responses
WidgetCreate           POST request bodies
WidgetCreateInternal   Internal creation with hashed passwords, etc.
WidgetUpdate           PATCH request bodies (all optional)
WidgetDelete           Soft deletion data

Step 3: Create CRUD

We use FastCRUD to implement the repository pattern. It gives us a data access layer that abstracts away raw SQL, providing a clean interface for database operations:

# modules/widget/crud.py
from fastcrud import FastCRUD
from .models import Widget

crud_widgets: FastCRUD = FastCRUD(Widget)

Common CRUD Operations:

# Create
widget = await crud_widgets.create(db=db, object=widget_data, schema_to_select=WidgetRead)

# Read one
widget = await crud_widgets.get(db=db, uuid=uuid, is_deleted=False)

# Read many with pagination
widgets = await crud_widgets.get_multi(
    db=db,
    offset=skip,
    limit=limit,
    schema_to_select=WidgetRead,
    is_deleted=False,
)

# Check existence
exists = await crud_widgets.exists(db=db, name="foo")

# Update
updated = await crud_widgets.update(db=db, object=update_data, uuid=uuid)

# Soft delete
await crud_widgets.delete(db=db, uuid=uuid)

# Hard delete (rarely used)
await crud_widgets.db_delete(db=db, uuid=uuid)

# Joins
result = await crud_widgets.get_joined(
    db=db,
    join_model=User,
    join_prefix="user_",
    schema_to_select=WidgetRead,
    join_schema_to_select=UserRead,
    uuid=uuid,
    nest_joins=True,
)

Query Patterns

FastCRUD's get_multi() defaults to limit=100. This is correct for API endpoints (human pagination) but dangerous for internal queries that need all records. Three cases:

Case 1: A human will scroll this. Paginate. Caller passes limit/offset, capped by MAX_PAGE_LIMIT:

# API endpoint -- caller controls pagination
widgets = await crud_widgets.get_multi(
    db=db, offset=skip, limit=limit,
    schema_to_select=WidgetRead, is_deleted=False,
)
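
The MAX_PAGE_LIMIT cap can be sketched as a small clamp applied before the values reach get_multi(). This is an illustrative helper, not the project's actual code; the constant value and function name are assumptions:

```python
# Hypothetical sketch: validate and cap caller-supplied pagination.
# MAX_PAGE_LIMIT is an assumed value, not necessarily the project's.
MAX_PAGE_LIMIT = 100

def clamp_pagination(skip: int, limit: int) -> tuple[int, int]:
    """Reject negative offsets and cap the page size a client can request."""
    if skip < 0 or limit < 1:
        raise ValueError("skip must be >= 0 and limit must be >= 1")
    return skip, min(limit, MAX_PAGE_LIMIT)
```

A route would call this on the raw query parameters, then pass the clamped values straight into get_multi(offset=..., limit=...).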

Case 2: Code needs all records. Use limit=None. The WHERE clause is the bound:

# Internal -- domain-bounded query, no pagination
clips = await crud_clips.get_multi_joined(
    db=db, project_id=project_uuid, limit=None,
    schema_to_select=ClipReadBase,
    joins_config=[clip_file_join_config()],
)

limit=None is safe when: (1) the WHERE clause bounds the count, (2) per-record work is cheap, (3) the bound is stable. When (3) doesn't hold (e.g., edits depend on analysis pipeline design), add a sanity cap that raises loudly.

Case 3: Expensive work per record. Job queue. Batch IDs, enqueue tasks:

# Batch processing -- expensive work per record
while True:
    users = await crud_users.get_multi_by_cursor(
        db=db, cursor=cursor, limit=DEFAULT_BATCH_SIZE,
    )
    if not users["data"]:
        break
    await process_batch(users["data"])
    cursor = users["next_cursor"]

Write-side enforcement: Entity count caps belong on creation, not on reads. Example: MAX_CLIPS_PER_PROJECT = 100 is checked in presign_upload() and import_youtube(), then reads use limit=None with confidence. For concurrent safety, wrap count → check → create with advisory_xact_lock(db, f'clips:{project_uuid}') — see §Count-Race Prevention below.

SQL aggregates over get_multi: When computing a scalar (sum, count, max), use SQLAlchemy directly instead of loading rows into Python:

# Correct: SQL aggregate
stmt = select(func.sum(EntitlementTransaction.cost_microcents)).where(...)
result = await db.execute(stmt)

# Wrong: loading rows to sum in Python
result = await crud.get_multi(db=db, ...)
total = sum(row["cost"] for row in result["data"])

Step 4: Create Service

Services contain business logic - validation, permissions, orchestrating multiple CRUD operations:

# modules/widget/service.py
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession

from ..common.exceptions import WidgetNotFoundError, PermissionDeniedError
from .crud import crud_widgets
from .schemas import WidgetCreate, WidgetRead, WidgetUpdate


class WidgetService:
    """Business logic for widget operations."""

    async def create(
        self,
        widget: WidgetCreate,
        user_id: int,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Create a widget for a user."""
        widget_data = widget.model_dump()
        widget_data["user_id"] = user_id

        return await crud_widgets.create(
            db=db,
            object=widget_data,
            schema_to_select=WidgetRead,
        )

    async def get_by_uuid(
        self,
        uuid: str,
        db: AsyncSession,
    ) -> dict[str, Any]:
        """Get a widget by UUID."""
        widget = await crud_widgets.get(
            db=db,
            schema_to_select=WidgetRead,
            uuid=uuid,
            is_deleted=False,
        )
        if not widget:
            raise WidgetNotFoundError(f"Widget {uuid} not found")
        return widget

    async def verify_ownership(
        self,
        widget: dict[str, Any],
        user_id: int,
    ) -> None:
        """Verify user owns this widget."""
        if widget["user_id"] != user_id:
            raise PermissionDeniedError("You don't own this widget")

Service Patterns:

  • Services are stateless classes
  • Each method takes db: AsyncSession as parameter
  • Raise domain exceptions (not HTTP exceptions)
  • Use CRUD for database operations
  • Handle business validation and permissions

Step 5: Create Routes

Routes go in interfaces/api/v1/{name}.py. Keep them thin - delegate to services, don't put logic here. Global exception handlers catch DomainError and unhandled exceptions automatically, so routes don't need try/except unless a specific exception requires a custom status code:

# interfaces/api/v1/widgets.py
from typing import Annotated, Any
from uuid import UUID

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from ....infrastructure.auth.session.dependencies import get_current_user
from ....infrastructure.database.session import async_session
from ....modules.widget.schemas import WidgetCreate, WidgetRead, WidgetUpdate
from ....modules.widget.service import WidgetService
from ..dependencies import get_widget_service

router = APIRouter(tags=["Widgets"])


@router.post(
    "/",
    status_code=201,
    response_model=WidgetRead,
    summary="Create Widget",
    responses={
        201: {"description": "Widget created"},
        401: {"description": "Not authenticated"},
    },
)
async def create_widget(
    widget: WidgetCreate,
    db: Annotated[AsyncSession, Depends(async_session)],
    current_user: Annotated[dict[str, Any], Depends(get_current_user)],
    widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Create a new widget."""
    return await widget_service.create(widget, current_user["id"], db)


@router.get(
    "/{widget_uuid}",
    response_model=WidgetRead,
    summary="Get Widget",
)
async def get_widget(
    widget_uuid: UUID,
    db: Annotated[AsyncSession, Depends(async_session)],
    widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
    """Get a widget by UUID."""
    return await widget_service.get_by_uuid(str(widget_uuid), db)

Register the router in interfaces/api/v1/__init__.py:

from .widgets import router as widgets_router

router.include_router(widgets_router, prefix="/widgets")

Dependency Injection

FastAPI's Depends() handles dependency injection. Database sessions are per-request. Everything else (services, infrastructure clients) is created once as singletons in the lifespan and accessed via request.state (ASGI lifespan state).

flowchart TB
    subgraph Lifespan["Created Once (Lifespan)"]
        SVC[All Services]
        STORAGE[StorageClient]
        EMAIL[PostmarkClient]
        STRIPE[StripeService]
    end

    subgraph Request["Per Request"]
        DB[async_session]
        AUTH[get_current_user]
    end

    Route --> DB
    Route --> SVC
    Route --> AUTH

All singletons are created in app_factory.py's lifespan and yielded as a dict. Dependencies in interfaces/api/dependencies.py read from request.state:

from typing import cast

def get_widget_service(request: Request) -> WidgetService:
    return cast(WidgetService, request.state.widget_service)

def get_storage_client(request: Request) -> StorageClient:
    return cast(StorageClient, request.state.storage_client)

Services with dependencies share singleton instances via constructor injection:

# In lifespan (app_factory.py):
entitlement_service = EntitlementService()
credit_service = CreditService(entitlement_service=entitlement_service)
beta_service = BetaService(entitlement_service=entitlement_service, credit_service=credit_service)
yield {"entitlement_service": entitlement_service, "credit_service": credit_service, ...}

Authentication dependencies:

# Require any authenticated user
current_user: Annotated[dict[str, Any], Depends(get_current_user)]

# Require superuser/admin (narrow meta endpoints only: health, queues, sudo, tier catalog)
_: Annotated[dict[str, Any], Depends(get_current_superuser)]

# Require superuser + audit trail + IP allowlist + session timeout (default for admin endpoints)
audit: Annotated[AdminAuditContext, Depends(get_admin_audit_context)]
# ... then inside handler, after the service call:
await audit.log(AdminEventType.SEARCH, "user", details={"action": "list"})

When to use which: any admin endpoint that returns PII or mutates admin-touchable state should use get_admin_audit_context. The bare get_current_superuser is reserved for narrow meta endpoints where audit noise exceeds value (health checks, queue polling, sudo primitive, tier catalog lookup).

Exception Handling

Services raise domain-specific exceptions (like WidgetNotFoundError), not HTTP exceptions. Two global exception handlers catch these automatically — routes don't need try/except for standard errors.

flowchart LR
    A[Service raises DomainError] --> B[Global DomainError handler]
    B --> C[EXCEPTION_MAPPING → HTTPException]
    C --> D[Sanitized JSON response]
    E[Unhandled Exception] --> F[Global catch-all handler]
    F --> G[Log details server-side]
    G --> H[Generic 500 response]

Global handlers are registered in modules/common/utils/error_handler.py via register_exception_handlers(), called in app_factory.py. Three layers:

  1. RequestValidationError handler: FastAPI validation errors return generic 422 message (no field names leaked)
  2. DomainError handler: each 4xx exception's EXCEPTION_MAPPING lambda decides whether to return a canned detail or pass the raw message through. Messages that pass through (ValidationError, PermissionDeniedError, InsufficientCreditsError, UserExistsError) MUST be pre-sanitized by the raising service — the exception class docstrings spell this out. Everything else (ResourceNotFoundError, ResourceExistsError, TierNotFoundError, etc.) returns a canned string. 5xx domain errors always return GENERIC_ERROR_MESSAGE.
  3. CatchAllErrorMiddleware: Unhandled exceptions return generic 500

All error responses include a support_id (8-char UUID) for customer support lookup. Full details logged server-side only. Routes use handle_exception() as a fallback for non-domain errors.
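One plausible shape for the 8-char support_id generator (an assumption — the actual implementation may differ; what matters is that it is short, logged server-side, and safe to show to users):

```python
import uuid

def make_support_id() -> str:
    """Sketch: 8-char lookup token included in every error response."""
    return uuid.uuid4().hex[:8]
```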

Define domain exceptions in modules/common/exceptions.py:

class DomainError(Exception):
    """Base for all domain errors."""
    pass

class ResourceNotFoundError(DomainError):
    """Resource not found."""
    pass

class WidgetNotFoundError(ResourceNotFoundError):
    """Widget not found."""
    pass

class PermissionDeniedError(DomainError):
    """User lacks permission."""
    pass

Mapping to HTTP in modules/common/constants.py. Each lambda decides whether to return a canned detail or the raw message — exceptions whose message is client-safe (checked against their class docstring) pass the message through so the UI can show actionable text:

EXCEPTION_MAPPING = {
    ResourceNotFoundError: lambda msg: NotFoundException(detail="The requested resource was not found."),
    PermissionDeniedError: lambda msg: ForbiddenException(detail=msg or "You don't have permission for this action."),
    ValidationError: lambda msg: UnprocessableEntityException(detail=msg),
    ResourceExistsError: lambda msg: HTTPException(status_code=409, detail="This resource already exists."),
    InsufficientCreditsError: lambda msg: HTTPException(status_code=402, detail=msg or "Insufficient credits."),
}

Database Patterns

Async Sessions

All database operations are async. The session is injected via Depends(async_session) and auto-commits on success:

from sqlalchemy.ext.asyncio import AsyncSession

async def my_operation(db: AsyncSession):
    # Operations auto-commit on success
    result = await crud_widgets.create(db=db, object=data)
    return result

Count-Race Prevention

count → check → insert is not atomic under concurrent requests. Two requests can both read the same count, both pass the limit check, both insert — the cap is silently exceeded. Two tools close the race, matching different constraint shapes.

Partial unique index — works when the constraint is "exactly one row per X" (e.g., one trial per user). Postgres rejects the second insert at commit time with IntegrityError. The service catches and converts to the domain return value. See entitlement/models.py (uq_user_entitlement_trial) and entitlement/trial.py:grant_trial_if_eligible for the shipped pattern.
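The shape of the insert-and-catch pattern, demonstrated here with stdlib sqlite3 rather than the project's Postgres/SQLAlchemy stack (table and column names are illustrative, not the shipped schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_entitlement (user_id INTEGER, kind TEXT)")
# Partial unique index: at most one 'trial' row per user; other kinds unconstrained.
conn.execute(
    "CREATE UNIQUE INDEX uq_user_entitlement_trial "
    "ON user_entitlement (user_id) WHERE kind = 'trial'"
)

def grant_trial_if_eligible(user_id: int) -> bool:
    """Insert-and-catch instead of count-then-insert: the index closes the race."""
    try:
        conn.execute(
            "INSERT INTO user_entitlement (user_id, kind) VALUES (?, 'trial')",
            (user_id,),
        )
        return True
    except sqlite3.IntegrityError:
        return False  # a concurrent request already granted the trial
```

The key property: there is no window between check and insert, because the check IS the insert. The database enforces the invariant; the service only interprets the outcome.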

Advisory lock — works for any N-bounded cap. Helper lives at infrastructure/database/locks.py:advisory_xact_lock. Example:

from ...infrastructure.database.locks import advisory_xact_lock

async def create_clip(self, project_uuid, ..., db):
    await advisory_xact_lock(db, f"clips:{project_uuid}")
    count = await crud_clips.count(db=db, project_id=project_uuid)
    if count >= MAX_CLIPS_PER_PROJECT:
        raise ValidationError(...)
    await crud_clips.create(db=db, object=..., commit=True)  # commit releases the lock

The lock's transaction is the one that autobegins at db.execute inside the helper. The crud.create(commit=True) that ends the block commits that transaction and releases the lock. Do NOT call await db.commit() between the lock acquire and the create — it releases the lock early.

Why pg_advisory_xact_lock (blocking, transaction-scoped) over pg_try_advisory_xact_lock (non-blocking) or pg_advisory_lock (session-scoped): we want waiters to serialize, not error, and we want auto-release on commit/rollback so a caller can't leak a lock by forgetting to release it.

Key convention: "<entity>:<identifier>" (e.g., "clips:<project_uuid>", "user_projects:<user_id>", "presets:<user_id>", "preview_presets:<user_id>"). Namespaces prevent cross-entity hashtext collisions; per-identifier scoping avoids serializing unrelated entities. Distinct-but-related entities (e.g., the two preset tables each have independent per-user caps) MUST use distinct keys — colliding them would serialize unrelated operations for no safety benefit. The hashtext collision probability is 2^-32 per key pair — accepted.

Indexes

Add indexes for:

  • Foreign keys (always)
  • Fields used in WHERE clauses
  • Fields used for sorting

# Single column
email: Mapped[str] = mapped_column(String(50), unique=True, index=True)

# Composite index
__table_args__ = (
    Index("idx_widget_user_status", "user_id", "status"),
)

Relationships

from sqlalchemy.orm import relationship

# One-to-many (parent side)
widgets: Mapped[list["Widget"]] = relationship(
    "Widget",
    back_populates="user",
    lazy="selectin",  # Eager load
    default_factory=list,
    init=False,
)

# Many-to-one (child side)
user: Mapped["User"] = relationship(
    "User",
    back_populates="widgets",
    lazy="selectin",
    init=False,
)

Database connection — direct endpoint, matched region

Two things matter for query latency:

  1. Use the direct Postgres endpoint, NOT the pooler. PgBouncer in transaction mode breaks asyncpg's prepared-statement cache, forcing 3-4× the round trips per query. Our app already has SQLAlchemy connection pooling (POSTGRES_POOL_SIZE=50), so PgBouncer in front of asyncpg is redundant and harmful. The pooler is designed for serverless apps that open one connection per request; we run persistent uvicorn workers.

  2. DB region must match server region. Each query is one network round trip. Cross-region (server in US-East, DB in US-West or vice versa) adds ~50-65ms per query. A single request that runs 4-6 sequential queries pays 200-400ms in DB latency before any other work happens. Match regions (Hetzner Ashburn ↔ Neon us-east-1, Hetzner Hillsboro ↔ Neon us-west-2) for sub-10ms query latency.

Both decisions are made at provision time. Changing the DB region requires creating a new Neon project and migrating data.

Migrations

Generate migrations with Alembic:

cd backend
uv run alembic revision --autogenerate -m "add widget table"
uv run alembic upgrade head

Key Conventions

  1. UUIDs for public APIs - Never expose internal integer IDs
  2. Soft deletes - Use is_deleted=False in queries
  3. Async everything - All I/O operations are async
  4. Type hints - Full typing with Mapped[] and Annotated[]
  5. Domain exceptions - Raise domain errors; the global handlers map them to HTTP
  6. Validation at boundaries - Pydantic validates API input
  7. Use FastCRUD for simple CRUD - For complex queries (joins, aggregations, batch updates), use SQLAlchemy directly
  8. No magic numbers - Extract to constants in module-level constants.py files
  9. No imports inside methods - All imports at module top level
  10. Client IP extraction - Always use modules.common.utils.request_ip.get_client_ip(request) instead of request.client.host directly. The raw transport peer is Caddy's internal Docker IP; the helper reads the X-Forwarded-For header that Caddy populates from Cloudflare's CF-Connecting-IP. Misuse causes per-IP rate limits to track Caddy as the "client" and lock out all users globally.
  11. Client-supplied offsets validated against computed ceilings - Any API field that references a position in a computed timeline (e.g., edit.end_ms into SUM(clip_file.duration_ms), asset_edit.end_ms into user_asset.duration_ms, edit_override.{start_ms, end_ms} into the same project SUM, edit.asset_offset_ms into asset_file.duration_ms) must be bounds-checked at the service layer against the computed ceiling before persisting. The schema Field(ge=0) + end_ms > start_ms validator is necessary but not sufficient — a tampered client can send end_ms=999999999, which passes schema validation but makes renders silently clamp or crash. Three canonical instances ship today: (a) EditService._validate_edit_bounds in edit/service.py uses clip.utils.get_project_total_duration_ms (returns tuple[ProjectDurationStatus, int]; EMPTY rejects, NOT_READY skips, READY enforces) — needed because the ceiling is a SUM across rows not already fetched at the call site; (b) AssetService.validate_edit_bounds in asset/service.py is a pure function taking duration_ms: int | None directly from the caller's require_asset_uploaded return — simpler shape because the ceiling is a single scalar already fetched upstream; (c) validate_overrides_against_project in draft/utils.py reuses the get_project_total_duration_ms helper to bounds-check every EditOverride.{start_ms, end_ms} in DraftCreate / DraftUpdate / ExportCreate payloads, and EditService.{create, update} inline-checks Edit.asset_offset_ms against asset_file["duration_ms"] threaded from _verify_asset_uploaded (changed to return the joined asset_file dict so callers don't re-query). The pattern to copy is "service-layer bounds validation against a computed ceiling," not a specific helper module — reach for a helper when the check is non-trivial or reused across services (a, c), inline when the value is already in hand and the check is a single comparison (b-subset, c-inline).
Zero-ceiling state (EMPTY in the multi-row case) rejects rather than skipping — otherwise an attacker creates the offset first and materializes the ceiling later. start_ms is bounds-checked independently from end_ms when the cross-field validator cannot fire (e.g., EditOverride.end_ms=None case — transitivity doesn't close start_ms ≤ total). Audit verified complete 2026-04-21: every client-facing _ms offset field on a public schema has a service-layer bounds check; worker-internal _ms schemas (workers/render/schemas.py, workers/analysis/**/schemas.py, workers/download/schemas.py, workers/asset_edit/schemas.py) are constructed from server-owned data, not network payloads, and are outside this convention's scope.
  12. Freeze configuration at the queue boundary - Any configuration value that feeds a background worker and depends on user tier (watermark-free, export retention, feature gates) MUST be snapshotted onto the queued record at service-layer kickoff. Workers read the snapshot; resolve_tier_context() must NEVER be called from inside a worker task. Rationale: workers run minutes-to-hours after the request; if tier state changes in that window (subscription expired, upgrade processed), live re-resolution silently produces the wrong answer for whichever end of the transition the user is on. Enforcement: rg "resolve_tier_context" backend/src/workers/ MUST return zero hits. Exception mechanism: a surviving call site that is genuinely observational (logging, metrics, diagnostics — not a policy decision) must carry an inline # noqa: tier-in-worker — justification: <reason> comment so future audits can distinguish deliberate-but-rare exceptions from overlooked regressions. Any unannotated grep hit is a finding. Shipped instances: Fix 9 (expires_at at create, not completion), Fix 11 (watermark_required snapshot, not re-resolved), Fix 12 (analysis mode/settings frozen at kickoff).
  13. Validate against resolved metadata, not just identifier shape - When a client-supplied identifier resolves into a new entity (YouTube URL → ClipFile after fetching video metadata, asset UUID → UserAsset after DB lookup, etc.), downstream resource limits and cap checks MUST run against the resolved entity's properties, not against the identifier alone. Identifier-only validation (URL is well-formed, UUID parses, string length in range) is necessary but not sufficient: a tampered client can send a valid-looking identifier whose resolved properties violate policy. Shipped instances: Fix 4 (Edit bounds against project duration), Fix 16 (AssetEditRequest bounds against asset duration), Fix 5 (YouTube duration cap at import time, after fetch_video_info). Related to Convention #11 but distinct: #11 is about client-supplied _ms offsets already in the request; this convention is about metadata the server fetches to resolve an identifier.
  14. Worker processes require explicit instrumentation setup - TaskIQ worker processes do NOT inherit Logfire auto-instrumentation from the web process. Each worker must call instrument_taskiq(service_name=...) at startup, and configure_broker_lifecycle(broker, service_name, ...) must pass a distinct service.name per queue (sapari-{email,analysis,render,download,proxy,asset-edit}). Enforcement: service_name is a non-default positional-or-keyword argument on configure_broker_lifecycle — adding a broker without it fails at call time (TypeError) rather than silently inheriting a generic name and losing worker-scoped observability. Rationale: worker processes run in separate Python interpreters (different TaskIQ worker pools), so the web process's configure_logfire() call has no effect on them. Without the per-broker call, workers emit zero spans, which silently breaks Logfire dashboards and makes worker-side incidents invisible. Per-instrumentor on/off policy (both web and workers) lives in docs/operations/monitoring.md §Auto-instrumentor defaults: pydantic-ai + SQLAlchemy + FastAPI ON, Redis OFF. The env vars (LOGFIRE_INSTRUMENT_{SQLALCHEMY,REDIS} for web, ..._WORKERS for workers) encode this — don't flip defaults without updating the matrix in that doc.
  15. Manual span naming follows the six-category taxonomy - Every manually emitted logfire.span(...) must fit one of six categories: (a) <worker>.pipeline for DAG invocation parents, (b) step.<step_id> for pipeline steps (emitted automatically by fastroai.LogfireTracer — don't hand-write), (c) taskiq.<task_name> for task-level parents (emitted automatically by LogfireSpanMiddleware — don't hand-write), (d) <module>.<method> for service-layer operations (credits.reserve, payment.handle_webhook), (e) ext.<service>.<op> for outbound calls (ext.whisper.transcribe, ext.ffmpeg.render, ext.stripe.modify_subscription), (f) sse.<event> for SSE publish paths. Every ext.* span carries a domain correlation key at entry (clip_file_uuid, subscription_id, etc.); every <module>.<method> span carries the primary identifier at entry and an outcome post-attr via span.set_attribute() at exit. Cron tasks add labels={"task_type": "cron"} to their decorator, which the middleware copies onto the span as task_type="cron". Rationale: the taxonomy keeps Logfire filters, saved queries, and ad-hoc grouping legible across the codebase — new spans slot into existing dashboards instead of forcing per-query regex munging. Full table with examples in docs/operations/monitoring.md §Span taxonomy.
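
Convention #10's X-Forwarded-For extraction can be sketched as follows. This is a simplified illustration of the idea, not the actual modules.common.utils.request_ip implementation, and it assumes the proxy chain (Cloudflare → Caddy) is trusted so the left-most hop is the real client:

```python
def get_client_ip(headers: dict[str, str], transport_peer: str) -> str:
    """Prefer the left-most X-Forwarded-For hop; fall back to the raw peer.

    Sketch only. Without this, per-IP rate limiting would key on the
    transport peer, which behind Caddy is always the same Docker IP.
    """
    forwarded = headers.get("x-forwarded-for", "")
    if forwarded:
        return forwarded.split(",")[0].strip()
    return transport_peer  # e.g. Caddy's internal Docker IP
```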

Rate Limiting

Rate limits use a RateLimit dataclass defined in infrastructure/auth/constants.py:

from infrastructure.auth.constants import RATE_LIMIT_SIGNUP, RateLimit

# Usage in route handlers:
await _check_rate_limit(request, "signup", RATE_LIMIT_SIGNUP)
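
The RateLimit dataclass is presumably a small value object pairing an allowed count with a window; a hedged sketch (field names and the signup values are assumptions, not the shipped constants):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RateLimit:
    """Sketch of a rate-limit config value object (field names assumed)."""
    limit: int            # max requests allowed within the window
    window_seconds: int   # window size the counter resets over

# Illustrative value only — the real RATE_LIMIT_SIGNUP lives in
# infrastructure/auth/constants.py.
RATE_LIMIT_SIGNUP = RateLimit(limit=5, window_seconds=3600)
```

frozen=True keeps the constants immutable, so a handler can't accidentally mutate a shared limit at runtime.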

Token expiry durations are also in infrastructure/auth/constants.py:

VERIFICATION_TOKEN_EXPIRY_HOURS = 24
RESET_PASSWORD_TOKEN_EXPIRY_HOURS = 1

Credit Architecture

Credits are built on the entitlement system. EntitlementTransaction is the authoritative ledger; quantity_used on UserEntitlement is a rebuildable cache. Every credit event (grant, usage, reset, refund) writes to both atomically.

Ledger Pattern

EntitlementTransaction (authoritative, append-only)
  ├── GRANT   — credits added (purchase, subscription, beta)
  ├── USAGE   — credits consumed (one per entitlement drained)
  ├── RESET   — period boundary marker (RENEWABLE credits, amount=0)
  └── REFUND  — credits returned

UserEntitlement.quantity_used (cache, rebuildable)
  └── Updated in the same db.commit() as the transaction record

rebuild_user_balance() recomputes quantity_used from the ledger at any time. Admin endpoint: POST /credits/admin/rebuild/{user_id}.
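The rebuild is a straight replay of the ledger. A minimal sketch, assuming a (type, amount) tuple shape for transactions rather than the real EntitlementTransaction model:

```python
# Sketch of replaying the append-only ledger to rebuild the quantity_used
# cache. The (type, amount) tuple shape is an assumption, not the real model.
from enum import Enum


class TxType(Enum):
    GRANT = "grant"
    USAGE = "usage"
    RESET = "reset"
    REFUND = "refund"


def rebuild_quantity_used(transactions) -> int:
    """Replay transactions in chronological order. GRANTs affect the quantity
    granted, not quantity_used, so they are skipped here."""
    used = 0
    for tx_type, amount in transactions:
        if tx_type is TxType.USAGE:
            used += amount
        elif tx_type is TxType.REFUND:
            used -= amount
        elif tx_type is TxType.RESET:
            used = 0  # period boundary for RENEWABLE credits
    return used
```

Because the ledger is authoritative and append-only, this replay is safe to run at any time; the cache can only drift if a write skips the atomic commit pattern above.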

Lazy Evaluation

Two things happen lazily inside get_user_balances(), before the balance is computed:

  1. _materialize_ungranted_payments() — Finds payments with status=SUCCEEDED, entitlement_granted=False. Creates credit entitlements via atomic CAS (UPDATE ... WHERE entitlement_granted = false). The webhook is a thin fact-recorder; credits materialize on first balance read.

  2. _reset_expired_renewable_periods() — For RENEWABLE entitlements (beta credits), checks if period_start_at + renewal_period_days < now. If expired, resets quantity_used=0, writes a RESET transaction, clears stale reservations.

Both are self-healing: if the system is down when a webhook fires, the next balance read catches up.

Payment Flow

Stripe webhook fires
  → handle_checkout_session_completed()
    → Creates Payment record (status=SUCCEEDED, entitlement_granted=False)
    → Grants TIER_ACCESS eagerly (user needs dashboard access immediately)
    → Credits are NOT granted here (deferred)
  → Returns 200

User opens dashboard
  → GET /users/me/billing
    → get_user_balances()
      → _materialize_ungranted_payments()  ← credits created here
      → _reset_expired_renewable_periods() ← RENEWABLE reset here
      → Returns balance with credits

Credit Metering

Analysis endpoints check credits before processing:

  1. EstimateProjectService.estimate_analysis_credits() sums clip durations (ceiling to minutes), applies mode multiplier
  2. ReserveCreditService.reserve_credits() stamps reservation metadata (includes reserved_period for RENEWABLE entitlements)
  3. Consume — Worker calls CreditService.use_credits() → consume_entitlement() drain loop writes one EntitlementTransaction(USAGE) per entitlement drained, each with commit=False, followed by a single db.commit()
  4. Release — On failure, CreditService.release_credits() clears reservation metadata

Consumption Ordering

When credits are consumed, entitlements are drained in priority order:

  1. RENEWABLE first (use-it-or-lose-it — beta credits)
  2. DECREMENTAL by expiry (expiring soonest first, then non-expiring)
  3. ACCUMULATIVE last

This means beta credits are always consumed before subscription credits, and expiring promos are consumed before permanent purchases.
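
The ordering collapses into a single sort key. A sketch under an assumed dict shape for entitlements (the real code operates on UserEntitlement rows):

```python
# Sketch of the drain ordering; the entitlement dict shape is an assumption.
from datetime import datetime, timezone

TYPE_ORDER = {"RENEWABLE": 0, "DECREMENTAL": 1, "ACCUMULATIVE": 2}
FAR_FUTURE = datetime.max.replace(tzinfo=timezone.utc)  # non-expiring sorts last


def drain_order(entitlements):
    """RENEWABLE first, then DECREMENTAL by soonest expiry, ACCUMULATIVE last."""
    return sorted(
        entitlements,
        key=lambda e: (TYPE_ORDER[e["type"]], e.get("expires_at") or FAR_FUTURE),
    )
```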

Analysis Modes

Mode is derived from settings (not explicitly selected). AnalysisMode enum in modules/analysis_run/enums.py, multipliers in CREDIT_MULTIPLIERS:

| Mode | Multiplier | Trigger |
|---|---|---|
| ai_edit | 1.0 | Any cut/censorship/director feature enabled |
| captions_only | 0.5 | Only language set (transcription + captions) |
| manual | 0.0 | Nothing toggled (frontend doesn't call analyze) |

The analyze endpoint rejects a captions_only request that carries any ai_edit setting (non-zero pacing_level / false_start_sensitivity, non-empty director_notes, non-none censorship) with a 422 via ProjectService.validate_analysis_mode_consistency. This keeps analysis_mode meaningful for metrics and prevents the "0.5× credits for full pipeline" bypass a tampered client would otherwise get. The worker itself is not mode-gated beyond AI Director asset fetching, so the API-layer validation is the authoritative gate. Mode stored on AnalysisRun for audit.
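
The derivation reads as a pure function of the settings payload. A sketch — the dict shape is an assumption, with field names taken from the validation rule above:

```python
# Sketch of deriving the analysis mode from settings; the dict shape is an
# assumption, field names come from the documented validation rule.
CREDIT_MULTIPLIERS = {"ai_edit": 1.0, "captions_only": 0.5, "manual": 0.0}


def derive_mode(settings: dict) -> str:
    ai_features = (
        settings.get("pacing_level")             # non-zero pacing
        or settings.get("false_start_sensitivity")
        or settings.get("director_notes")        # non-empty notes
        or settings.get("censorship")            # non-none censorship
    )
    if ai_features:
        return "ai_edit"
    if settings.get("language"):
        return "captions_only"                   # transcription + captions only
    return "manual"                              # frontend doesn't call analyze
```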

Beta Entitlement

Beta users get creator-tier access + 240 renewable AI minutes/month. Managed via BetaService in entitlement/beta.py:

from modules.entitlement.beta import BetaService

beta = BetaService()
await beta.grant(user_id, db)       # TIER_ACCESS + RENEWABLE CREDIT_GRANT
await beta.revoke(user_id, db)      # Soft-deletes both by reference_id
await beta.has_access(user_id, db)  # Checks active TIER_ACCESS with BETA reason
await beta.bulk_grant(user_ids, db) # One tier lookup, bulk duplicate check, single commit
await beta.list_users(db)           # All beta users with credit usage (SQL join)

Admin invite flow: POST /admin/beta/invite takes an email. If the user exists, grants beta immediately and sends a welcome email. If not, creates a signed JWT token (24h expiry, purpose=beta_invite) and sends an invite email with a signup link.

The signup link delivers the token via ?beta_invite=TOKEN on the landing URL. Both signup paths consume it:

  • Email/password signup — POST /users/?beta_invite=TOKEN verifies the token and grants beta after account creation on email match.
  • Google OAuth signup — the frontend forwards ?beta_invite=TOKEN to GET /auth/oauth/google?beta_invite=TOKEN; the backend persists it on OAuthState and the callback (/auth/oauth/callback/google) verifies + grants on match. Grant failures are non-fatal to OAuth login (logged, not raised).

Email matching is canonical, not byte-exact. The comparison goes through modules.common.utils.email.canonical_email() on both sides: lowercase entire address, strip +tag aliasing for any domain, strip dots in the local part for @gmail.com and @googlemail.com (other providers treat dots as distinct mailboxes — do not generalize). Without this, an invite to User.Name@gmail.com silently fails to grant beta to username@gmail.com even though Google routes both to the same mailbox.
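
The canonicalization rules read directly as code. A minimal sketch of the assumed behavior of canonical_email() (the real helper lives in modules.common.utils.email):

```python
# Sketch of the assumed canonicalization behavior of canonical_email().
DOT_INSENSITIVE_DOMAINS = {"gmail.com", "googlemail.com"}


def canonical_email(address: str) -> str:
    local, _, domain = address.lower().partition("@")
    local = local.split("+", 1)[0]      # strip +tag aliasing for any domain
    if domain in DOT_INSENSITIVE_DOMAINS:
        # Google routes dotted variants to one mailbox; other providers don't.
        local = local.replace(".", "")
    return f"{local}@{domain}"
```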

Beta entitlements stack with subscriptions. resolve_tier_context() picks the highest-ranked tier. Credits from both pools sum. Beta credits are consumed first (RENEWABLE before DECREMENTAL).

Thumbnails

Asset and project thumbnails are generated via FFmpeg (modules/asset/thumbnail.py) and stored in R2 as thumbnail_key. Presigned thumbnail_url is served inline in list responses (not separate endpoints) — generate_download_url() is a local HMAC operation (~1ms, no network call).

  • Assets: Generated on confirm_upload() for video/image types. Center-cropped to 240x240 JPEG.
  • Projects: Generated from first clip in process_clip_artifacts() worker. Only if project has no thumbnail yet.
  • Enrichment: _enrich_with_thumbnail_urls() in AssetService (instance method), _enrich_projects_with_thumbnail_urls() module-level in ProjectService. Called in all list/get/confirm return paths.

Vertical Crop

Optional crop for non-native aspect ratio exports. CropRegion schema (draft/schemas.py) stores normalized 0-1 coordinates. build_crop_filter() (workers/render/ffmpeg/video_filters.py) converts to FFmpeg crop=w:h:x:y filter with even-dimension rounding for H.264 chroma alignment.

Crop is injected in all 3 render paths (trim-only, asset composite, B-roll composite) after setpts and before scale+pad. For B-roll two-pass rendering, crop is applied in Pass 1 (trim) to avoid double-cropping. Source dimensions probed via get_video_dimensions() at render time.
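
The normalized-to-pixel conversion with even rounding can be sketched as follows. The flat signature is an assumption for illustration; the real build_crop_filter takes a CropRegion:

```python
# Sketch of converting a normalized 0-1 CropRegion into an FFmpeg crop filter
# string, rounding down to even dimensions for H.264 chroma alignment.
# The flat (x, y, w, h) signature is an assumption.
def build_crop_filter(x: float, y: float, w: float, h: float,
                      src_w: int, src_h: int) -> str:
    def even(value: float) -> int:
        return int(value) // 2 * 2  # round down to an even pixel count

    cw, ch = even(w * src_w), even(h * src_h)
    cx, cy = even(x * src_w), even(y * src_h)
    return f"crop={cw}:{ch}:{cx}:{cy}"
```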

Tier Enforcement

Tier-specific limits are centralized in UserTierContext — a frozen dataclass resolved once per request via resolve_tier_context(). Properties: max_projects, can_use_ai_director, can_watermark_free, can_access_support, storage_quota_mb, export_retention_days.

from modules.entitlement.tier_context import resolve_tier_context

# In API routes (via DI dependency) — the ONLY place resolve_tier_context
# is called. Snapshot the decision onto the queued record if a worker needs it.
tier_ctx = await resolve_tier_context(user_id, db)
if tier_ctx.max_projects <= project_count:
    raise ValidationError("Project limit reached")

# For worker-bound work: snapshot at request time (Convention #12).
export = await crud_exports.create(
    db=db,
    object=ExportCreateInternal(
        ...,
        watermark_required=not tier_ctx.can_watermark_free,  # snapshot
    ),
)

# In workers: read from the snapshot on the queued record, NEVER
# re-resolve tier_ctx. The user's tier may have changed between
# kickoff and execution; the snapshot preserves intent.
if export["watermark_required"]:
    apply_watermark(...)

See Convention #12 for the full rationale + inline # noqa: tier-in-worker exception mechanism for the rare observational case.

Resolution Logic

resolve_tier_context() picks the highest-ranked tier across all active tier entitlements (not the first one). Key behaviors:

  • tier_name: From the highest-ranked tier (free=0, hobby=1, creator=2, viral=3)
  • is_paid: True if ANY entitlement has a paid grant_reason (subscription, purchase, enterprise_contract) — across all entitlements, not just the winner
  • grant_reasons: Frozenset of every active entitlement's grant reason. Callers check set membership (e.g. {"beta"} & tier_ctx.grant_reasons) rather than a scalar winner — the previous scalar field was unreliable on TIER_RANK ties (e.g. TRIAL+BETA both on creator), where max() could pick TRIAL and hide the beta entitlement
  • can_watermark_free: True if is_paid and tier in WATERMARK_FREE_TIERS, OR if grant_reasons intersects WATERMARK_FREE_GRANT_REASONS (includes beta)

This means a beta+hobby user gets creator features (beta wins on tier rank), is_paid=True (from hobby), and watermark-free (beta in grant_reasons).
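
Putting the behaviors together, a minimal sketch: the ranks and paid reasons mirror the documented constants, but the entitlement dict shape is an assumption:

```python
# Sketch of the resolution behaviors above; the entitlement dict shape is an
# assumption, ranks and paid reasons mirror the documented constants.
TIER_RANK = {"free": 0, "hobby": 1, "creator": 2, "viral": 3}
PAID_GRANT_REASONS = {"subscription", "purchase", "enterprise_contract"}


def resolve(entitlements):
    # Highest-ranked tier wins the tier_name...
    tier_name = max(entitlements, key=lambda e: TIER_RANK[e["tier"]])["tier"]
    # ...but reasons and is_paid look across ALL entitlements, not just the winner.
    grant_reasons = frozenset(e["reason"] for e in entitlements)
    is_paid = bool(grant_reasons & PAID_GRANT_REASONS)
    return tier_name, is_paid, grant_reasons
```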

Constants in modules/entitlement/constants.py: TIER_MAX_PROJECTS, TIER_STORAGE_MB, TIER_EXPORT_RETENTION_DAYS, AI_DIRECTOR_TIERS, TIER_RANK, PAID_GRANT_REASONS, WATERMARK_FREE_GRANT_REASONS.

Unit conversions use BYTES_PER_MB from modules/common/constants.py (e.g., quota_bytes = tier_ctx.storage_quota_mb * BYTES_PER_MB).

Priority Queue

Task brokers use RabbitMQ with native priority queues. User-facing tasks (analysis, render, asset edit) are enqueued with a priority derived from the user's subscription tier. Non-user-facing tasks (email, download) use plain FIFO.

Priority Mapping

| Tier | is_paid | Priority | Constant |
|---|---|---|---|
| viral | true | 3 | TaskPriority.HIGH |
| creator | true | 2 | TaskPriority.NORMAL |
| hobby | true | 1 | TaskPriority.LOW |
| trial / free / none | false | 0 | TaskPriority.BACKGROUND |
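
The table maps to a small lookup. A sketch of what resolve_task_priority plausibly does — the IntEnum shape is an assumption about infrastructure/taskiq/priority.py:

```python
# Sketch matching the priority table; the IntEnum shape is an assumption.
from enum import IntEnum


class TaskPriority(IntEnum):
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3


def resolve_task_priority(tier_name: str, is_paid: bool) -> TaskPriority:
    if not is_paid:
        return TaskPriority.BACKGROUND  # trial / free / none
    return {
        "viral": TaskPriority.HIGH,
        "creator": TaskPriority.NORMAL,
        "hobby": TaskPriority.LOW,
    }.get(tier_name, TaskPriority.BACKGROUND)
```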

Enqueue Pattern

from infrastructure.taskiq.priority import resolve_task_priority
from modules.entitlement.tier_context import resolve_tier_context

tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
    project_uuid=str(project_uuid),
)

Priority is resolved at enqueue time — if a user upgrades, their next task gets the new priority. Tasks already in the queue keep their original priority.

Queue Configuration

Priority queues use QueueType.CLASSIC with max_priority=3 and qos=2 (low prefetch for strict ordering). Brokers and constants are in infrastructure/taskiq/brokers.py and infrastructure/taskiq/priority.py.

Feature Flags

Postgres-backed feature flags for kill switches, gradual rollouts, beta testing, and time-limited features. No external dependencies.

Models

FeatureFlag — the flag itself (key, enabled, rollout_percentage, description, expires_at). FeatureFlagUser — explicit allowlist (user always gets the flag).

Evaluation Order

  1. enabled=False → off (kill switch — overrides everything)
  2. User in allowlist → on (bypasses expiry and percentage)
  3. expires_at in the past → off
  4. rollout_percentage set → deterministic hash check
  5. rollout_percentage null → on (everyone)
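
The five steps read top-to-bottom as a function. A sketch with assumed flag and allowlist shapes; the real deterministic hash scheme may differ:

```python
# Sketch of the five-step evaluation order; the flag dict / allowlist shapes
# and the exact hash bucketing are assumptions.
import hashlib
from datetime import datetime, timezone


def is_enabled(flag: dict, user_id: str, allowlist: set) -> bool:
    if not flag["enabled"]:
        return False                      # 1. kill switch overrides everything
    if user_id in allowlist:
        return True                       # 2. allowlist bypasses expiry + rollout
    expires = flag.get("expires_at")
    if expires and expires < datetime.now(timezone.utc):
        return False                      # 3. expired
    pct = flag.get("rollout_percentage")
    if pct is not None:                   # 4. deterministic per-user bucket
        digest = hashlib.sha256(f"{flag['key']}:{user_id}".encode()).digest()
        return digest[0] % 100 < pct
    return True                           # 5. no rollout set → on for everyone
```

Hashing the flag key together with the user ID keeps a user's bucket stable per flag, so raising the percentage only ever adds users.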

Route Gating

Use require_feature() dependency to gate endpoints behind flags. Returns 404 (not 403) to hide existence:

from modules.feature_flag.dependencies import require_feature

@router.post("/new-endpoint")
async def new_endpoint(
    ...,
    _: Annotated[None, Depends(require_feature("my_flag"))],
):
    # Only reachable when flag is on for this user

Frontend

useFeatureFlag("key") returns a boolean backed by GET /api/v1/flags (cached 5 minutes). The backend is the authority — even if someone inspects the response, the route dependency blocks access.

Admin Endpoints

GET/POST/PATCH/DELETE /api/v1/admin/flags + allowlist management via /admin/flags/{id}/users. Superuser only.

Scripts

| Script | Purpose | How to run |
|---|---|---|
| scripts/setup_initial_data.py | Create tables + seed tiers | Auto-runs on docker compose up |
| scripts/seed_all.py | Tables + tiers + Stripe products (full setup) | Auto-runs on docker compose up |
| scripts/seed_stripe_products.py | Stripe products + prices only | Called by seed_all.py if STRIPE_SECRET_KEY is set |
| scripts/seed_trial_credits.py | Grant trial credits to a user | uv run python scripts/seed_trial_credits.py --email user@example.com |

Key Files

| Purpose | Location |
|---|---|
| Database session | infrastructure/database/session.py |
| Model mixins | infrastructure/database/models.py |
| Auth dependencies | infrastructure/auth/session/dependencies.py |
| Auth constants | infrastructure/auth/constants.py |
| Common schemas | modules/common/schemas.py |
| Domain exceptions | modules/common/exceptions.py |
| Error handler | modules/common/utils/error_handler.py |
| API dependencies | interfaces/api/dependencies.py |
| Route registration | interfaces/api/v1/__init__.py |
| Credit service + ledger | modules/credits/service.py |
| Entitlement transaction model | modules/credits/models.py |
| Trial granting | modules/entitlement/trial.py |
| Beta service | modules/entitlement/beta.py |
| Beta admin API | interfaces/api/v1/beta.py |
| Tier context resolution | modules/entitlement/tier_context.py |
| Entitlement constants | modules/entitlement/constants.py |
| Broker setup | infrastructure/taskiq/brokers.py |
| Priority routing | infrastructure/taskiq/priority.py |
| Feature flags | modules/feature_flag/service.py |
| Feature flag dependency | modules/feature_flag/dependencies.py |
