How to structure your model for continuous real-time generation
Real-time video models have reached remarkable speeds. Some generate at 30+ FPS, producing frames faster than humans can perceive them. Yet many researchers still follow the “sync” approach: supply all inputs upfront, run the model, and wait for the complete video at the end. Even if the model runs in real time internally, you still wait.

Reactor changes this. Instead of waiting for a complete video, your model streams frames as they are generated. Users see results immediately and can interact with the generation in real time.

This section covers how to write (or convert) a pipeline for continuous streaming generation.
Real-time models generate video in blocks: small batches of temporally adjacent frames. A block
might be 1, 4, 8, or 12 frames depending on your model’s architecture.
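As a rough sketch, you can picture a block as a small array of frames. The (frames, height, width, channels) layout below is only an illustration, not a format Reactor requires:

import numpy as np

# A hypothetical 4-frame block of 512x512 RGB frames.
# Your model's actual layout, dtype, and block size may differ.
block = np.zeros((4, 512, 512, 3), dtype=np.uint8)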
EDITOR: Add diagram showing blocks as small frame batches flowing through the pipeline,
contrasting with traditional “generate all frames, then output” approach.
Traditional pipelines accumulate all blocks, then write the complete video to disk:
# Traditional approach: wait for everything
def generate_video(self, prompt, num_frames=120):
    all_frames = []
    for block_idx in range(num_frames // self.block_size):
        frames = self.forward(prompt)
        all_frames.append(frames)
    # User waits until here to see anything
    save_video(all_frames)
In Reactor, you emit each block as soon as it is ready:
# Reactor approach: stream immediately
def start_session(self):
    while not get_ctx().should_stop():
        frames = self.forward(self.current_prompt)
        get_ctx().emit_block(frames)  # User sees frames immediately
The user sees frames within milliseconds of generation. No waiting.
Because generation is continuous, conditions can change mid-stream. On each forward step, your
pipeline reads the current state of its input variables, which may have been updated by user
commands running in parallel.
from reactor_runtime import VideoModel, command, get_ctx

class MyVideoModel(VideoModel):
    def __init__(self, config):
        self.current_prompt = "a calm forest"
        self.pipeline = load_pipeline()

    @command("set_prompt")
    def set_prompt(self, prompt: str):
        # Called from command thread when user sends new prompt
        self.current_prompt = prompt

    def start_session(self):
        while not get_ctx().should_stop():
            # Read the current prompt (may have changed since last iteration)
            prompt_for_this_frame = self.current_prompt
            frames = self.pipeline.forward(condition=prompt_for_this_frame)
            get_ctx().emit_block(frames)
EDITOR: Add diagram showing the two parallel threads: the generation loop reading from
instance variables, and the command thread writing to them.
The @command decorator sets self.current_prompt from the command thread. The generation loop
reads it each iteration. The user gets prompt switching in real time, frame by frame.
If you have an existing pipeline class, give it setters that your VideoModel commands can call.
The pipeline itself runs the generation loop and emits frames directly. Note that get_ctx() is a
global function importable from reactor_runtime anywhere in your code.
from reactor_runtime import VideoModel, command, get_ctx

class MyPipeline:
    def __init__(self):
        self.mouse_action = [0, 0]

    def set_mouse(self, x: float, y: float):
        self.mouse_action = [x, y]

    def inference(self):
        while not get_ctx().should_stop():
            # Read current action from class memory
            curr_action = self.mouse_action
            frames = self.generate_frames(cond=curr_action)
            get_ctx().emit_block(frames)

class MyVideoModel(VideoModel):
    def __init__(self, config):
        self.pipeline = MyPipeline()

    @command("mouse")
    def mouse(self, x: float, y: float):
        self.pipeline.set_mouse(x, y)

    def start_session(self):
        self.pipeline.inference()
Avoid “drilling” inputs. Do not pass every user input through the inference() signature
like inference(mouse_x, mouse_y, keyboard, speed, prompt, ...). This becomes unwieldy as inputs
grow. Instead, use setters so the pipeline reads its own instance variables directly.
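For contrast, input drilling looks roughly like this; each new control changes the signature and every call site (parameter list taken from the example above):

# Anti-pattern: every user input threaded through the call signature
def inference(self, mouse_x, mouse_y, keyboard, speed, prompt):
    ...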
If you are writing a model native to Reactor, write the generation logic directly in your VideoModel class rather than wrapping a separate pipeline. This is cleaner when building from scratch.
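A minimal sketch of the native shape, reusing the APIs shown above; load_model() and model.forward() are placeholders for your own loading and generation code:

from reactor_runtime import VideoModel, command, get_ctx

class NativeVideoModel(VideoModel):
    def __init__(self, config):
        self.current_prompt = "a calm forest"
        self.model = load_model(config)  # placeholder for your model loading

    @command("set_prompt")
    def set_prompt(self, prompt: str):
        self.current_prompt = prompt

    def start_session(self):
        # Generation logic lives directly on the VideoModel:
        # no separate pipeline class, no setter indirection.
        while not get_ctx().should_stop():
            frames = self.model.forward(condition=self.current_prompt)
            get_ctx().emit_block(frames)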
Some models are designed to generate a fixed number of frames (e.g., 120 frames for a 4-second clip). In Reactor, sessions should run indefinitely until explicitly stopped. If your model has a natural endpoint, wrap the fixed-length generation in an outer loop instead of returning from start_session():
def start_session(self):
    while not get_ctx().should_stop():
        # Reset state for a new generation cycle
        self._reset_generation_state()
        # Generate your fixed-length sequence
        for block_idx in range(self.num_blocks):
            if get_ctx().should_stop():
                return
            frames = self.forward_block(block_idx)
            get_ctx().emit_block(frames)
        # Loop wraps back to start instead of returning
The outer while loop makes the pipeline wrap: when it finishes generating, it resets and starts
again. The session continues until the stop signal is received.
The session stays active until start_session() returns. Your pipeline should:
- Check get_ctx().should_stop() regularly and exit gracefully when true
- Raise exceptions properly so errors can be reported (especially important in deployment)
- Clean up session state before returning
def start_session(self):
    try:
        while not get_ctx().should_stop():
            frames = self.generate()
            get_ctx().emit_block(frames)
    except Exception as e:
        # Log and re-raise so Reactor can report the error
        logger.error(f"Generation failed: {e}")
        raise
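To also satisfy the cleanup requirement, one option is a finally clause, which runs on both clean exits and errors. A sketch, where _reset_session_state() stands in for your own cleanup code:

def start_session(self):
    try:
        while not get_ctx().should_stop():
            frames = self.generate()
            get_ctx().emit_block(frames)
    except Exception as e:
        logger.error(f"Generation failed: {e}")
        raise
    finally:
        # Runs whether the loop exits cleanly or raises
        self._reset_session_state()  # hypothetical cleanup helper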
For detailed patterns on cleanup, state reset, and error handling, see Model Cleanup.

Now that you understand how to structure your pipeline for real-time generation, the next step is learning how frame emission works and what formats Reactor accepts.