
SSE Integration

Sapari uses Server-Sent Events (SSE) for real-time updates from the backend. This page covers how we connect to the SSE endpoint and handle events.

How SSE Works

SSE is a one-way connection from server to client. The client opens a connection, and the server pushes events as they happen:

Client opens connection → Server holds it open → Server pushes events → Client receives them

Unlike WebSockets, SSE is:

- Unidirectional (server to client only)
- Built on HTTP (works through proxies)
- Reconnectable on disconnect (our client handles this explicitly, see Connection Lifecycle)
- Simpler to implement
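On the wire, each pushed event is a short block of `field: value` text lines terminated by a blank line. The sketch below parses one such block to show what the browser's EventSource does for you. `parseSseFrame` and the sample payload are hypothetical and are not part of the Sapari client.

```typescript
// Hypothetical helper: parse one SSE frame (the text between two blank lines).
// The browser's EventSource does this internally; shown only for illustration.
interface SseFrame {
  event: string; // defaults to "message" when no "event:" field is present
  data: string;  // multiple "data:" lines are joined with "\n"
}

function parseSseFrame(frame: string): SseFrame {
  let event = 'message';
  const dataLines: string[] = [];

  for (const line of frame.split(/\r\n|\r|\n/)) {
    // Skip blank lines and comment lines (those starting with ":").
    if (line === '' || line.startsWith(':')) continue;

    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip one leading space

    if (field === 'event') event = value;
    else if (field === 'data') dataLines.push(value);
  }

  return { event, data: dataLines.join('\n') };
}

// Sample frame with a made-up payload.
const frame = parseSseFrame('event: clip_ready\ndata: {"clip_uuid":"abc"}');
console.log(frame.event, JSON.parse(frame.data).clip_uuid);
```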

Event Types

The backend publishes these events to the project:{uuid}:events Redis channel:

Clip Events

| Event | When | Payload |
| --- | --- | --- |
| clip_processing | Processing started | clip_uuid, clip_name |
| clip_ready | Processing complete | clip_uuid, has_proxy, has_audio, has_waveform |
| clip_failed | Processing failed | clip_uuid, error |

Analysis Events

| Event | When | Payload |
| --- | --- | --- |
| analysis_started | Pipeline began | pacing_level, language |
| analysis_progress | Step completed | step, progress (0-100) |
| analysis_complete | All edits created | edit_count, word_count |
| analysis_failed | Pipeline failed | error |

Export Events

| Event | When | Payload |
| --- | --- | --- |
| export_started | Render began | export_uuid, export_name |
| export_progress | Step completed | progress_percent, current_step |
| export_complete | Ready for download | export_uuid, duration_ms, size_bytes |
| export_failed | Render failed | export_uuid, error |
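The payload tables map naturally onto a discriminated union. The sketch below covers three of the events. The field names come from the tables, but the TypeScript types and the `type` discriminator field are assumptions, and `describeEvent` is a hypothetical helper. The real schemas may differ.

```typescript
// Field names are taken from the payload tables; the types are assumed.
interface ClipReadyEvent {
  type: 'clip_ready'; // assumed discriminator field
  clip_uuid: string;
  has_proxy: boolean;
  has_audio: boolean;
  has_waveform: boolean;
}

interface AnalysisProgressEvent {
  type: 'analysis_progress';
  step: string;
  progress: number; // 0-100
}

interface ExportFailedEvent {
  type: 'export_failed';
  export_uuid: string;
  error: string;
}

// A discriminated union lets TypeScript narrow the payload by `type`.
type ProjectEvent = ClipReadyEvent | AnalysisProgressEvent | ExportFailedEvent;

// Hypothetical helper showing how narrowing works inside a switch.
function describeEvent(event: ProjectEvent): string {
  switch (event.type) {
    case 'clip_ready':
      return `clip ${event.clip_uuid} ready`;
    case 'analysis_progress':
      return `${event.step}: ${event.progress}%`;
    case 'export_failed':
      return `export ${event.export_uuid} failed: ${event.error}`;
  }
}
```

Inside each `case`, only the fields of that event are accessible, which is the kind of checking typed handlers rely on.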

SSE Client

The subscribeToProject function creates an EventSource connection:

import { subscribeToProject } from '../shared/lib/events';

const unsubscribe = subscribeToProject('project-uuid', {
  onClipReady: (event) => {
    // clip_ready carries clip_uuid (clip_name is only sent with clip_processing)
    console.log(`Clip ${event.clip_uuid} is ready!`);
  },
  onAnalysisComplete: (event) => {
    console.log(`Created ${event.edit_count} edits`);
  },
  onError: (error) => {
    console.error('Connection error:', error);
  },
});

// Later, to cleanup:
unsubscribe();

The client uses typed handlers so you get autocomplete and type checking for event payloads.
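The handler names used in this page follow a visible pattern: `clip_ready` pairs with `onClipReady`, `analysis_complete` with `onAnalysisComplete`. The sketch below shows one way a client could route an incoming event name to its handler. `toHandlerName` and `dispatch` are hypothetical and the naming convention is inferred from the examples, not taken from events.ts.

```typescript
// Assumed convention: "clip_ready" -> "onClipReady".
function toHandlerName(eventName: string): string {
  const pascal = eventName
    .split('_')
    .map((part) => part.charAt(0).toUpperCase() + part.slice(1))
    .join('');
  return `on${pascal}`;
}

// Loosely typed handler bag for the sketch; the real client uses typed payloads.
type Handlers = Record<string, ((payload: unknown) => void) | undefined>;

// Calls the matching handler if one is registered; returns whether it ran.
function dispatch(handlers: Handlers, eventName: string, payload: unknown): boolean {
  const handler = handlers[toHandlerName(eventName)];
  if (!handler) return false;
  handler(payload);
  return true;
}
```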

useProjectEvents Hook

The useProjectEvents hook wraps the SSE client with React lifecycle management and automatic cache invalidation:

function ProjectEditor({ projectUuid }: { projectUuid: string }) {
  // Subscribe to events and auto-invalidate queries
  useProjectEvents(projectUuid, {
    handlers: {
      onAnalysisProgress: (event) => {
        setProgress(event.progress);
      },
      onAnalysisComplete: (event) => {
        toast.success(`Analysis complete! ${event.edit_count} edits created.`);
      },
      onAnalysisFailed: (event) => {
        toast.error(`Analysis failed: ${event.error}`);
      },
    },
  });

  // These queries auto-refetch when relevant events arrive
  const { data: clips } = useClips(projectUuid);
  const { data: edits } = useEdits(projectUuid);

  // ...
}

Auto-Invalidation

The hook automatically invalidates React Query caches:

| Event | Invalidated Query |
| --- | --- |
| clip_ready | clipKeys.byProject(uuid) |
| clip_failed | clipKeys.byProject(uuid) |
| analysis_complete | editKeys.byProject(uuid) |
| export_complete | exportKeys.byProject(uuid) |
| export_failed | exportKeys.byProject(uuid) |

You can disable this with autoInvalidate: false if you want manual control.
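The invalidation table can be expressed as a lookup from event name to query key. The sketch below assumes simple array-shaped keys. The key factories here are stand-ins for the real clipKeys, editKeys, and exportKeys, whose actual shapes are not shown in this page, and `keysToInvalidate` is a hypothetical helper.

```typescript
type QueryKey = readonly string[];

// Stand-in key factories; the key shapes are assumptions.
const clipKeys = { byProject: (uuid: string): QueryKey => ['clips', 'project', uuid] };
const editKeys = { byProject: (uuid: string): QueryKey => ['edits', 'project', uuid] };
const exportKeys = { byProject: (uuid: string): QueryKey => ['exports', 'project', uuid] };

// Mirrors the Event / Invalidated Query table.
const invalidationMap: Record<string, (uuid: string) => QueryKey> = {
  clip_ready: clipKeys.byProject,
  clip_failed: clipKeys.byProject,
  analysis_complete: editKeys.byProject,
  export_complete: exportKeys.byProject,
  export_failed: exportKeys.byProject,
};

// Returns the key to invalidate, or null for events that invalidate nothing.
function keysToInvalidate(eventName: string, projectUuid: string): QueryKey | null {
  const factory = invalidationMap[eventName];
  return factory ? factory(projectUuid) : null;
}
```

In a hook, a non-null result would typically be passed to `queryClient.invalidateQueries({ queryKey })`.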

Options

useProjectEvents(projectUuid, {
  // Auto-invalidate queries on events (default: true)
  autoInvalidate: true,

  // Custom event handlers
  handlers: {
    onClipReady: (event) => { ... },
    onAnalysisComplete: (event) => { ... },
    // ...
  },

  // Conditionally enable/disable subscription
  enabled: isProjectOpen,
});

Promise-Based Waiting

For imperative code, use waitForEvent to wait for a specific event:

import { waitForEvent } from '../shared/lib/events';

async function analyzeAndWait(projectUuid: string) {
  // Trigger analysis
  await projectApi.analyze(projectUuid, { pacing_level: 50 });

  // Wait for completion (5 minute timeout)
  const result = await waitForEvent(projectUuid, 'analysis_complete');

  console.log(`Created ${result.edit_count} edits`);
}

This is useful for tests or scripts where you need to wait for a background operation.
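A promise-based wait of this kind is usually a subscription raced against a timer. The sketch below illustrates the pattern and is not the actual waitForEvent implementation. `waitForWith` and the `Subscribe` signature are hypothetical, and the subscription function is injected so the example stays self-contained.

```typescript
type Unsubscribe = () => void;
// Hypothetical subscription signature: register a listener, get a cleanup function back.
type Subscribe = (eventName: string, listener: (payload: unknown) => void) => Unsubscribe;

// Resolves with the first matching event, or rejects after timeoutMs.
// Assumes the listener is invoked after subscribe() has returned.
function waitForWith<T>(subscribe: Subscribe, eventName: string, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    let unsubscribe: Unsubscribe = () => {};

    const timer = setTimeout(() => {
      unsubscribe(); // stop listening once we give up
      reject(new Error(`Timed out waiting for ${eventName}`));
    }, timeoutMs);

    unsubscribe = subscribe(eventName, (payload) => {
      clearTimeout(timer); // the event arrived in time
      unsubscribe();
      resolve(payload as T);
    });
  });
}
```

Both paths clean up the subscription, so a timed-out wait does not leave a listener behind.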

Connection Lifecycle

The SSE connection:

- Reconnects on disconnect with explicit exponential backoff (see below)
- Sends cookies for authentication (withCredentials: true)
- Closes on component unmount (via cleanup function)

Handling Reconnection

Browsers do not reconnect an EventSource once it reaches the CLOSED state, which can happen after a server restart, network hiccup, or proxy timeout, so subscribeToProject reconnects manually. When onerror fires and the underlying EventSource is closed, we schedule a new EventSource via setTimeout with exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s. The attempt counter resets to 0 on the next successful onopen. As a result, a long-running analysis or proxy re-encode that outlasts a single TCP connection still receives its eventual clip_ready / analysis_complete event. See frontend/shared/lib/events.ts for the implementation.
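The delay schedule described above can be written as a one-line function. This is a minimal sketch, assuming the first retry uses attempt 0. `backoffDelay` and the constant names are hypothetical and may not match events.ts.

```typescript
const BASE_DELAY_MS = 1_000;  // first retry after 1s
const MAX_DELAY_MS = 30_000;  // never wait longer than 30s

// attempt is the number of consecutive failed reconnects so far (0-based).
function backoffDelay(attempt: number): number {
  return Math.min(BASE_DELAY_MS * 2 ** attempt, MAX_DELAY_MS);
}

const delays = [0, 1, 2, 3, 4, 5, 6].map(backoffDelay);
console.log(delays); // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The sixth attempt would be 32s uncapped, which is where the 30s ceiling takes over.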

The onOpen handler fires on each successful connection, including after reconnects:

useProjectEvents(projectUuid, {
  handlers: {
    onOpen: () => {
      // Refetch data in case we missed events while disconnected
      queryClient.invalidateQueries({ queryKey: projectKeys.detail(projectUuid) });
    },
  },
});

Error Handling

Connection errors fire the onError handler. Common causes:

- Network issues
- Server restart
- Authentication expired

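handlers: {
  onError: (error) => {
    // The browser's EventSource error event carries no HTTP status, so an
    // expired session looks the same as a network failure.
    if (error.type === 'error') {
      // Check if it's an auth issue
      checkAuthStatus();
    }
  },
}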
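Because every EventSource error event has type 'error', the connection's readyState is usually more informative when deciding how to react. The sketch below maps readyState to an action. The numeric constants are the ones defined by the EventSource interface, while `actionForError` and the action names are hypothetical.

```typescript
// readyState values defined by the EventSource interface.
const CONNECTING = 0;
const OPEN = 1;
const CLOSED = 2;

type ErrorAction = 'wait' | 'reconnect' | 'ignore';

// Hypothetical helper: decide what to do when onerror fires.
function actionForError(readyState: number): ErrorAction {
  if (readyState === CONNECTING) return 'wait';   // the browser is already retrying
  if (readyState === CLOSED) return 'reconnect';  // the browser gave up; open a new EventSource
  return 'ignore';                                // still OPEN: nothing to do
}

console.log(actionForError(CLOSED)); // "reconnect"
```

The CLOSED branch is the case where a manual reconnect, and possibly an auth check, makes sense.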

Backend Implementation

On the backend, the SSE endpoint is an async generator that subscribes to Redis pub/sub:

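# Imports assume FastAPI; router and subscribe_to_project are defined elsewhere.
from uuid import UUID

from fastapi import Request
from fastapi.responses import StreamingResponse

@router.get("/{project_uuid}/events")
async def stream_project_events(project_uuid: UUID, request: Request):
    async def event_generator():
        # Yields events published to this project's Redis channel
        async for event in subscribe_to_project(project_uuid):
            # Stop streaming once the client has gone away
            if await request.is_disconnected():
                break
            yield event.to_sse()

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache"},
    )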

Workers publish events to Redis, and the SSE endpoint forwards them to connected clients.

When to Use SSE vs Polling

Not everything needs SSE. For simpler background processes, we use React Query's refetchInterval:

| Pattern | Use Case | Examples |
| --- | --- | --- |
| SSE | Real-time critical, user actively watching | Clip processing, Analysis pipeline |
| Polling | Background process, user checks back later | Exports, Asset downloads |

Polling Pattern

// hooks/useAssets.ts
export function useAssets(groupId?: string) {
  return useQuery({
    queryKey: assetKeys.list(groupId),
    queryFn: () => assetApi.list(groupId),
    // Poll every 3s while assets are downloading
    refetchInterval: (query) => {
      const assets = query.state.data?.data;
      if (!assets) return false;
      const hasPending = assets.some((a) => a.status === 'pending');
      return hasPending ? 3000 : false;
    },
  });
}

Why SSE for Some, Polling for Others?

SSE is better when:

- User is actively watching progress (analysis steps)
- Updates are frequent (every few seconds)
- Resource is project-scoped (we already have a project channel)

Polling is simpler when:

- User imports something and continues working
- Updates are infrequent (download completes once)
- Resource is user-scoped (assets), which would need new channel infrastructure

Key Files

| Component | Location |
| --- | --- |
| SSE client | frontend/shared/lib/events.ts |
| useProjectEvents hook | frontend/features/projects/hooks.ts |
| Backend SSE endpoint | backend/src/interfaces/api/v1/projects.py |
| Event publisher | backend/src/infrastructure/events/publisher.py |
| Event schemas | backend/src/infrastructure/events/schemas.py |

