SSE Integration¶
Sapari uses Server-Sent Events (SSE) for real-time updates from the backend. This page covers how we connect to the SSE endpoint and handle events.
How SSE Works¶
SSE is a one-way connection from server to client. The client opens a long-lived HTTP connection, and the server pushes events over it as they happen.
Unlike WebSockets, SSE is:
- Unidirectional (server to client only)
- Built on HTTP (works through proxies)
- Reconnectable on disconnect (our client handles this explicitly, see Connection Lifecycle)
- Simpler to implement
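To make the wire format concrete, here is a minimal sketch of parsing a single SSE frame. It assumes the backend sends named events with a JSON payload in the `data` field; the frame shown and the `parseSseFrame` helper are illustrative and not taken from the Sapari codebase. In the browser, `EventSource` does this parsing for you.

```typescript
// Hypothetical illustration of the text/event-stream framing.
interface SseFrame {
  event: string; // event name; "message" when the frame has no event field
  data: string;  // data lines joined with "\n"
}

function parseSseFrame(frame: string): SseFrame {
  let event = 'message';
  const dataLines: string[] = [];
  for (const line of frame.split('\n')) {
    // Skip blank lines and comment lines (those starting with a colon)
    if (line === '' || line.charAt(0) === ':') continue;
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    // A single leading space after the colon is not part of the value
    if (value.charAt(0) === ' ') value = value.slice(1);
    if (field === 'event') event = value;
    else if (field === 'data') dataLines.push(value);
  }
  return { event, data: dataLines.join('\n') };
}

// Assumed frame shape for a clip_ready event
const frame = 'event: clip_ready\ndata: {"clip_uuid":"abc","has_proxy":true}\n\n';
const parsed = parseSseFrame(frame);
const payload = JSON.parse(parsed.data) as { clip_uuid: string; has_proxy: boolean };
```

Each frame ends with a blank line, which is how the client knows an event is complete.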
Event Types¶
The backend publishes these events to the Redis pub/sub channel project:{uuid}:events:
Clip Events¶
| Event | When | Payload |
|---|---|---|
| clip_processing | Processing started | clip_uuid, clip_name |
| clip_ready | Processing complete | clip_uuid, has_proxy, has_audio, has_waveform |
| clip_failed | Processing failed | clip_uuid, error |
Analysis Events¶
| Event | When | Payload |
|---|---|---|
| analysis_started | Pipeline began | pacing_level, language |
| analysis_progress | Step completed | step, progress (0-100) |
| analysis_complete | All edits created | edit_count, word_count |
| analysis_failed | Pipeline failed | error |
Export Events¶
| Event | When | Payload |
|---|---|---|
| export_started | Render began | export_uuid, export_name |
| export_progress | Step completed | progress_percent, current_step |
| export_complete | Ready for download | export_uuid, duration_ms, size_bytes |
| export_failed | Render failed | export_uuid, error |
SSE Client¶
The subscribeToProject function creates an EventSource connection:
import { subscribeToProject } from '../shared/lib/events';

const unsubscribe = subscribeToProject('project-uuid', {
  // clip_ready carries clip_uuid (clip_name is only sent with clip_processing)
  onClipReady: (event) => {
    console.log(`Clip ${event.clip_uuid} is ready!`);
  },
  onAnalysisComplete: (event) => {
    console.log(`Created ${event.edit_count} edits`);
  },
  onError: (error) => {
    console.error('Connection error:', error);
  },
});

// Later, to clean up:
unsubscribe();
The client uses typed handlers so you get autocomplete and type checking for event payloads.
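One way to get that kind of type checking is a discriminated union keyed on the event name. The sketch below is an assumption about how it could be modelled, not the actual contents of events.ts: the payload fields come from the tables above, while the `type` discriminator, the field types, and the `dispatch` helper are hypothetical.

```typescript
// Payload fields follow the event tables; names and field types are assumptions.
interface ClipReadyEvent {
  type: 'clip_ready';
  clip_uuid: string;
  has_proxy: boolean;
  has_audio: boolean;
  has_waveform: boolean;
}

interface AnalysisCompleteEvent {
  type: 'analysis_complete';
  edit_count: number;
  word_count: number;
}

type ProjectEvent = ClipReadyEvent | AnalysisCompleteEvent;

interface ProjectEventHandlers {
  onClipReady?: (event: ClipReadyEvent) => void;
  onAnalysisComplete?: (event: AnalysisCompleteEvent) => void;
}

// Route each event to its handler; the switch narrows the payload type per case.
function dispatch(event: ProjectEvent, handlers: ProjectEventHandlers): void {
  switch (event.type) {
    case 'clip_ready':
      if (handlers.onClipReady) handlers.onClipReady(event);
      break;
    case 'analysis_complete':
      if (handlers.onAnalysisComplete) handlers.onAnalysisComplete(event);
      break;
  }
}

const seen: string[] = [];
dispatch(
  { type: 'analysis_complete', edit_count: 12, word_count: 340 },
  { onAnalysisComplete: (e) => { seen.push(`edits:${e.edit_count}`); } },
);
```

With this shape, accessing `event.clip_name` inside `onClipReady` would be a compile-time error, because `clip_ready` does not carry that field.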
useProjectEvents Hook¶
The useProjectEvents hook wraps the SSE client with React lifecycle management and automatic cache invalidation:
import { useState } from 'react';

function ProjectEditor({ projectUuid }: { projectUuid: string }) {
  // Analysis progress (0-100), updated from analysis_progress events
  const [progress, setProgress] = useState(0);

  // Subscribe to events and auto-invalidate queries
  useProjectEvents(projectUuid, {
    handlers: {
      onAnalysisProgress: (event) => {
        setProgress(event.progress);
      },
      onAnalysisComplete: (event) => {
        toast.success(`Analysis complete! ${event.edit_count} edits created.`);
      },
      onAnalysisFailed: (event) => {
        toast.error(`Analysis failed: ${event.error}`);
      },
    },
  });

  // These queries auto-refetch when relevant events arrive
  const { data: clips } = useClips(projectUuid);
  const { data: edits } = useEdits(projectUuid);
  // ...
}
Auto-Invalidation¶
The hook automatically invalidates React Query caches:
| Event | Invalidated Query |
|---|---|
| clip_ready | clipKeys.byProject(uuid) |
| clip_failed | clipKeys.byProject(uuid) |
| analysis_complete | editKeys.byProject(uuid) |
| export_complete | exportKeys.byProject(uuid) |
| export_failed | exportKeys.byProject(uuid) |
You can disable this with autoInvalidate: false if you want manual control.
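The table above can be read as a lookup from event name to query key. Here is a minimal sketch of that idea; the key factories are stand-ins (the real clipKeys, editKeys, and exportKeys may build their keys differently), and `keyToInvalidate` is a hypothetical helper.

```typescript
// Hypothetical key factories standing in for the project's real ones.
type QueryKey = readonly string[];

const clipKeys = { byProject: (uuid: string): QueryKey => ['clips', 'project', uuid] };
const editKeys = { byProject: (uuid: string): QueryKey => ['edits', 'project', uuid] };
const exportKeys = { byProject: (uuid: string): QueryKey => ['exports', 'project', uuid] };

// Event name -> factory for the query key that should be invalidated
const invalidationMap: Record<string, (uuid: string) => QueryKey> = {
  clip_ready: clipKeys.byProject,
  clip_failed: clipKeys.byProject,
  analysis_complete: editKeys.byProject,
  export_complete: exportKeys.byProject,
  export_failed: exportKeys.byProject,
};

// Returns the key to invalidate, or undefined for events that touch no cache
function keyToInvalidate(eventType: string, projectUuid: string): QueryKey | undefined {
  if (!Object.prototype.hasOwnProperty.call(invalidationMap, eventType)) return undefined;
  return invalidationMap[eventType](projectUuid);
}
```

Inside the hook, a returned key would then be passed to `queryClient.invalidateQueries({ queryKey })`; progress events such as analysis_progress map to nothing and leave the cache alone.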
Options¶
useProjectEvents(projectUuid, {
  // Auto-invalidate queries on events (default: true)
  autoInvalidate: true,

  // Custom event handlers
  handlers: {
    onClipReady: (event) => { /* ... */ },
    onAnalysisComplete: (event) => { /* ... */ },
    // ...
  },

  // Conditionally enable/disable subscription
  enabled: isProjectOpen,
});
Promise-Based Waiting¶
For imperative code, use waitForEvent to wait for a specific event:
import { waitForEvent } from '../shared/lib/events';

async function analyzeAndWait(projectUuid: string) {
  // Trigger analysis
  await projectApi.analyze(projectUuid, { pacing_level: 50 });

  // Wait for completion (default timeout: 5 minutes)
  const result = await waitForEvent(projectUuid, 'analysis_complete');
  console.log(`Created ${result.edit_count} edits`);
}
This is useful for tests or scripts where you need to wait for a background operation.
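For intuition, a promise-based wait can be built from a subscription plus a timer. The sketch below is not the real waitForEvent: it assumes a hypothetical `subscribe` function that registers a listener and returns an unsubscribe callback, and it uses an in-memory fake in place of the SSE connection.

```typescript
// Hypothetical subscription shape: register a listener, get back an unsubscribe function.
type Listener = (type: string, payload: unknown) => void;
type Subscribe = (listener: Listener) => () => void;

function waitForEventFrom(
  subscribe: Subscribe,
  eventType: string,
  timeoutMs = 5 * 60 * 1000,
): Promise<unknown> {
  return new Promise((resolve, reject) => {
    let unsubscribe: () => void = () => {};
    // Reject and stop listening if the event never arrives
    const timer = setTimeout(() => {
      unsubscribe();
      reject(new Error(`Timed out waiting for ${eventType}`));
    }, timeoutMs);
    unsubscribe = subscribe((type, payload) => {
      if (type !== eventType) return; // ignore unrelated events
      clearTimeout(timer);
      unsubscribe();
      resolve(payload);
    });
  });
}

// In-memory stand-in for the SSE connection
const listeners: Listener[] = [];
const fakeSubscribe: Subscribe = (listener) => {
  listeners.push(listener);
  return () => {
    const i = listeners.indexOf(listener);
    if (i !== -1) listeners.splice(i, 1);
  };
};

const pending = waitForEventFrom(fakeSubscribe, 'analysis_complete', 1000);
pending.then((payload) => console.log('resolved with', payload));

// Emit an unrelated event first, then the one being awaited
listeners.slice().forEach((l) => l('analysis_progress', { step: 'example-step', progress: 40 }));
listeners.slice().forEach((l) => l('analysis_complete', { edit_count: 7, word_count: 120 }));
```

Clearing the timer and unsubscribing as soon as the event arrives keeps a resolved wait from leaking a listener or holding a test process open.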
Connection Lifecycle¶
The SSE connection:
- Reconnects on disconnect with explicit exponential backoff (see below)
- Sends cookies for authentication (withCredentials: true)
- Closes on component unmount (via cleanup function)
Handling Reconnection¶
Browsers stop retrying once an EventSource reaches the CLOSED state (for example after a server restart, network hiccup, or proxy timeout), so subscribeToProject reconnects manually. When onerror fires and the underlying EventSource is closed, we schedule a new EventSource via setTimeout with exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s for every later attempt. The attempt counter resets to 0 on the next successful onopen.
This means a long-running analysis or proxy re-encode that outlasts a single TCP connection still gets its eventual clip_ready / analysis_complete event. See frontend/shared/lib/events.ts for the implementation.
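The backoff schedule described above can be sketched as a pure function of the attempt counter. This is a minimal illustration under the stated numbers (1s base, doubling, 30s cap); the constant and function names are hypothetical, and the real code in events.ts may differ, for example by adding jitter.

```typescript
const BASE_DELAY_MS = 1000;  // first retry after 1s
const MAX_DELAY_MS = 30000;  // never wait longer than 30s

// attempt is 0 for the first retry and resets to 0 after a successful onopen
function reconnectDelayMs(attempt: number): number {
  return Math.min(BASE_DELAY_MS * Math.pow(2, attempt), MAX_DELAY_MS);
}

const delays = [0, 1, 2, 3, 4, 5, 6].map((attempt) => reconnectDelayMs(attempt));
// delays is [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The sixth attempt would be 32s uncapped, which is why the sequence jumps from 16s straight to the 30s ceiling.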
The onOpen handler fires on each successful connection, including after reconnects:
useProjectEvents(projectUuid, {
  handlers: {
    onOpen: () => {
      // Refetch data in case we missed events while disconnected
      queryClient.invalidateQueries({ queryKey: projectKeys.detail(projectUuid) });
    },
  },
});
Error Handling¶
Connection errors fire the onError handler. Common causes:
- Network issues
- Server restart
- Authentication expired
handlers: {
  onError: (error) => {
    if (error.type === 'error') {
      // Check if it's an auth issue
      checkAuthStatus();
    }
  },
}
Backend Implementation¶
On the backend, the SSE endpoint is an async generator that subscribes to Redis pub/sub:
from uuid import UUID

from fastapi import Request
from fastapi.responses import StreamingResponse


@router.get("/{project_uuid}/events")
async def stream_project_events(project_uuid: UUID, request: Request):
    async def event_generator():
        # Forward each published event until the client disconnects
        async for event in subscribe_to_project(project_uuid):
            if await request.is_disconnected():
                break
            yield event.to_sse()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )
Workers publish events to Redis, and the SSE endpoint forwards them to connected clients.
When to Use SSE vs Polling¶
Not everything needs SSE. For simpler background processes, we use React Query's refetchInterval:
| Pattern | Use Case | Examples |
|---|---|---|
| SSE | Real-time critical, user actively watching | Clip processing, Analysis pipeline |
| Polling | Background process, user checks back later | Exports, Asset downloads |
Polling Pattern¶
// hooks/useAssets.ts
export function useAssets(groupId?: string) {
  return useQuery({
    queryKey: assetKeys.list(groupId),
    queryFn: () => assetApi.list(groupId),
    // Poll every 3s while assets are downloading
    refetchInterval: (query) => {
      const assets = query.state.data?.data;
      if (!assets) return false;
      const hasPending = assets.some((a) => a.status === 'pending');
      return hasPending ? 3000 : false;
    },
  });
}
Why SSE for Some, Polling for Others?¶
SSE is better when:
- User is actively watching progress (analysis steps)
- Updates are frequent (every few seconds)
- Resource is project-scoped (we already have a project channel)

Polling is simpler when:
- User imports something and continues working
- Updates are infrequent (download completes once)
- Resource is user-scoped (assets), which would need new channel infrastructure
Key Files¶
| Component | Location |
|---|---|
| SSE client | frontend/shared/lib/events.ts |
| useProjectEvents hook | frontend/features/projects/hooks.ts |
| Backend SSE endpoint | backend/src/interfaces/api/v1/projects.py |
| Event publisher | backend/src/infrastructure/events/publisher.py |
| Event schemas | backend/src/infrastructure/events/schemas.py |