Backend Development Guide¶
This guide covers patterns and conventions for developing in the Sapari backend.
Project Structure¶
The backend follows a layered architecture: business logic in modules, API routes in interfaces, shared infrastructure, and background workers.
flowchart TB
subgraph src["backend/src/"]
M[modules/] --> |Business Logic| MC[user/, clip/, project/, ...]
I[interfaces/] --> |API Layer| IA[api/v1/]
IN[infrastructure/] --> |Cross-Cutting| IC[database/, auth/, storage/, events/]
W[workers/] --> |Background Jobs| WC[analysis/, render/, download/]
end
- modules/ - Domain business logic. Each module has its own models, schemas, CRUD, and services.
- interfaces/ - External interfaces (REST API, could add GraphQL/gRPC).
- infrastructure/ - Database, auth, storage, events. Shared plumbing.
- workers/ - Background tasks: transcription, rendering, downloads.
backend/src/
├── modules/ # Domain business logic
│ ├── user/ # Each module has: model, schemas, crud, service
│ ├── project/
│ ├── clip/
│ ├── edit/
│ ├── asset/
│ ├── caption_line/
│ ├── credits/ # Credit balance, transactions, metering
│ ├── entitlement/ # Flexible access control, trial grants
│ ├── payment/ # Stripe checkout, subscription lifecycle
│ ├── subscription/ # Subscription records
│ ├── product/ # Products with entitlement configs
│ ├── price/ # Stripe prices
│ ├── tier/ # Subscription tiers (hobby, creator, viral)
│ ├── discount/ # Coupons, promo codes
│ ├── email/ # Email service, templates, tasks
│ ├── export/
│ ├── feature_flag/ # Feature flags with rollout + allowlist
│ ├── draft/
│ ├── preset/
│ ├── notification/ # In-app notifications (user + system-wide), SSE push
│ ├── analysis_run/
│ ├── auth/ # Auth service, schemas, domain exceptions
│ ├── admin_audit/ # Admin event log + audit trail (who/what/when + before/after diffs)
│ ├── api_keys/ # API key management
│ ├── preview_preset/ # Preview preset configurations
│ ├── rate_limit/ # Rate limit management (per-endpoint, tier-based)
│ ├── support/ # Support conversations (bidirectional, priority auto-assignment)
│ └── common/ # Shared exceptions, utilities, constants
├── interfaces/ # External interfaces
│ ├── api/v1/ # REST API routes
│ └── main.py # FastAPI app factory
├── infrastructure/ # Cross-cutting concerns
│ ├── database/ # Session, models, migrations
│ ├── auth/ # Authentication, OAuth, sessions, JWT
│ ├── stripe/ # Stripe client, webhooks
│ ├── storage/ # R2/S3 client
│ ├── events/ # SSE pub/sub (project-scoped + user-scoped notification channels)
│ ├── config/ # Settings
│ ├── ai/ # LLM providers (OpenAI, Anthropic)
│ ├── cache/ # Redis/Memcached
│ ├── email/ # Email settings
│ ├── taskiq/ # Background task broker, scheduler
│ ├── rate_limit/ # Redis-backed rate limiting
│ └── logging/ # Structured logging, Logfire
└── workers/ # Background task processing
├── analysis/ # Transcription, silence/false-start detection
├── render/ # FFmpeg processing
├── download/ # yt-dlp, audio extraction
├── asset_edit/ # Asset processing
└── shared/ # Shared worker utilities
Adding a New Module¶
Follow this order - each step builds on the previous:
flowchart LR
A[1. Model] --> B[2. Schemas]
B --> C[3. CRUD]
C --> D[4. Service]
D --> E[5. Routes]
E --> F[6. Tests]
Step 1: Create the Model¶
Models live in modules/{name}/models.py. Use mixins for common fields:
# modules/widget/models.py
from sqlalchemy import String, Integer, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column
from ...infrastructure.database.models import UUIDMixin, TimestampMixin, SoftDeleteMixin
from ...infrastructure.database.session import Base
class Widget(Base, UUIDMixin, TimestampMixin, SoftDeleteMixin):
"""Widget model with UUID primary key and soft deletion."""
__tablename__ = "widget"
# Required fields
name: Mapped[str] = mapped_column(String(100))
# Optional fields with defaults
count: Mapped[int] = mapped_column(Integer, default=0)
# Foreign keys (always indexed)
user_id: Mapped[int] = mapped_column(
ForeignKey("user.id", ondelete="CASCADE"),
index=True,
)
Mixin Reference:
| Mixin | Fields Added | When to Use |
|---|---|---|
| `UUIDMixin` | `uuid` (PK) | Public-facing entities |
| `TimestampMixin` | `created_at`, `updated_at` | Almost always |
| `SoftDeleteMixin` | `deleted_at`, `is_deleted` | When you need audit trails |
Step 2: Create Schemas¶
Schemas validate input, serialize output, and generate OpenAPI docs. Create variants for different operations:
modules/{name}/schemas.py:
# modules/widget/schemas.py
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from typing import Annotated
from ..common.schemas import TimestampSchema
# Base with shared fields
class WidgetBase(BaseModel):
name: Annotated[str, Field(min_length=1, max_length=100)]
# For API responses (excludes sensitive data)
class WidgetRead(TimestampSchema, WidgetBase):
uuid: UUID
count: int
user_id: int
# For creating (only user-provided fields)
class WidgetCreate(WidgetBase):
model_config = ConfigDict(extra="forbid") # Reject unknown fields
# For updating (all fields optional)
class WidgetUpdate(BaseModel):
model_config = ConfigDict(extra="forbid")
name: Annotated[str | None, Field(min_length=1, max_length=100, default=None)]
count: int | None = None
Schema Naming Convention:
| Schema | Purpose |
|---|---|
| `WidgetBase` | Shared fields for inheritance |
| `WidgetRead` | API responses |
| `WidgetCreate` | POST request bodies |
| `WidgetCreateInternal` | Internal creation with hashed passwords, etc. |
| `WidgetUpdate` | PATCH request bodies (all optional) |
| `WidgetDelete` | Soft deletion data |
Step 3: Create CRUD¶
We use FastCRUD to implement the repository pattern. It gives us a data access layer that abstracts away raw SQL, providing a clean interface for database operations:
# modules/widget/crud.py
from fastcrud import FastCRUD
from .models import Widget
crud_widgets: FastCRUD = FastCRUD(Widget)
Common CRUD Operations:
# Create
widget = await crud_widgets.create(db=db, object=widget_data, schema_to_select=WidgetRead)
# Read one
widget = await crud_widgets.get(db=db, uuid=uuid, is_deleted=False)
# Read many with pagination
widgets = await crud_widgets.get_multi(
db=db,
offset=skip,
limit=limit,
schema_to_select=WidgetRead,
is_deleted=False,
)
# Check existence
exists = await crud_widgets.exists(db=db, name="foo")
# Update
updated = await crud_widgets.update(db=db, object=update_data, uuid=uuid)
# Soft delete
await crud_widgets.delete(db=db, uuid=uuid)
# Hard delete (rarely used)
await crud_widgets.db_delete(db=db, uuid=uuid)
# Joins
result = await crud_widgets.get_joined(
db=db,
join_model=User,
join_prefix="user_",
schema_to_select=WidgetRead,
join_schema_to_select=UserRead,
uuid=uuid,
nest_joins=True,
)
Query Patterns¶
FastCRUD's get_multi() defaults to limit=100. This is correct for API endpoints (human pagination) but dangerous for internal queries that need all records. Three cases:
Case 1: A human will scroll this. Paginate. Caller passes limit/offset, capped by MAX_PAGE_LIMIT:
# API endpoint -- caller controls pagination
widgets = await crud_widgets.get_multi(
db=db, offset=skip, limit=limit,
schema_to_select=WidgetRead, is_deleted=False,
)
Case 2: Code needs all records. Use limit=None. The WHERE clause is the bound:
# Internal -- domain-bounded query, no pagination
clips = await crud_clips.get_multi_joined(
db=db, project_id=project_uuid, limit=None,
schema_to_select=ClipReadBase,
joins_config=[clip_file_join_config()],
)
limit=None is safe when: (1) the WHERE clause bounds the count, (2) per-record work is cheap, (3) the bound is stable. When (3) doesn't hold (e.g., edits depend on analysis pipeline design), add a sanity cap that raises loudly.
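When condition (3) doesn't hold, the "sanity cap that raises loudly" can be as small as a guard function. A minimal sketch — the cap value and helper name are illustrative, not real project code:

```python
# Hypothetical guard for limit=None reads whose domain bound is not
# structurally guaranteed. Names and the cap value are assumptions.
UNBOUNDED_READ_SANITY_CAP = 10_000  # far above any legitimate count


def assert_sane_row_count(
    rows: list, entity: str, cap: int = UNBOUNDED_READ_SANITY_CAP
) -> list:
    """Raise loudly instead of silently processing a runaway result set."""
    if len(rows) > cap:
        raise RuntimeError(
            f"Unbounded read returned {len(rows)} {entity} rows (cap {cap}); "
            "the WHERE-clause bound is no longer holding"
        )
    return rows
```

The point is to fail the request rather than degrade quietly: a tripped cap means the domain invariant changed and the query needs a real bound.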
Case 3: Expensive work per record. Job queue. Batch IDs, enqueue tasks:
# Batch processing -- expensive work per record
while True:
users = await crud_users.get_multi_by_cursor(
db=db, cursor=cursor, limit=DEFAULT_BATCH_SIZE,
)
if not users["data"]:
break
await process_batch(users["data"])
cursor = users["next_cursor"]
Write-side enforcement: Entity count caps belong on creation, not on reads. Example: MAX_CLIPS_PER_PROJECT = 100 is checked in presign_upload() and import_youtube(), then reads use limit=None with confidence. For concurrent safety, wrap count → check → create with advisory_xact_lock(db, f'clips:{project_uuid}') — see §Count-Race Prevention below.
SQL aggregates over get_multi: When computing a scalar (sum, count, max), use SQLAlchemy directly instead of loading rows into Python:
# Correct: SQL aggregate
stmt = select(func.sum(EntitlementTransaction.cost_microcents)).where(...)
result = await db.execute(stmt)
# Wrong: loading rows to sum in Python
result = await crud.get_multi(db=db, ...)
total = sum(row["cost"] for row in result["data"])
Step 4: Create Service¶
Services contain business logic - validation, permissions, orchestrating multiple CRUD operations:
# modules/widget/service.py
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from ..common.exceptions import WidgetNotFoundError, PermissionDeniedError
from .crud import crud_widgets
from .schemas import WidgetCreate, WidgetRead, WidgetUpdate
class WidgetService:
"""Business logic for widget operations."""
async def create(
self,
widget: WidgetCreate,
user_id: int,
db: AsyncSession,
) -> dict[str, Any]:
"""Create a widget for a user."""
widget_data = widget.model_dump()
widget_data["user_id"] = user_id
return await crud_widgets.create(
db=db,
object=widget_data,
schema_to_select=WidgetRead,
)
async def get_by_uuid(
self,
uuid: str,
db: AsyncSession,
) -> dict[str, Any]:
"""Get a widget by UUID."""
widget = await crud_widgets.get(
db=db,
schema_to_select=WidgetRead,
uuid=uuid,
is_deleted=False,
)
if not widget:
raise WidgetNotFoundError(f"Widget {uuid} not found")
return widget
async def verify_ownership(
self,
widget: dict[str, Any],
user_id: int,
) -> None:
"""Verify user owns this widget."""
if widget["user_id"] != user_id:
raise PermissionDeniedError("You don't own this widget")
Service Patterns:
- Services are stateless classes
- Each method takes `db: AsyncSession` as a parameter
- Raise domain exceptions (not HTTP exceptions)
- Use CRUD for database operations
- Handle business validation and permissions
Step 5: Create Routes¶
Routes go in interfaces/api/v1/{name}.py. Keep them thin - delegate to services, don't put logic here. Global exception handlers catch DomainError and unhandled exceptions automatically, so routes don't need try/except unless a specific exception requires a custom status code:
# interfaces/api/v1/widgets.py
from typing import Annotated, Any
from uuid import UUID
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from ....infrastructure.auth.session.dependencies import get_current_user
from ....infrastructure.database.session import async_session
from ....modules.widget.schemas import WidgetCreate, WidgetRead, WidgetUpdate
from ....modules.widget.service import WidgetService
from ..dependencies import get_widget_service
router = APIRouter(tags=["Widgets"])
@router.post(
"/",
status_code=201,
response_model=WidgetRead,
summary="Create Widget",
responses={
201: {"description": "Widget created"},
401: {"description": "Not authenticated"},
},
)
async def create_widget(
widget: WidgetCreate,
db: Annotated[AsyncSession, Depends(async_session)],
current_user: Annotated[dict[str, Any], Depends(get_current_user)],
widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
"""Create a new widget."""
return await widget_service.create(widget, current_user["id"], db)
@router.get(
"/{widget_uuid}",
response_model=WidgetRead,
summary="Get Widget",
)
async def get_widget(
widget_uuid: UUID,
db: Annotated[AsyncSession, Depends(async_session)],
widget_service: Annotated[WidgetService, Depends(get_widget_service)],
) -> dict[str, Any]:
"""Get a widget by UUID."""
return await widget_service.get_by_uuid(str(widget_uuid), db)
Register the router in interfaces/api/v1/__init__.py:
from .widgets import router as widgets_router
router.include_router(widgets_router, prefix="/widgets")
Dependency Injection¶
FastAPI's Depends() handles dependency injection. Database sessions are per-request. Everything else (services, infrastructure clients) is a singleton created in the lifespan and accessed via request.state (ASGI lifespan state).
flowchart TB
subgraph Lifespan["Created Once (Lifespan)"]
SVC[All Services]
STORAGE[StorageClient]
EMAIL[PostmarkClient]
STRIPE[StripeService]
end
subgraph Request["Per Request"]
DB[async_session]
AUTH[get_current_user]
end
Route --> DB
Route --> SVC
Route --> AUTH
All singletons are created in app_factory.py's lifespan and yielded as a dict. Dependencies in interfaces/api/dependencies.py read from request.state:
from typing import cast
def get_widget_service(request: Request) -> WidgetService:
return cast(WidgetService, request.state.widget_service)
def get_storage_client(request: Request) -> StorageClient:
return cast(StorageClient, request.state.storage_client)
Services with dependencies share singleton instances via constructor injection:
# In lifespan (app_factory.py):
entitlement_service = EntitlementService()
credit_service = CreditService(entitlement_service=entitlement_service)
beta_service = BetaService(entitlement_service=entitlement_service, credit_service=credit_service)
yield {"entitlement_service": entitlement_service, "credit_service": credit_service, ...}
Authentication dependencies:
# Require any authenticated user
current_user: Annotated[dict[str, Any], Depends(get_current_user)]
# Require superuser/admin (narrow meta endpoints only: health, queues, sudo, tier catalog)
_: Annotated[dict[str, Any], Depends(get_current_superuser)]
# Require superuser + audit trail + IP allowlist + session timeout (default for admin endpoints)
audit: Annotated[AdminAuditContext, Depends(get_admin_audit_context)]
# ... then inside handler, after the service call:
await audit.log(AdminEventType.SEARCH, "user", details={"action": "list"})
When to use which: any admin endpoint that returns PII or mutates admin-touchable state should use get_admin_audit_context. The bare get_current_superuser is reserved for narrow meta endpoints where audit noise exceeds value (health checks, queue polling, sudo primitive, tier catalog lookup).
Exception Handling¶
Services raise domain-specific exceptions (like WidgetNotFoundError), not HTTP exceptions. Two global exception handlers catch these automatically — routes don't need try/except for standard errors.
flowchart LR
A[Service raises DomainError] --> B[Global DomainError handler]
B --> C[EXCEPTION_MAPPING → HTTPException]
C --> D[Sanitized JSON response]
E[Unhandled Exception] --> F[Global catch-all handler]
F --> G[Log details server-side]
G --> H[Generic 500 response]
Global handlers are registered in modules/common/utils/error_handler.py via register_exception_handlers(), called in app_factory.py. Three layers:
1. `RequestValidationError` handler: FastAPI validation errors return a generic 422 message (no field names leaked)
2. `DomainError` handler: each 4xx exception's `EXCEPTION_MAPPING` lambda decides whether to return a canned detail or pass the raw message through. Messages that pass through (`ValidationError`, `PermissionDeniedError`, `InsufficientCreditsError`, `UserExistsError`) MUST be pre-sanitized by the raising service — the exception class docstrings spell this out. Everything else (`ResourceNotFoundError`, `ResourceExistsError`, `TierNotFoundError`, etc.) returns a canned string. 5xx domain errors always return `GENERIC_ERROR_MESSAGE`
3. `CatchAllErrorMiddleware`: unhandled exceptions return a generic 500
All error responses include a support_id (8-char UUID) for customer support lookup. Full details logged server-side only. Routes use handle_exception() as a fallback for non-domain errors.
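The support_id pattern can be sketched in a few lines. This is illustrative only — the real handlers live in modules/common/utils/error_handler.py, and the function name and message constant here are assumptions:

```python
# Illustrative sketch of the support_id pattern: an 8-char id links the
# sanitized client response to full server-side logs. Names are hypothetical.
import logging
import uuid

logger = logging.getLogger("app.errors")
GENERIC_ERROR_MESSAGE = "An unexpected error occurred."  # assumed constant


def build_error_response(exc: Exception) -> dict:
    support_id = uuid.uuid4().hex[:8]  # 8-char lookup key for support
    # Full exception details stay server-side only
    logger.exception("unhandled error support_id=%s", support_id, exc_info=exc)
    return {"detail": GENERIC_ERROR_MESSAGE, "support_id": support_id}
```

The client sees only the generic message plus the id; support staff grep logs for the id to recover the full traceback.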
Define domain exceptions in modules/common/exceptions.py:
class DomainError(Exception):
"""Base for all domain errors."""
pass
class ResourceNotFoundError(DomainError):
"""Resource not found."""
pass
class WidgetNotFoundError(ResourceNotFoundError):
"""Widget not found."""
pass
class PermissionDeniedError(DomainError):
"""User lacks permission."""
pass
Mapping to HTTP in modules/common/constants.py. Each lambda decides whether to return a canned detail or the raw message — exceptions whose message is client-safe (checked against their class docstring) pass the message through so the UI can show actionable text:
EXCEPTION_MAPPING = {
ResourceNotFoundError: lambda msg: NotFoundException(detail="The requested resource was not found."),
PermissionDeniedError: lambda msg: ForbiddenException(detail=msg or "You don't have permission for this action."),
ValidationError: lambda msg: UnprocessableEntityException(detail=msg),
ResourceExistsError: lambda msg: HTTPException(status_code=409, detail="This resource already exists."),
InsufficientCreditsError: lambda msg: HTTPException(status_code=402, detail=msg or "Insufficient credits."),
}
Database Patterns¶
Async Sessions¶
All database operations are async. The session is injected via Depends(async_session) and auto-commits on success:
from sqlalchemy.ext.asyncio import AsyncSession
async def my_operation(db: AsyncSession):
# Operations auto-commit on success
result = await crud_widgets.create(db=db, object=data)
return result
Count-Race Prevention¶
count → check → insert is not atomic under concurrent requests. Two requests can both read the same count, both pass the limit check, both insert — the cap is silently exceeded. Two tools close the race, matching different constraint shapes.
Partial unique index — works when the constraint is "exactly one row per X" (e.g., one trial per user). Postgres rejects the second insert at commit time with IntegrityError. The service catches and converts to the domain return value. See entitlement/models.py (uq_user_entitlement_trial) and entitlement/trial.py:grant_trial_if_eligible for the shipped pattern.
Advisory lock — works for any N-bounded cap. Helper lives at infrastructure/database/locks.py:advisory_xact_lock. Example:
from ...infrastructure.database.locks import advisory_xact_lock
async def create_clip(self, project_uuid, ..., db):
await advisory_xact_lock(db, f"clips:{project_uuid}")
count = await crud_clips.count(db=db, project_id=project_uuid)
if count >= MAX_CLIPS_PER_PROJECT:
raise ValidationError(...)
await crud_clips.create(db=db, object=..., commit=True) # commit releases the lock
The lock's transaction is the one that autobegins at db.execute inside the helper. The crud.create(commit=True) that ends the block commits that transaction and releases the lock. Do NOT call await db.commit() between the lock acquire and the create — it releases the lock early.
Why pg_advisory_xact_lock (blocking, transaction-scoped) over pg_try_advisory_xact_lock (non-blocking) or pg_advisory_lock (session-scoped): we want waiters to serialize not error; we want auto-release on commit/rollback so a caller can't leak a lock by forgetting to release.
Key convention: "<entity>:<identifier>" (e.g., "clips:<project_uuid>", "user_projects:<user_id>", "presets:<user_id>", "preview_presets:<user_id>"). Namespaces prevent cross-entity hashtext collisions; per-identifier scoping avoids unrelated-entity serialization. Distinct-but-related entities (e.g., the two preset tables each have independent per-user caps) MUST use distinct keys — colliding them would serialize unrelated operations for no safety benefit. Collision rate is ½^32 per key pair — accepted.
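The key convention is mechanical enough to sketch. The helper below is a hypothetical companion to the real `advisory_xact_lock` (which lives in infrastructure/database/locks.py); the SQL named in the docstring is an assumption consistent with the hashtext collision math above:

```python
# Sketch of the "<entity>:<identifier>" key convention. Hypothetical helper;
# the real lock acquisition lives in infrastructure/database/locks.py.


def advisory_lock_key(entity: str, identifier: object) -> str:
    """Build a '<entity>:<identifier>' advisory-lock key.

    The lock helper is assumed to run roughly
        SELECT pg_advisory_xact_lock(hashtext(:key))
    so distinct entities MUST use distinct namespaces: a shared namespace
    would serialize unrelated operations on a hashtext collision.
    """
    if not entity or ":" in entity:
        raise ValueError(f"invalid lock namespace: {entity!r}")
    return f"{entity}:{identifier}"
```

Rejecting `:` inside the namespace keeps keys unambiguous, so `("a", "b:c")` and `("a:b", "c")` can never collide as strings.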
Indexes¶
Add indexes for:

- Foreign keys (always)
- Fields used in WHERE clauses
- Fields used for sorting
# Single column
email: Mapped[str] = mapped_column(String(50), unique=True, index=True)
# Composite index
__table_args__ = (
Index("idx_widget_user_status", "user_id", "status"),
)
Relationships¶
from sqlalchemy.orm import relationship
# One-to-many (parent side)
widgets: Mapped[list["Widget"]] = relationship(
"Widget",
back_populates="user",
lazy="selectin", # Eager load
default_factory=list,
init=False,
)
# Many-to-one (child side)
user: Mapped["User"] = relationship(
"User",
back_populates="widgets",
lazy="selectin",
init=False,
)
Database connection — direct endpoint, matched region¶
Two things matter for query latency:
- Use the direct Postgres endpoint, NOT the pooler. PgBouncer in transaction mode breaks asyncpg's prepared-statement cache, forcing 3-4× the round trips per query. Our app already has SQLAlchemy connection pooling (`POSTGRES_POOL_SIZE=50`), so PgBouncer in front of asyncpg is redundant and harmful. The pooler is designed for serverless apps that open one connection per request; we run persistent uvicorn workers.
- DB region must match server region. Each query is one network round trip. Cross-region (server in US-East, DB in US-West or vice versa) adds ~50-65ms per query. A single request that runs 4-6 sequential queries pays 200-400ms in DB latency before any other work happens. Match regions (Hetzner Ashburn ↔ Neon `us-east-1`, Hetzner Hillsboro ↔ Neon `us-west-2`) for sub-10ms query latency.
Both decisions are made at provision time. Changing the DB region requires creating a new Neon project and migrating data.
Migrations¶
Generate migrations with Alembic:
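The standard Alembic workflow, assuming migrations are configured in the backend project (working directory and revision message are illustrative):

```shell
# Autogenerate a revision by diffing the models against the current
# database schema, review the generated file, then apply it.
alembic revision --autogenerate -m "add widget table"
alembic upgrade head

# Roll back one revision if the migration misbehaves
alembic downgrade -1
```

Always review autogenerated revisions before applying — autogenerate misses some changes (e.g., server defaults, some index alterations) and can emit spurious drops.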
Key Conventions¶
- UUIDs for public APIs - Never expose internal integer IDs
- Soft deletes - Use `is_deleted=False` in queries
- Async everything - All I/O operations are async
- Type hints - Full typing with `Mapped[]` and `Annotated[]`
- Domain exceptions - Raise domain errors; the global handlers map them to HTTP
- Validation at boundaries - Pydantic validates API input
- Use FastCRUD for simple CRUD - For complex queries (joins, aggregations, batch updates), use SQLAlchemy directly
- No magic numbers - Extract to constants in module-level `constants.py` files
- No imports inside methods - All imports at module top level
- Client IP extraction - Always use `modules.common.utils.request_ip.get_client_ip(request)` instead of `request.client.host` directly. The raw transport peer is Caddy's internal Docker IP; the helper reads the X-Forwarded-For header that Caddy populates from Cloudflare's `CF-Connecting-IP`. Misuse causes per-IP rate limits to track Caddy as the "client" and lock out all users globally.
- Client-supplied offsets validated against computed ceilings - Any API field that references a position in a computed timeline (e.g., `edit.end_ms` into `SUM(clip_file.duration_ms)`, `asset_edit.end_ms` into `user_asset.duration_ms`, `edit_override.{start_ms, end_ms}` into the same project SUM, `edit.asset_offset_ms` into `asset_file.duration_ms`) must be bounds-checked at the service layer against the computed ceiling before persisting. The schema `Field(ge=0)` + `end_ms > start_ms` validator is necessary but not sufficient — a tampered client can send `end_ms=999999999`, which schemas accept but renders silently clamp or crash. Three canonical instances ship today: (a) `EditService._validate_edit_bounds` in `edit/service.py` uses `clip.utils.get_project_total_duration_ms` (returns `tuple[ProjectDurationStatus, int]`; `EMPTY` rejects, `NOT_READY` skips, `READY` enforces) — needed because the ceiling is a SUM across rows not already fetched at the call site; (b) `AssetService.validate_edit_bounds` in `asset/service.py` is a pure function taking `duration_ms: int | None` directly from the caller's `require_asset_uploaded` return — simpler shape because the ceiling is a single scalar already fetched upstream; (c) `validate_overrides_against_project` in `draft/utils.py` reuses the `get_project_total_duration_ms` helper to bounds-check every `EditOverride.{start_ms, end_ms}` in `DraftCreate`/`DraftUpdate`/`ExportCreate` payloads, and `EditService.{create, update}` inline-checks `Edit.asset_offset_ms` against `asset_file["duration_ms"]` threaded from `_verify_asset_uploaded` (changed to return the joined asset_file dict so callers don't re-query). The pattern to copy is "service-layer bounds validation against a computed ceiling," not a specific helper module — reach for a helper when the check is non-trivial or reused across services (a, c); inline when the value is already in hand and the check is a single comparison (b-subset, c-inline). Zero-ceiling state (`EMPTY` in the multi-row case) rejects rather than skipping — otherwise an attacker creates the offset first and materializes the ceiling later. `start_ms` is bounds-checked independently from `end_ms` when the cross-field validator cannot fire (e.g., the `EditOverride.end_ms=None` case — transitivity doesn't close `start_ms ≤ total`). Audit verified complete 2026-04-21: every client-facing `_ms` offset field on a public schema has a service-layer bounds check; worker-internal `_ms` schemas (`workers/render/schemas.py`, `workers/analysis/**/schemas.py`, `workers/download/schemas.py`, `workers/asset_edit/schemas.py`) are constructed from server-owned data, not network payloads, and are outside this convention's scope.
- Freeze configuration at the queue boundary - Any configuration value that feeds a background worker and depends on user tier (watermark-free, export retention, feature gates) MUST be snapshotted onto the queued record at service-layer kickoff. Workers read the snapshot; `resolve_tier_context()` must NEVER be called from inside a worker task. Rationale: workers run minutes-to-hours after the request; if tier state changes in that window (subscription expired, upgrade processed), live re-resolution silently produces the wrong answer for whichever end of the transition the user is on. Enforcement: `rg "resolve_tier_context" backend/src/workers/` MUST return zero hits. Exception mechanism: a surviving call site that is genuinely observational (logging, metrics, diagnostics — not a policy decision) must carry an inline `# noqa: tier-in-worker — justification: <reason>` comment so future audits can distinguish deliberate-but-rare exceptions from overlooked regressions. Any unannotated grep hit is a finding. Shipped instances: Fix 9 (`expires_at` at create, not completion), Fix 11 (`watermark_required` snapshot, not re-resolved), Fix 12 (analysis mode/settings frozen at kickoff).
- Validate against resolved metadata, not just identifier shape - When a client-supplied identifier resolves into a new entity (YouTube URL → ClipFile after fetching video metadata, asset UUID → UserAsset after DB lookup, etc.), downstream resource limits and cap checks MUST run against the resolved entity's properties, not against the identifier alone. Identifier-only validation (URL is well-formed, UUID parses, string length in range) is necessary but not sufficient: a tampered client can send a valid-looking identifier whose resolved properties violate policy. Shipped instances: Fix 4 (Edit bounds against project duration), Fix 16 (AssetEditRequest bounds against asset duration), Fix 5 (YouTube duration cap at import time, after `fetch_video_info`). Related to Convention #11 but distinct: #11 is about client-supplied `_ms` offsets already in the request; this convention is about metadata the server fetches to resolve an identifier.
- Worker processes require explicit instrumentation setup - TaskIQ worker processes do NOT inherit Logfire auto-instrumentation from the web process. Each worker must call `instrument_taskiq(service_name=...)` at startup, and `configure_broker_lifecycle(broker, service_name, ...)` must pass a distinct `service.name` per queue (`sapari-{email,analysis,render,download,proxy,asset-edit}`). Enforcement: `service_name` is a non-default positional-or-keyword argument on `configure_broker_lifecycle` — adding a broker without it fails at call time (TypeError) rather than silently inheriting a generic name and losing worker-scoped observability. Rationale: worker processes run in separate Python interpreters (different TaskIQ worker pools), so the web process's `configure_logfire()` call has no effect on them. Without the per-broker call, workers emit zero spans, which silently breaks Logfire dashboards and makes worker-side incidents invisible. Per-instrumentor on/off policy (both web and workers) lives in `docs/operations/monitoring.md` §Auto-instrumentor defaults: pydantic-ai + SQLAlchemy + FastAPI ON, Redis OFF. The env vars (`LOGFIRE_INSTRUMENT_{SQLALCHEMY,REDIS}` for web, `..._WORKERS` for workers) encode this — don't flip defaults without updating the matrix in that doc.
- Manual span naming follows the six-category taxonomy - Every manually emitted `logfire.span(...)` must fit one of six categories: (a) `<worker>.pipeline` for DAG invocation parents, (b) `step.<step_id>` for pipeline steps (emitted automatically by `fastroai.LogfireTracer` — don't hand-write), (c) `taskiq.<task_name>` for task-level parents (emitted automatically by `LogfireSpanMiddleware` — don't hand-write), (d) `<module>.<method>` for service-layer operations (`credits.reserve`, `payment.handle_webhook`), (e) `ext.<service>.<op>` for outbound calls (`ext.whisper.transcribe`, `ext.ffmpeg.render`, `ext.stripe.modify_subscription`), (f) `sse.<event>` for SSE publish paths. Every `ext.*` span carries a domain correlation key at entry (`clip_file_uuid`, `subscription_id`, etc.); every `<module>.<method>` span carries the primary identifier at entry and an outcome post-attr via `span.set_attribute()` at exit. Cron tasks add `labels={"task_type": "cron"}` to their decorator, which the middleware copies onto the span as `task_type="cron"`. Rationale: the taxonomy keeps Logfire filters, saved queries, and ad-hoc grouping legible across the codebase — new spans slot into existing dashboards instead of forcing per-query regex munging. Full table with examples in `docs/operations/monitoring.md` §Span taxonomy.
Rate Limiting¶
Rate limits use a RateLimit dataclass defined in infrastructure/auth/constants.py:
from infrastructure.auth.constants import RATE_LIMIT_SIGNUP, RateLimit
# Usage in route handlers:
await _check_rate_limit(request, "signup", RATE_LIMIT_SIGNUP)
Token expiry durations are also in infrastructure/auth/constants.py:
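A hypothetical sketch of the RateLimit dataclass's shape — the real definition lives in infrastructure/auth/constants.py, and the field names and values here are assumptions for illustration:

```python
# Hypothetical shape of RateLimit; actual fields/values live in
# infrastructure/auth/constants.py and may differ.
from dataclasses import dataclass


@dataclass(frozen=True)
class RateLimit:
    limit: int            # max attempts per window
    window_seconds: int   # window length for the counter


# Example constant in the style the route handlers import:
RATE_LIMIT_SIGNUP = RateLimit(limit=5, window_seconds=3600)
```

Frozen dataclasses work well here: the constants are shared module-level state, and immutability prevents a handler from accidentally mutating a limit for every other endpoint.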
Credit Architecture¶
Credits are built on the entitlement system. EntitlementTransaction is the authoritative ledger; quantity_used on UserEntitlement is a rebuildable cache. Every credit event (grant, usage, reset, refund) writes to both atomically.
Ledger Pattern¶
EntitlementTransaction (authoritative, append-only)
├── GRANT — credits added (purchase, subscription, beta)
├── USAGE — credits consumed (one per entitlement drained)
├── RESET — period boundary marker (RENEWABLE credits, amount=0)
└── REFUND — credits returned
UserEntitlement.quantity_used (cache, rebuildable)
└── Updated in the same db.commit() as the transaction record
rebuild_user_balance() recomputes quantity_used from the ledger at any time. Admin endpoint: POST /credits/admin/rebuild/{user_id}.
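The rebuild logic follows directly from the ledger semantics above. A simplified pure-Python sketch — transaction shape is assumed, and the real implementation in `rebuild_user_balance()` operates per entitlement over the database:

```python
# Simplified sketch of rebuilding quantity_used from the append-only ledger.
# Transaction dicts are an assumed shape; the real logic lives in
# rebuild_user_balance().


def rebuild_quantity_used(transactions: list[dict]) -> int:
    """Recompute usage for one entitlement from its ledger, oldest first."""
    used = 0
    for tx in transactions:
        if tx["type"] == "USAGE":
            used += tx["amount"]
        elif tx["type"] == "REFUND":
            used -= tx["amount"]
        elif tx["type"] == "RESET":
            used = 0  # period boundary: RENEWABLE usage starts over
        # GRANT affects the quantity granted, not quantity_used
    return used
```

Because the ledger is authoritative and append-only, this replay is safe to run at any time; a corrupted cache is always one rebuild away from correct.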
Lazy Evaluation¶
Two things happen lazily inside get_user_balances(), before the balance is computed:
1. `_materialize_ungranted_payments()` — Finds payments with `status=SUCCEEDED, entitlement_granted=False`. Creates credit entitlements via atomic CAS (`UPDATE ... WHERE entitlement_granted = false`). The webhook is a thin fact-recorder; credits materialize on first balance read.
2. `_reset_expired_renewable_periods()` — For RENEWABLE entitlements (beta credits), checks if `period_start_at + renewal_period_days < now`. If expired, resets `quantity_used=0`, writes a RESET transaction, clears stale reservations.
Both are self-healing: if the system is down when a webhook fires, the next balance read catches up.
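The CAS guard can be illustrated with an in-memory reduction. This is only a sketch of the idea — in the real code the guard is the SQL predicate itself, which is what makes it atomic; a Python check-then-set like this would NOT be concurrency-safe on its own:

```python
# Dict-based illustration of the CAS in _materialize_ungranted_payments().
# The real atomicity comes from the database executing
#   UPDATE ... WHERE entitlement_granted = false
# as a single statement; this Python version only models the win/lose outcome.


def try_claim_payment(payment: dict) -> bool:
    """Flip entitlement_granted false -> true; only one caller 'wins'."""
    if payment.get("status") != "SUCCEEDED":
        return False
    if payment.get("entitlement_granted"):
        return False  # already materialized -- idempotent no-op
    payment["entitlement_granted"] = True
    return True
```

Two concurrent balance reads map to two callers: exactly one gets `True` and grants credits; the loser sees `False` and does nothing, so credits are never double-granted.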
Payment Flow¶
Stripe webhook fires
→ handle_checkout_session_completed()
→ Creates Payment record (status=SUCCEEDED, entitlement_granted=False)
→ Grants TIER_ACCESS eagerly (user needs dashboard access immediately)
→ Credits are NOT granted here (deferred)
→ Returns 200
User opens dashboard
→ GET /users/me/billing
→ get_user_balances()
→ _materialize_ungranted_payments() ← credits created here
→ _reset_expired_renewable_periods() ← RENEWABLE reset here
→ Returns balance with credits
Credit Metering¶
Analysis endpoints check credits before processing:
- Estimate — `ProjectService.estimate_analysis_credits()` sums clip durations (ceiling to minutes), applies mode multiplier
- Reserve — `CreditService.reserve_credits()` stamps reservation metadata (includes `reserved_period` for RENEWABLE entitlements)
- Consume — Worker calls `CreditService.use_credits()` → `consume_entitlement()` drain loop writes one `EntitlementTransaction(USAGE)` per entitlement drained, all with `commit=False`, single `db.commit()`
- Release — On failure, `CreditService.release_credits()` clears reservation metadata
Consumption Ordering¶
When credits are consumed, entitlements are drained in priority order:
- RENEWABLE first (use-it-or-lose-it — beta credits)
- DECREMENTAL by expiry (expiring soonest first, then non-expiring)
- ACCUMULATIVE last
This means beta credits are always consumed before subscription credits, and expiring promos are consumed before permanent purchases.
Analysis Modes¶
Mode is derived from settings (not explicitly selected). The AnalysisMode enum lives in modules/analysis_run/enums.py; the multipliers are in CREDIT_MULTIPLIERS:
| Mode | Multiplier | Trigger |
|---|---|---|
| ai_edit | 1.0 | Any cut/censorship/director feature enabled |
| captions_only | 0.5 | Only language set (transcription + captions) |
| manual | 0.0 | Nothing toggled (frontend doesn't call analyze) |
The analyze endpoint rejects a captions_only request that carries any ai_edit setting (non-zero pacing_level / false_start_sensitivity, non-empty director_notes, non-none censorship) with a 422 via ProjectService.validate_analysis_mode_consistency. This keeps analysis_mode meaningful for metrics and prevents the "0.5× credits for full pipeline" bypass a tampered client would otherwise get. The worker itself is not mode-gated beyond AI Director asset fetching, so the API-layer validation is the authoritative gate. The mode is stored on AnalysisRun for audit.
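Mode derivation could be sketched as follows; the setting names mirror those mentioned above, but the real ProjectService logic may check different fields:

```python
def derive_analysis_mode(settings: dict) -> str:
    """Illustrative derivation matching the table above: any ai_edit
    setting wins, else a language alone means captions_only, else
    manual (frontend never calls analyze)."""
    ai_edit_active = bool(
        settings.get("pacing_level")
        or settings.get("false_start_sensitivity")
        or settings.get("director_notes")
        or settings.get("censorship") not in (None, "none")
    )
    if ai_edit_active:
        return "ai_edit"
    if settings.get("language"):
        return "captions_only"
    return "manual"
```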
Beta Entitlement¶
Beta users get creator-tier access + 240 renewable AI minutes/month. Managed via BetaService in entitlement/beta.py:
from modules.entitlement.beta import BetaService
beta = BetaService()
await beta.grant(user_id, db) # TIER_ACCESS + RENEWABLE CREDIT_GRANT
await beta.revoke(user_id, db) # Soft-deletes both by reference_id
await beta.has_access(user_id, db) # Checks active TIER_ACCESS with BETA reason
await beta.bulk_grant(user_ids, db) # One tier lookup, bulk duplicate check, single commit
await beta.list_users(db) # All beta users with credit usage (SQL join)
Admin invite flow: POST /admin/beta/invite takes an email. If the user exists, grants beta immediately and sends a welcome email. If not, creates a signed JWT token (24h expiry, purpose=beta_invite) and sends an invite email with a signup link.
The signup link delivers the token via ?beta_invite=TOKEN on the landing URL. Both signup paths consume it:
- Email/password signup (POST /users/?beta_invite=TOKEN) verifies the token and grants beta after account creation on email match.
- Google OAuth signup — the frontend forwards ?beta_invite=TOKEN to GET /auth/oauth/google?beta_invite=TOKEN; the backend persists it on OAuthState and the callback (/auth/oauth/callback/google) verifies + grants on match. Grant failures are non-fatal to OAuth login (logged, not raised).
Email matching is canonical, not byte-exact. The comparison goes through modules.common.utils.email.canonical_email() on both sides: lowercase entire address, strip +tag aliasing for any domain, strip dots in the local part for @gmail.com and @googlemail.com (other providers treat dots as distinct mailboxes — do not generalize). Without this, an invite to User.Name@gmail.com silently fails to grant beta to username@gmail.com even though Google routes both to the same mailbox.
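A sketch of those canonicalization rules (the real helper is modules.common.utils.email.canonical_email(); this illustration assumes well-formed addresses):

```python
GMAIL_DOMAINS = {"gmail.com", "googlemail.com"}

def canonical_email(email: str) -> str:
    """Canonicalize per the rules above: lowercase the whole address,
    strip +tag aliasing for any domain, and strip dots in the local
    part only for Gmail domains (other providers treat dots as
    distinct mailboxes)."""
    local, _, domain = email.lower().partition("@")
    local = local.split("+", 1)[0]        # drop +tag aliasing
    if domain in GMAIL_DOMAINS:
        local = local.replace(".", "")    # dots insignificant on Gmail
    return f"{local}@{domain}"
```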
Beta entitlements stack with subscriptions. resolve_tier_context() picks the highest-ranked tier. Credits from both pools sum. Beta credits are consumed first (RENEWABLE before DECREMENTAL).
Thumbnails¶
Asset and project thumbnails are generated via FFmpeg (modules/asset/thumbnail.py) and stored in R2 as thumbnail_key. Presigned thumbnail_url is served inline in list responses (not separate endpoints) — generate_download_url() is a local HMAC operation (~1ms, no network call).
- Assets: Generated on confirm_upload() for video/image types. Center-cropped to 240x240 JPEG.
- Projects: Generated from the first clip in the process_clip_artifacts() worker. Only if the project has no thumbnail yet.
- Enrichment: _enrich_with_thumbnail_urls() in AssetService (instance method), _enrich_projects_with_thumbnail_urls() module-level in ProjectService. Called in all list/get/confirm return paths.
Vertical Crop¶
Optional crop for non-native aspect ratio exports. CropRegion schema (draft/schemas.py) stores normalized 0-1 coordinates. build_crop_filter() (workers/render/ffmpeg/video_filters.py) converts to FFmpeg crop=w:h:x:y filter with even-dimension rounding for H.264 chroma alignment.
Crop is injected in all 3 render paths (trim-only, asset composite, B-roll composite) after setpts and before scale+pad. For B-roll two-pass rendering, crop is applied in Pass 1 (trim) to avoid double-cropping. Source dimensions probed via get_video_dimensions() at render time.
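A sketch of the conversion, assuming CropRegion exposes x/y/width/height as normalized floats (shown here as a plain dict; the real build_crop_filter() may differ in rounding details):

```python
def build_crop_filter(region: dict, src_w: int, src_h: int) -> str:
    """Convert normalized 0-1 crop coordinates into an FFmpeg
    crop=w:h:x:y filter string, rounding dimensions down to even
    numbers for H.264 4:2:0 chroma alignment."""
    def even(v: float) -> int:
        n = round(v)
        return n - (n % 2)
    w = even(region["width"] * src_w)
    h = even(region["height"] * src_h)
    x = even(region["x"] * src_w)
    y = even(region["y"] * src_h)
    return f"crop={w}:{h}:{x}:{y}"
```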
Tier Enforcement¶
Tier-specific limits are centralized in UserTierContext — a frozen dataclass resolved once per request via resolve_tier_context(). Properties: max_projects, can_use_ai_director, can_watermark_free, can_access_support, storage_quota_mb, export_retention_days.
from modules.entitlement.tier_context import resolve_tier_context
# In API routes (via DI dependency) — the ONLY place resolve_tier_context
# is called. Snapshot the decision onto the queued record if a worker needs it.
tier_ctx = await resolve_tier_context(user_id, db)
if tier_ctx.max_projects <= project_count:
raise ValidationError("Project limit reached")
# For worker-bound work: snapshot at request time (Convention #12).
export = await crud_exports.create(
db=db,
object=ExportCreateInternal(
...,
watermark_required=not tier_ctx.can_watermark_free, # snapshot
),
)
# In workers: read from the snapshot on the queued record, NEVER
# re-resolve tier_ctx. The user's tier may have changed between
# kickoff and execution; the snapshot preserves intent.
if export["watermark_required"]:
apply_watermark(...)
See Convention #12 for the full rationale + inline # noqa: tier-in-worker exception mechanism for the rare observational case.
Resolution Logic¶
resolve_tier_context() picks the highest-ranked tier across all active tier entitlements (not the first one). Key behaviors:
- tier_name: From the highest-ranked tier (free=0, hobby=1, creator=2, viral=3)
- is_paid: True if ANY entitlement has a paid grant_reason (subscription, purchase, enterprise_contract) — across all entitlements, not just the winner
- grant_reasons: Frozenset of every active entitlement's grant reason. Callers check set membership (e.g. {"beta"} & tier_ctx.grant_reasons) rather than a scalar winner — the previous scalar field was unreliable on TIER_RANK ties (e.g. TRIAL+BETA both on creator), where max() could pick TRIAL and hide the beta entitlement
- can_watermark_free: True if is_paid and tier in WATERMARK_FREE_TIERS, OR if grant_reasons intersects WATERMARK_FREE_GRANT_REASONS (includes beta)
This means a beta+hobby user gets creator features (beta wins on tier rank), is_paid=True (from hobby), and watermark-free (beta in grant_reasons).
Constants in modules/entitlement/constants.py: TIER_MAX_PROJECTS, TIER_STORAGE_MB, TIER_EXPORT_RETENTION_DAYS, AI_DIRECTOR_TIERS, TIER_RANK, PAID_GRANT_REASONS, WATERMARK_FREE_GRANT_REASONS.
Unit conversions use BYTES_PER_MB from modules/common/constants.py (e.g., quota_bytes = tier_ctx.storage_quota_mb * BYTES_PER_MB).
Priority Queue¶
Task brokers use RabbitMQ with native priority queues. User-facing tasks (analysis, render, asset edit) are enqueued with a priority derived from the user's subscription tier. Non-user-facing tasks (email, download) use plain FIFO.
Priority Mapping¶
| Tier | is_paid | Priority | Constant |
|---|---|---|---|
| viral | true | 3 | TaskPriority.HIGH |
| creator | true | 2 | TaskPriority.NORMAL |
| hobby | true | 1 | TaskPriority.LOW |
| trial / free / none | false | 0 | TaskPriority.BACKGROUND |
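The mapping above could be implemented roughly like this; the real resolve_task_priority lives in infrastructure/taskiq/priority.py and may differ in detail:

```python
from enum import IntEnum

class TaskPriority(IntEnum):
    BACKGROUND = 0
    LOW = 1
    NORMAL = 2
    HIGH = 3

# Illustrative tier-to-priority table mirroring the mapping above.
_TIER_PRIORITY = {
    "viral": TaskPriority.HIGH,
    "creator": TaskPriority.NORMAL,
    "hobby": TaskPriority.LOW,
}

def resolve_task_priority(tier_name: str, is_paid: bool) -> TaskPriority:
    """Unpaid users always go to the background lane; paid users get
    the priority of their tier."""
    if not is_paid:
        return TaskPriority.BACKGROUND
    return _TIER_PRIORITY.get(tier_name, TaskPriority.BACKGROUND)
```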
Enqueue Pattern¶
from infrastructure.taskiq.priority import resolve_task_priority
from modules.entitlement.tier_context import resolve_tier_context
tier_ctx = await resolve_tier_context(user_id, db)
priority = resolve_task_priority(tier_ctx.tier_name, tier_ctx.is_paid)
await analyze_project.kicker().with_labels(priority=int(priority)).kiq(
project_uuid=str(project_uuid),
)
Priority is resolved at enqueue time — if a user upgrades, their next task gets the new priority. Tasks already in the queue keep their original priority.
Queue Configuration¶
Priority queues use QueueType.CLASSIC with max_priority=3 and qos=2 (low prefetch for strict ordering). Brokers and constants are in infrastructure/taskiq/brokers.py and infrastructure/taskiq/priority.py.
Feature Flags¶
Postgres-backed feature flags for kill switches, gradual rollouts, beta testing, and time-limited features. No external dependencies.
Models¶
FeatureFlag — the flag itself (key, enabled, rollout_percentage, description, expires_at). FeatureFlagUser — explicit allowlist (user always gets the flag).
Evaluation Order¶
1. enabled=False → off (kill switch — overrides everything)
2. User in allowlist → on (bypasses expiry and percentage)
3. expires_at in the past → off
4. rollout_percentage set → deterministic hash check
5. rollout_percentage null → on (everyone)
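One way the deterministic hash check might work (illustrative only; the real implementation is in modules/feature_flag/service.py and may use a different hash or bucket scheme):

```python
import hashlib

def in_rollout(flag_key: str, user_id: str, rollout_percentage: int) -> bool:
    """Deterministic bucketing: the same user+flag pair always hashes
    to the same bucket in [0, 100), so rollouts are sticky across
    requests and grow monotonically as the percentage is raised."""
    digest = hashlib.sha256(f"{flag_key}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 100
    return bucket < rollout_percentage
```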
Route Gating¶
Use require_feature() dependency to gate endpoints behind flags. Returns 404 (not 403) to hide existence:
from modules.feature_flag.dependencies import require_feature
@router.post("/new-endpoint")
async def new_endpoint(
...,
_: Annotated[None, Depends(require_feature("my_flag"))],
):
# Only reachable when flag is on for this user
Frontend¶
useFeatureFlag("key") returns a boolean backed by GET /api/v1/flags (cached 5 minutes). The backend is the authority — even if someone inspects the response, the route dependency blocks access.
Admin Endpoints¶
GET/POST/PATCH/DELETE /api/v1/admin/flags + allowlist management via /admin/flags/{id}/users. Superuser only.
Scripts¶
| Script | Purpose | How to run |
|---|---|---|
| scripts/setup_initial_data.py | Create tables + seed tiers | Auto-runs on docker compose up |
| scripts/seed_all.py | Tables + tiers + Stripe products (full setup) | Auto-runs on docker compose up |
| scripts/seed_stripe_products.py | Stripe products + prices only | Called by seed_all.py if STRIPE_SECRET_KEY is set |
| scripts/seed_trial_credits.py | Grant trial credits to a user | uv run python scripts/seed_trial_credits.py --email user@example.com |
Key Files¶
| Purpose | Location |
|---|---|
| Database session | infrastructure/database/session.py |
| Model mixins | infrastructure/database/models.py |
| Auth dependencies | infrastructure/auth/session/dependencies.py |
| Auth constants | infrastructure/auth/constants.py |
| Common schemas | modules/common/schemas.py |
| Domain exceptions | modules/common/exceptions.py |
| Error handler | modules/common/utils/error_handler.py |
| API dependencies | interfaces/api/dependencies.py |
| Route registration | interfaces/api/v1/__init__.py |
| Credit service + ledger | modules/credits/service.py |
| Entitlement transaction model | modules/credits/models.py |
| Trial granting | modules/entitlement/trial.py |
| Beta service | modules/entitlement/beta.py |
| Beta admin API | interfaces/api/v1/beta.py |
| Tier context resolution | modules/entitlement/tier_context.py |
| Entitlement constants | modules/entitlement/constants.py |
| Broker setup | infrastructure/taskiq/brokers.py |
| Priority routing | infrastructure/taskiq/priority.py |
| Feature flags | modules/feature_flag/service.py |
| Feature flag dependency | modules/feature_flag/dependencies.py |