Real-time video models have reached remarkable speeds. Some generate at 30+ FPS, producing frames faster than humans can perceive. Yet many researchers still follow the "sync" approach: specify all inputs upfront, run the model, and wait for the complete video at the end. Even if the model runs in real time internally, you still wait. Reactor changes this. Instead of waiting for a complete video, your model streams frames as they are generated. Users see results immediately and can interact with the generation in real time. This section covers how to write (or convert) a pipeline for continuous streaming generation.

Blocks: The Unit of Generation

Real-time models generate video in blocks: small batches of temporally adjacent frames. A block might be 1, 4, 8, or 12 frames depending on your model’s architecture.
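To make the arithmetic concrete, here is a small sketch (with hypothetical numbers: a 120-frame clip split into 4-frame blocks) of how a clip decomposes into the blocks a model emits one at a time:

```python
# Hypothetical numbers: a 120-frame clip split into 4-frame blocks.
NUM_FRAMES = 120
BLOCK_SIZE = 4

def split_into_blocks(num_frames: int, block_size: int) -> list:
    """Return the frame ranges that each block covers."""
    return [range(start, start + block_size)
            for start in range(0, num_frames, block_size)]

blocks = split_into_blocks(NUM_FRAMES, BLOCK_SIZE)
print(len(blocks))      # 30 blocks
print(list(blocks[0]))  # first block covers frames [0, 1, 2, 3]
```

A streaming pipeline hands each of these blocks to the client as it is produced rather than accumulating all thirty.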
Traditional pipelines accumulate all blocks, then write the complete video to disk:
```python
# Traditional approach: wait for everything
def generate_video(self, prompt, num_frames=120):
    all_frames = []
    for block_idx in range(num_frames // self.block_size):
        frames = self.forward(prompt)
        all_frames.append(frames)

    # User waits until here to see anything
    save_video(all_frames)
```
In Reactor, you emit each block as soon as it is ready:
```python
# Reactor approach: stream immediately
def start_session(self):
    while not get_ctx().should_stop():
        frames = self.forward(self.current_prompt)
        get_ctx().emit_block(frames)  # User sees frames immediately
```
The user sees frames within milliseconds of generation. No waiting.
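The difference can be sketched without the Reactor API at all, using a plain Python generator as a stand-in for the model's forward pass (all names here are illustrative, not part of Reactor):

```python
def generate_blocks(num_blocks: int):
    """Stand-in for a model's forward pass: yields one block at a time."""
    for i in range(num_blocks):
        yield [f"frame-{i * 4 + j}" for j in range(4)]  # 4 placeholder frames

# Batch style: nothing is visible until every block exists.
batch = list(generate_blocks(3))

# Streaming style: the consumer handles each block the moment it is produced.
seen = []
for block in generate_blocks(3):
    seen.append(block)  # in Reactor, this is where emit_block() would go

assert seen == batch  # same frames either way; streaming delivers them incrementally
```

The frames are identical in both cases; what changes is when the first one reaches the user.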

Real-Time Conditioning

Because generation is continuous, conditions can change mid-stream. On each forward step, your pipeline reads the current state of its input variables, which may have been updated by user commands running in parallel.
```python
from reactor_runtime import VideoModel, command, get_ctx

class MyVideoModel(VideoModel):
    def __init__(self, config):
        self.current_prompt = "a calm forest"
        self.pipeline = load_pipeline()

    @command("set_prompt")
    def set_prompt(self, prompt: str):
        # Called from the command thread when the user sends a new prompt
        self.current_prompt = prompt

    def start_session(self):
        while not get_ctx().should_stop():
            # Read the current prompt (may have changed since last iteration)
            prompt_for_this_frame = self.current_prompt

            frames = self.pipeline.forward(condition=prompt_for_this_frame)
            get_ctx().emit_block(frames)
```
The @command-decorated handler sets self.current_prompt from the command thread, and the generation loop reads it on each iteration. The user gets prompt switching in real time, frame by frame.
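Because the command thread writes while the generation loop reads, shared state must stay consistent. Swapping a single attribute is effectively atomic under CPython's GIL, but if a command updates several related fields together, a lock keeps the loop from observing a half-applied update. A minimal sketch of that pattern, independent of Reactor (the `PromptState` class is illustrative, not part of the API):

```python
import threading

class PromptState:
    """Illustrative shared-state holder for the command thread and the
    generation loop. The lock matters when several fields (say, prompt
    plus seed) must be updated as one consistent unit."""

    def __init__(self, prompt: str):
        self._lock = threading.Lock()
        self._prompt = prompt

    def set(self, prompt: str):
        with self._lock:
            self._prompt = prompt

    def get(self) -> str:
        with self._lock:
            return self._prompt

state = PromptState("a calm forest")
# Simulate a command arriving on another thread:
t = threading.Thread(target=state.set, args=("a stormy coast",))
t.start()
t.join()
print(state.get())  # "a stormy coast"
```

For a single string prompt, the bare attribute assignment shown above is fine; reach for a lock once updates span multiple fields.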

Pipeline Architecture Patterns

Pattern 1: Pipeline as a Separate Class

If you have an existing pipeline class, give it setters that your VideoModel commands can call. The pipeline itself runs the generation loop and emits frames directly. Note that get_ctx() is a global function importable from reactor_runtime anywhere in your code.
```python
from reactor_runtime import VideoModel, command, get_ctx

class MyPipeline:
    def __init__(self):
        self.mouse_action = [0, 0]

    def set_mouse(self, x: float, y: float):
        self.mouse_action = [x, y]

    def inference(self):
        while not get_ctx().should_stop():
            # Read the current action from instance state
            curr_action = self.mouse_action
            frames = self.generate_frames(cond=curr_action)
            get_ctx().emit_block(frames)

class MyVideoModel(VideoModel):
    def __init__(self, config):
        self.pipeline = MyPipeline()

    @command("mouse")
    def mouse(self, x: float, y: float):
        self.pipeline.set_mouse(x, y)

    def start_session(self):
        self.pipeline.inference()
```
Avoid “drilling” inputs. Do not pass every user input through the inference() signature like inference(mouse_x, mouse_y, keyboard, speed, prompt, ...). This becomes unwieldy as inputs grow. Instead, use setters so the pipeline reads its own instance variables directly.
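One way to keep that scalable is to group all live inputs into a single state object that commands mutate and the loop reads. A sketch of the idea (the `Conditioning` dataclass and field names are illustrative, not a Reactor API):

```python
from dataclasses import dataclass, field

@dataclass
class Conditioning:
    """One object holds every live input; the inference loop reads it
    each step instead of receiving an ever-growing argument list."""
    prompt: str = "a calm forest"
    mouse: list = field(default_factory=lambda: [0.0, 0.0])
    speed: float = 1.0

class Pipeline:
    def __init__(self):
        self.cond = Conditioning()

    # Commands mutate the shared object...
    def set_mouse(self, x: float, y: float):
        self.cond.mouse = [x, y]

    # ...and the loop reads it, so its signature never grows.
    def step(self):
        return {"prompt": self.cond.prompt, "mouse": self.cond.mouse}

p = Pipeline()
p.set_mouse(0.5, -0.2)
print(p.step())  # {'prompt': 'a calm forest', 'mouse': [0.5, -0.2]}
```

Adding a new input is then a new field plus a new setter; `inference()` itself never changes.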

Pattern 2: Pipeline in the VideoModel

If you are writing a model native to Reactor, write the generation logic directly in your VideoModel class. This is cleaner when building from scratch:
```python
class MyVideoModel(VideoModel):
    def __init__(self, config):
        self.mouse = [0, 0]

    @command("mouse")
    def set_mouse(self, x: float, y: float):
        self.mouse = [x, y]

    def start_session(self):
        while not get_ctx().should_stop():
            curr_action = self.mouse
            frames = self._generate_block(cond=curr_action)
            get_ctx().emit_block(frames)
```

Making Finite Pipelines Loop

Some models are designed to generate a fixed number of frames (e.g., 120 frames for a 4-second clip). In Reactor, sessions should run indefinitely until explicitly stopped. If your model has a natural endpoint, wrap it instead of returning from start_session().
```python
def start_session(self):
    while not get_ctx().should_stop():
        # Reset state for a new generation cycle
        self._reset_generation_state()

        # Generate the fixed-length sequence
        for block_idx in range(self.num_blocks):
            if get_ctx().should_stop():
                return

            frames = self.forward_block(block_idx)
            get_ctx().emit_block(frames)

        # The while loop wraps back to the start instead of returning
```
The outer while loop makes the pipeline wrap: when it finishes generating, it resets and starts again. The session continues until the stop signal is received.
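The wrap-around behavior can be exercised in isolation with a fake stop signal (everything here is a stand-in; `StopFlag` mimics `get_ctx().should_stop()` by flipping true after a fixed number of checks):

```python
class StopFlag:
    """Stand-in for get_ctx().should_stop(): flips true after N checks."""
    def __init__(self, stop_after: int):
        self.checks = 0
        self.stop_after = stop_after

    def should_stop(self) -> bool:
        self.checks += 1
        return self.checks > self.stop_after

def run_session(ctx, num_blocks: int):
    emitted = []
    while not ctx.should_stop():
        for block_idx in range(num_blocks):  # one fixed-length cycle
            if ctx.should_stop():
                return emitted
            emitted.append(block_idx)
        # falls through: the outer while loop starts a fresh cycle
    return emitted

# A 3-block model wraps: one full cycle, reset, then another, until the flag trips.
print(run_session(StopFlag(stop_after=8), num_blocks=3))  # [0, 1, 2, 0, 1, 2]
```

Note that the stop check sits in both loops, so the session exits promptly whether the signal arrives mid-cycle or between cycles.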

Cooperative Stopping and Error Handling

The session stays active until start_session() returns. Your pipeline should:
  1. Check get_ctx().should_stop() regularly and exit gracefully when true
  2. Raise exceptions properly so errors can be reported (especially important in deployment)
  3. Clean up session state before returning
```python
import logging

logger = logging.getLogger(__name__)

def start_session(self):
    try:
        while not get_ctx().should_stop():
            frames = self.generate()
            get_ctx().emit_block(frames)
    except Exception as e:
        # Log and re-raise so Reactor can report the error
        logger.error(f"Generation failed: {e}")
        raise
```
For detailed patterns on cleanup, state reset, and error handling, see Model Cleanup.
Now that you understand how to structure your pipeline for real-time generation, the next step is learning how frame emission works and what formats Reactor accepts.

Emitting Frames

Learn how to send generated frames to the client smoothly.