Events (SSE)¶
Workers publish events to Redis pub/sub. The API streams them to clients via Server-Sent Events. This gives real-time updates without polling.
How It Works¶
```mermaid
sequenceDiagram
    participant Worker
    participant Redis
    participant SSE as SSE Endpoint
    participant Browser as EventSource

    Worker->>Redis: PUBLISH project:{uuid}:events
    Redis->>SSE: Message received
    SSE->>Browser: event: ANALYSIS_COMPLETE<br/>data: {...}
    Browser->>Browser: Invalidate React Query cache
```
Each project has its own channel: `project:{uuid}:events`. When a worker finishes a step, it publishes an event. The SSE endpoint subscribes to that channel and forwards events to connected clients.
Event Flow Architecture¶
```mermaid
flowchart TB
    subgraph Workers["Background Workers"]
        DW[Download Worker]
        PW[Proxy Worker]
        AW[Analysis Worker]
        RW[Render Worker]
    end
    subgraph Redis["Redis"]
        CH1[project:abc:events]
        CH2[project:xyz:events]
    end
    subgraph API["FastAPI"]
        SSE1[SSE /abc/events]
        SSE2[SSE /xyz/events]
    end
    subgraph Clients["Browser Clients"]
        C1[User A viewing project abc]
        C2[User B viewing project xyz]
    end
    DW -->|Publish| CH1
    PW -->|Publish| CH1
    AW -->|Publish| CH1
    RW -->|Publish| CH2
    CH1 -->|Subscribe| SSE1
    CH2 -->|Subscribe| SSE2
    SSE1 -->|Stream| C1
    SSE2 -->|Stream| C2
```
The publisher is a singleton that lazily connects to Redis:
```python
from src.infrastructure.events import get_event_publisher

publisher = get_event_publisher()
await publisher.publish(
    project_uuid=project_uuid,
    event=AnalysisCompleteEvent(edit_count=42),
)
```
Event Types¶
Events are defined in `backend/src/infrastructure/events/schemas.py`. Each has a type string and payload:
```mermaid
flowchart LR
    subgraph Clip["Clip Events"]
        CP[CLIP_PROCESSING]
        CR[CLIP_READY]
        CF[CLIP_FAILED]
    end
    subgraph Analysis["Analysis Events"]
        AS[ANALYSIS_STARTED]
        AP[ANALYSIS_PROGRESS]
        AC[ANALYSIS_COMPLETE]
        AF[ANALYSIS_FAILED]
    end
    subgraph Export["Export Events"]
        ES[EXPORT_STARTED]
        EP[EXPORT_PROGRESS]
        EC[EXPORT_COMPLETE]
        EF[EXPORT_FAILED]
    end
```
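Each schema pairs a fixed type string with a typed payload. A minimal sketch of two of them as dataclasses (the field layout mirrors the tables below; the `to_json` helper is illustrative):

```python
import json
from dataclasses import asdict, dataclass, field


@dataclass
class ClipReadyEvent:
    clip_uuid: str
    duration_ms: int
    # The type string is fixed per event class, not caller-supplied.
    type: str = field(default="CLIP_READY", init=False)

    def to_json(self) -> str:
        return json.dumps(asdict(self))


@dataclass
class AnalysisCompleteEvent:
    edit_count: int
    type: str = field(default="ANALYSIS_COMPLETE", init=False)

    def to_json(self) -> str:
        return json.dumps(asdict(self))
```

Construction matches the publisher example above: `AnalysisCompleteEvent(edit_count=42)`.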
Clip Events¶
| Event | When | Payload |
|---|---|---|
| `CLIP_PROCESSING` | Download started | `clip_uuid` |
| `CLIP_READY` | Processing complete | `clip_uuid`, `duration_ms` |
| `CLIP_FAILED` | Processing failed | `clip_uuid`, `error` |
Analysis Events¶
| Event | When | Payload |
|---|---|---|
| `ANALYSIS_STARTED` | Pipeline begins | `project_uuid` |
| `ANALYSIS_PROGRESS` | Step completed | `step`, `progress` |
| `ANALYSIS_COMPLETE` | All edits created | `edit_count` |
| `ANALYSIS_FAILED` | Pipeline failed | `error` |
Export Events¶
| Event | When | Payload |
|---|---|---|
| `EXPORT_STARTED` | Render begins | `export_uuid` |
| `EXPORT_PROGRESS` | Step completed | `step`, `progress` |
| `EXPORT_COMPLETE` | Ready to download | `export_uuid`, `storage_key` |
| `EXPORT_FAILED` | Render failed | `export_uuid`, `error` |
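On the wire, each event becomes an `event:` line naming the type and a `data:` line carrying the JSON payload, terminated by a blank line. A helper that renders this framing (the function name is illustrative):

```python
import json


def format_sse(event_type: str, payload: dict) -> str:
    """Render one event in text/event-stream framing."""
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"


# A CLIP_READY event, as the browser's EventSource would receive it:
frame = format_sse("CLIP_READY", {"clip_uuid": "c1", "duration_ms": 1200})
```

The trailing blank line is what delimits events; a client buffers lines until it sees one, then dispatches the accumulated event.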
SSE Connection Lifecycle¶
```mermaid
stateDiagram-v2
    [*] --> Connecting: new EventSource()
    Connecting --> Connected: onopen
    Connecting --> Error: onerror
    Connected --> Receiving: event received
    Receiving --> Connected: process event
    Connected --> Reconnecting: connection lost
    Reconnecting --> Connected: auto-reconnect
    Error --> Reconnecting: retry
    Connected --> [*]: close()
```
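The browser's `EventSource` handles the parsing and auto-reconnect itself. For intuition about the "event received" step, here is a simplified sketch of the frame parsing it performs (real parsers also track `id:` and `retry:` fields, which are omitted here):

```python
def parse_sse_stream(text: str) -> list[tuple[str, str]]:
    """Split a text/event-stream body into (event, data) pairs.

    Lines starting with ':' are comments (our keepalives) and are skipped.
    A blank line dispatches whatever has been buffered so far.
    """
    events = []
    event_type, data = "message", []  # "message" is the SSE default type
    for line in text.split("\n"):
        if line.startswith(":"):
            continue  # keepalive comment
        if line.startswith("event:"):
            event_type = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
        elif line == "" and data:
            events.append((event_type, "\n".join(data)))
            event_type, data = "message", []
    return events
```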
SSE Endpoint¶
The endpoint is `GET /api/v1/projects/{project_uuid}/events`. It returns a `StreamingResponse` with the `text/event-stream` content type.
```python
@router.get("/{project_uuid}/events")
async def stream_project_events(
    project_uuid: UUID,
    current_user: dict = Depends(get_current_user),
):
    # Verify the user owns the project before streaming its events
    project = await project_service.get(...)
    if project["user_id"] != current_user["id"]:
        raise HTTPException(403)

    return StreamingResponse(
        event_subscriber.subscribe(str(project_uuid)),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # disable nginx proxy buffering
        },
    )
```
The subscriber is an async generator that yields SSE-formatted strings.
It also sends keepalive comments every 15 seconds to prevent connection timeouts.
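A sketch of that generator. The real subscriber in `subscriber.py` wraps a Redis pub/sub channel; here the message source is injected as a plain async iterator (an assumption made to keep the sketch self-contained), and the 15-second keepalive is a comment line that clients ignore:

```python
import asyncio
from typing import AsyncIterator

KEEPALIVE_INTERVAL = 15.0  # seconds


async def sse_stream(messages: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Yield SSE-formatted frames, inserting keepalives when idle."""
    ait = messages.__aiter__()
    while True:
        try:
            msg = await asyncio.wait_for(
                ait.__anext__(), timeout=KEEPALIVE_INTERVAL
            )
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"  # comment frame; resets proxy idle timers
            continue
        except StopAsyncIteration:
            return  # upstream channel closed; end the response
        yield f"event: {msg['type']}\ndata: {msg['data']}\n\n"
```

`StreamingResponse` consumes this generator directly, writing each yielded string to the open HTTP connection.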
Frontend Integration¶
```mermaid
flowchart TB
    subgraph Hook["useProjectEvents Hook"]
        ES[EventSource] --> H[Event Handlers]
        H --> I[Invalidate Queries]
    end
    subgraph Queries["React Query"]
        CQ[useClips]
        EQ[useEdits]
        XQ[useExports]
    end
    I -->|clip_ready| CQ
    I -->|analysis_complete| EQ
    I -->|export_complete| XQ
    CQ --> UI[UI Re-render]
    EQ --> UI
    XQ --> UI
```
The frontend connects with `EventSource`:
```typescript
const eventSource = new EventSource(
  `/api/v1/projects/${projectUuid}/events`,
  { withCredentials: true }
);

eventSource.addEventListener('CLIP_READY', (e) => {
  const data = JSON.parse(e.data);
  queryClient.invalidateQueries(['clips', projectUuid]);
});

eventSource.addEventListener('ANALYSIS_COMPLETE', (e) => {
  const data = JSON.parse(e.data);
  queryClient.invalidateQueries(['edits', projectUuid]);
});
```
When events arrive, we invalidate React Query caches so the UI refetches fresh data. This keeps the UI in sync without manual polling.
Polling Fallback¶
Not everything uses SSE. For simpler cases, we use React Query's `refetchInterval` to poll while a resource is pending:
```mermaid
flowchart LR
    subgraph Polling["Polling Pattern"]
        Q[useQuery] -->|refetchInterval| API[GET /resource]
        API -->|status: pending| Q
        API -->|status: complete| STOP[Stop polling]
    end
```
When to Use Each¶
| Pattern | Use Case | Examples |
|---|---|---|
| SSE | Real-time critical, user is actively watching | Clip processing, Analysis pipeline |
| Polling | Background process, simpler implementation | Exports, Asset downloads |
Polling Implementation¶
```typescript
// hooks/useAssets.ts
export function useAssets(groupId?: string) {
  return useQuery({
    queryKey: assetKeys.list(groupId),
    queryFn: () => assetApi.list(groupId),
    // Poll every 3s while assets are downloading
    refetchInterval: (query) => {
      const assets = query.state.data?.data;
      if (!assets) return false;
      const hasPending = assets.some((a) => a.status === 'pending');
      return hasPending ? 3000 : false;
    },
  });
}
```
Current Usage¶
| Feature | Pattern | Why |
|---|---|---|
| Clips | SSE | User watches upload progress in real-time |
| Analysis | SSE | User watches analysis steps in real-time |
| Exports | Polling | Background render, user checks back later |
| Assets | Polling | YouTube downloads, user imports and continues working |
Key Files¶
| Component | Location |
|---|---|
| Event schemas | backend/src/infrastructure/events/schemas.py |
| Publisher | backend/src/infrastructure/events/publisher.py |
| Subscriber | backend/src/infrastructure/events/subscriber.py |
| SSE endpoint | backend/src/interfaces/api/v1/projects.py |
| Frontend SSE client | frontend/shared/lib/events.ts |
| Frontend hook | frontend/features/projects/hooks.ts |