Overview
This guide walks you through converting a typical video generation model into a Reactor-compatible streaming model. We’ll use a real example to show:
- What you start with: A model that generates all frames, then saves to a video file
- What you need to change: How to identify key parts of your model’s behavior
- What you end up with: A streaming model that emits frames in real-time and stays loaded for multiple users
The Starting Point: Non-Streaming Model
Most video generation models work like this:
# Your typical inference script
from longlive.pipeline.inference import InferencePipeline
import torch

def generate_video(prompt, output_path):
    # Load model and weights (slow)
    device = torch.device("cuda")
    pipeline = InferencePipeline(config, device=device)
    pipeline.load_weights("path/to/weights.pt")

    # Generate all frames (slow)
    frames = []
    noise = torch.randn([1, 100, 16, 60, 80], device=device)
    for latent in pipeline.generate(prompt, noise):
        # Decode latent to pixel space
        frame = pipeline.vae.decode(latent)
        frames.append(frame)

    # Save to video file at the END
    save_video(frames, output_path)
    print(f"Video saved to {output_path}")

# Usage
generate_video("a beautiful sunset", "output.mp4")
Problems with this approach:
- User waits until the END to see anything
- Model loads/unloads for each generation (slow)
- Can’t interact during generation
- Not suitable for real-time applications
We need to make it (see the skeleton sketch after this list):
- Streaming: Emit frames as they’re generated
- Persistent: Keep model loaded between users
- Interactive: Accept user input during generation
- Resettable: Clean state for next session
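Here is a minimal sketch of that shape. Apart from VideoModel, start_session, command, Field, and get_ctx, every name below is a placeholder; Step 2 fills in the real logic:
from reactor_runtime import VideoModel, command, get_ctx
from pydantic import Field

class StreamingModel(VideoModel):
    def __init__(self, **kwargs):
        # Persistent: heavy loading happens once; the model stays resident
        self._pipeline = load_pipeline_somehow()  # placeholder

    def start_session(self):
        # Streaming: emit each frame as soon as it is produced
        try:
            for frame in self._pipeline.generate_frames():  # placeholder
                get_ctx().emit_frame(frame)
        finally:
            # Resettable: clean state for the next session
            self._pipeline.reset()

    @command("set_prompt", description="Update the prompt mid-generation")
    def set_prompt(self, prompt: str = Field(...)):
        # Interactive: accept user input during generation
        self._pipeline._current_prompt = prompt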
Step 1: Identify Model Behavior Parts
Look at your existing model and identify these three phases:
Phase A: Model Initialization (Heavy)
# HEAVY operations - should happen ONCE
device = torch.device("cuda")
pipeline = InferencePipeline(config, device=device)
pipeline.load_weights("path/to/weights.pt") # This is SLOW
When: Once at startup
Duration: Minutes
Frequency: Once per runtime lifecycle
Phase B: Frame Generation (Per-Session)
# Per-session operations - happen for each user
noise = torch.randn([1, 100, 16, 60, 80], device=device)
for latent in pipeline.generate(prompt, noise):
    frame = pipeline.vae.decode(latent)
    # This is where frames are produced
When: Every time a user connects
Duration: Seconds to minutes
Frequency: Once per user session
Phase C: Cleanup (Lightweight)
# Cleanup - should be FAST
frames.clear()
# Maybe reset some internal state
When: When user disconnects
Duration: Milliseconds
Frequency: Once per user session
Step 2: Create the VideoModel Wrapper
Now we reorganize these phases into the VideoModel structure:
from reactor_runtime import VideoModel, command, get_ctx
from longlive.pipeline.inference import InferencePipeline
from pydantic import Field
import time
import torch

class LongLiveVideoModel(VideoModel):
    def __init__(self, fps=30, size=(480, 640), **kwargs):
        """
        Phase A: HEAVY initialization
        Called ONCE when the runtime starts
        """
        self._fps = fps
        self._height, self._width = size
        self._device = torch.device("cuda")

        # Load config
        config = load_config("longlive/configs/inference.yaml")

        # Initialize pipeline (HEAVY - do this once!)
        self._pipeline = InferencePipeline(config, device=self._device)

        # Load weights from the Model Registry
        # (or a local path if you don't have S3 access yet)
        weights_path = VideoModel.weights("LongLive")
        state_dict = torch.load(weights_path / "models/longlive_base.pt")
        self._pipeline.load_state_dict(state_dict)

        # Move to GPU (HEAVY - do this once!)
        self._pipeline.to(self._device)

        # Store the user's current prompt (for interactive updates).
        # Decide whether to default to an initial prompt or wait for the user to send one.
        self._current_prompt = "a beautiful sunset"

        print("LongLive model loaded and ready")

    def start_session(self):
        """
        Phase B: Frame generation
        Called when each user connects
        """
        try:
            # Lightweight session setup
            noise = torch.randn(
                [1, 100, 16, self._height // 8, self._width // 8],
                device=self._device
            )

            while self._current_prompt is None:
                # Wait for the user to send a prompt.
                time.sleep(0.01)
                # Cooperative cancellation. Cancellation still requires resetting the
                # model, so break out of the loop instead of returning.
                if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                    break

            # Generate frames and STREAM them in real time
            for latent in self._pipeline.generate(self._current_prompt, noise):
                if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                    break

                # Decode latent to pixel space
                frame = self._pipeline.vae.decode(latent)

                # EMIT the frame immediately (streaming!)
                get_ctx().emit_frame(frame)
        except Exception as e:
            # Reset the pipeline to its initial state before raising
            self._pipeline.reset()
            raise e  # Notify the runtime of the error
        finally:
            # Phase C: Cleanup for the next user.
            self._pipeline.reset()      # Reset internal state
            torch.cuda.empty_cache()    # Free unused memory
            # The model stays loaded for the next user!
Step 3: Making It Interactive
In traditional models, you set the prompt BEFORE generation starts:
# Traditional: prompt is fixed before generation
pipeline.generate(prompt="a sunset", ...)
But users want to change parameters during generation! Here’s how:
Find Where Conditioning is Used
Look inside your model’s generation loop:
# Inside your pipeline's generate() method
def generate(self, prompt, noise):
    # Encode prompt to conditioning
    conditioning = self.text_encoder.encode(prompt)  # ← Used here!

    for timestep in self.timesteps:
        # The model uses the conditioning during denoising
        noise_pred = self.model(noise, timestep, conditioning)  # ← And here!
        noise = self.scheduler.step(noise_pred, noise, timestep)
        yield noise
Make Conditioning Dynamic
Instead of encoding the prompt once at the start, check for updates in the loop:
class LongLiveVideoModel(VideoModel):
    def __init__(self, fps=30, size=(480, 640), **kwargs):
        # ... initialization code ...

        # Store the current prompt (can be updated by the user)
        self._pipeline._current_prompt = "a beautiful sunset"  # The last prompt sent by the user
        self._pipeline._last_encoded_prompt = None             # The last prompt that was actually encoded
        self._pipeline._cached_conditioning = None             # The encoded version of that prompt

    # ... start_session code ...

    # The name of the command in the decorator can be anything!
    # It is used as the identifier the frontend calls to invoke this specific command.
    @command("set_prompt", description="Change generation prompt in real-time")
    def set_prompt_command(
        self,
        prompt: str = Field(..., description="New prompt to use")
    ):
        """The user can call this from the frontend during generation."""
        self._pipeline._current_prompt = prompt
        print(f"Prompt updated to: {prompt}")
Update Your Pipeline to Use Dynamic Conditioning
Modify your pipeline’s generation method:
# In your pipeline class
def get_current_conditioning(self):
    """
    Called during the generation loop.
    Returns updated conditioning if the prompt changed.
    """
    # Only re-encode if the prompt changed
    if self._current_prompt != self._last_encoded_prompt:
        self._cached_conditioning = self.text_encoder.encode(
            self._current_prompt
        )
        self._last_encoded_prompt = self._current_prompt
    return self._cached_conditioning

def generate_dynamic(self, noise):
    """
    Generate frames with dynamic conditioning.
    """
    for timestep in self.timesteps:
        # Get the CURRENT conditioning (it may have changed!)
        conditioning = self.get_current_conditioning()

        # Use the current conditioning
        noise_pred = self.model(noise, timestep, conditioning)
        noise = self.scheduler.step(noise_pred, noise, timestep)
        yield noise
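With generate_dynamic() in place, the wrapper's start_session from Step 2 changes only in which generator it drives. A sketch, reusing the names defined above:
def start_session(self):
    """Same as Step 2, but drives the dynamic generator (sketch)."""
    try:
        noise = torch.randn(
            [1, 100, 16, self._height // 8, self._width // 8],
            device=self._device
        )
        # No prompt argument: the pipeline reads self._pipeline._current_prompt
        # through get_current_conditioning() on every step.
        for latent in self._pipeline.generate_dynamic(noise):
            if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                break
            frame = self._pipeline.vae.decode(latent)
            get_ctx().emit_frame(frame)  # Prompt changes take effect on the next step
    finally:
        self._pipeline.reset()
        torch.cuda.empty_cache()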
Generalize Beyond Prompts
This pattern works for any input type: you can add as many commands as you want, each with whatever parameters it needs.
class InteractiveModel(VideoModel):
    @command("set_starting_prompt", description="Update starting text prompt")
    def set_prompt_command(self, prompt: str = Field(...)):
        self._pipeline._current_prompt = prompt

    @command("set_reference_image", description="Upload reference image")
    def set_reference_image_command(
        self,
        base64_image: str = Field(..., description="Base64 encoded image")
    ):
        self._pipeline._current_reference_image = decode_base64_image(base64_image)

    @command("set_camera_angle", description="Adjust camera angle")
    def set_camera_angle_command(
        self,
        angle: float = Field(..., description="Camera angle in degrees")
    ):
        self._pipeline._current_camera_angle = angle
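decode_base64_image is not defined in this guide; a minimal sketch using base64 and Pillow could look like this:
import base64
import io

from PIL import Image

def decode_base64_image(base64_image: str) -> Image.Image:
    """Decode a base64-encoded image string into an RGB PIL image."""
    image_bytes = base64.b64decode(base64_image)
    return Image.open(io.BytesIO(image_bytes)).convert("RGB")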
Step 4: Handling Weights
If You Have Model Registry Access
Use the automatic weight system:
def __init__(self, **kwargs):
    # Downloads from S3 automatically
    weights_path = VideoModel.weights("YourModel-Weights")
    self.model.load_state_dict(torch.load(weights_path / "model.pt"))
If You Don’t Have S3 Access Yet
Load weights normally from local paths during development:
from pathlib import Path

def __init__(self, **kwargs):
    # Use local paths for now
    local_weights_path = Path("./weights/model.pt")
    self.model.load_state_dict(torch.load(local_weights_path))
No S3 access? No problem! Develop with local weights. When you’re ready to
deploy, the Reactor team will upload your weights to S3 and update your model
to use VideoModel.weights(). Your code barely changes.
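If you want a single code path during the transition, one option is to gate the two sources behind an environment variable. This is only a sketch; REACTOR_LOCAL_WEIGHTS is a made-up development convention, not part of the runtime:
import os
from pathlib import Path

import torch

def __init__(self, **kwargs):
    # Use local weights when REACTOR_LOCAL_WEIGHTS is set (hypothetical dev
    # convention); otherwise pull from the Model Registry.
    local_dir = os.environ.get("REACTOR_LOCAL_WEIGHTS")
    weights_path = Path(local_dir) if local_dir else VideoModel.weights("YourModel-Weights")
    self.model.load_state_dict(torch.load(weights_path / "model.pt"))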
Step 5: Error Handling and Cleanup
Wrong approach:
def start_session(self):
    try:
        ...  # generation
    except Exception as e:
        # Just raise - NO! The pipeline is left in a dirty state.
        raise e
Correct approach:
def start_session(self):
    try:
        # Generation loop
        for frame in self.generate():
            get_ctx().emit_frame(frame)
    except Exception as e:
        # Reset model to initial state FIRST
        self._pipeline.reset()
        # Then notify runtime of the error
        raise e
    finally:
        # Always reset for the next user
        self._pipeline.reset()
        torch.cuda.empty_cache()
Why this matters:
- If an error occurs mid-generation, the model might be in a bad state
- Next user shouldn’t inherit that bad state
- Always reset to clean state before raising (see the reset() sketch after this list)
- The runtime handles the error and cleans up the session
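What reset() actually has to do depends on your pipeline. As a sketch, using the prompt-caching attributes from Step 3, it might clear per-session state like this:
# In your pipeline class (sketch; clear whatever per-session state you keep)
def reset(self):
    """Return the pipeline to a clean state for the next session."""
    self._current_prompt = None        # Wait for the next user's prompt
    self._last_encoded_prompt = None
    self._cached_conditioning = None   # Drop stale conditioning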
Step 6: Create the Manifest
{
  "reactor-runtime": "0.0.0",
  "class": "model_longlive:LongLiveVideoModel",
  "model_name": "longlive",
  "model_version": "1.0.0",
  "args": {
    "fps": 30,
    "size": [480, 640]
  },
  "weights": ["LongLive", "Wan2_1_VAE"]
}
Step 7: Test Your Model
# Start runtime
reactor run
# In another terminal, test with a frontend
npx create-reactor-app
cd test-app
pnpm run dev
What to verify:
- ✅ Model loads once at startup (check logs)
- ✅ First user connects quickly (model already loaded)
- ✅ Frames stream in real-time (not saved to file)
- ✅ User can disconnect and reconnect (model stays loaded)
- ✅ Interactive commands work during generation
- ✅ Second user connects instantly (no reload)
Complete Example: Matrix-2 Gaming Model
Here’s a production example showing these concepts in action:
from reactor_runtime import VideoModel, command
from matrix.inference_streaming import matrix_main
from pydantic import Field
from typing import Literal

class Matrix2VideoModel(VideoModel):
    def __init__(self, fps=15, size=(480, 640), **kwargs):
        """Heavy initialization - happens once."""
        self._fps = fps
        self._height, self._width = size

        # Get weights from Model Registry
        model_path = VideoModel.weights("Matrix-Game-2_0-base")

        # Initialize game engine (HEAVY - do once!)
        self._game = matrix_main(
            config_path="matrix/configs/inference_yaml/inference_universal.yaml",
            checkpoint_path=model_path / "base_distill.safetensors",
            mode="universal",
            compile=False
        )
        print("Matrix-2 model ready")

    def start_session(self):
        """Start the game when a user connects."""
        try:
            # Start the generation loop
            # (internally emits frames and handles user input)
            self._game.start()
        except Exception as e:
            # Reset the game to its initial state
            self._game.reset()
            raise e

    @command("control", description="Control the game")
    def control_command(
        self,
        mouse_key: Literal['J', 'K', 'L', 'I', 'U'] = Field(
            ...,
            description="Mouse: J=left, K=down, L=right, I=up, U=none"
        ),
        keyboard_key: Literal['W', 'A', 'S', 'D', 'Q'] = Field(
            ...,
            description="Keyboard: W=forward, A=left, S=back, D=right, Q=none"
        )
    ):
        """User controls from the frontend - updates DURING generation."""
        if self._game.pipeline is not None:
            # Update game state dynamically
            self._game.pipeline.set_action(mouse_key, keyboard_key)

    @command("set_starting_image", description="Upload starting frame")
    def set_starting_image_command(
        self,
        base64_image: str = Field(..., description="Base64 encoded image"),
        image_id: str = Field(..., description="Image identifier")
    ):
        """Accept image input modality."""
        self._game.set_starting_image(base64_image, image_id)

    @command("reset", description="Reset to default state")
    def reset_command(self):
        """Manual reset command."""
        self._game.reset()
Key features demonstrated:
- ✅ Heavy loading in __init__
- ✅ Streaming frames during generation
- ✅ Multiple interactive commands
- ✅ Different input modalities (controls, images)
- ✅ Proper reset on errors and between sessions
Best Practices Summary
1. Separate Heavy from Light Operations
def __init__(self):
    # HEAVY: Model loading, weight loading, compilation
    self.model = load_model()
    self.model.load_weights()
    self.model = torch.compile(self.model)

def start_session(self):
    # LIGHT: Session setup, noise generation, loop initialization
    noise = torch.randn(...)
2. Always Reset Before Raising
try:
    ...  # generation
except Exception as e:
    self._pipeline.reset()  # Clean state FIRST
    raise e                 # Then notify
finally:
    self._pipeline.reset()  # Always reset
3. Make Conditioning Dynamic
# Don't: Encode once at start
conditioning = encode(prompt)
for step in steps:
    model(noise, conditioning)  # Fixed!

# Do: Check for updates each step
for step in steps:
    conditioning = self.get_current_conditioning()  # May have changed!
    model(noise, conditioning)
4. Emit Frames Immediately
# Don't: Collect then save
frames = []
for latent in generate():
    frames.append(decode(latent))
save_video(frames)

# Do: Emit immediately
for latent in generate():
    frame = decode(latent)
    get_ctx().emit_frame(frame)  # Real-time!
Next Steps