Overview

This guide walks you through converting a typical video generation model into a Reactor-compatible streaming model. We’ll use a real example to show:
  1. What you start with: A model that generates all frames, then saves to a video file
  2. What you need to change: How to identify key parts of your model’s behavior
  3. What you end up with: A streaming model that emits frames in real-time and stays loaded for multiple users

The Starting Point: Non-Streaming Model

Most video generation models work like this:
# Your typical inference script
from longlive.pipeline.inference import InferencePipeline
import torch

def generate_video(prompt, output_path):
    # Load config, model, and weights (slow)
    device = torch.device("cuda")
    config = load_config("longlive/configs/inference.yaml")
    pipeline = InferencePipeline(config, device=device)
    pipeline.load_weights("path/to/weights.pt")

    # Generate all frames (slow)
    frames = []
    noise = torch.randn([1, 100, 16, 60, 80], device=device)

    for latent in pipeline.generate(prompt, noise):
        # Decode latent to pixel space
        frame = pipeline.vae.decode(latent)
        frames.append(frame)

    # Save to video file at the END
    save_video(frames, output_path)
    print(f"Video saved to {output_path}")

# Usage
generate_video("a beautiful sunset", "output.mp4")
Problems with this approach:
  • User waits until the END to see anything
  • Model loads/unloads for each generation (slow)
  • Can’t interact during generation
  • Not suitable for real-time applications
We need to make it:
  • Streaming: Emit frames as they’re generated
  • Persistent: Keep model loaded between users
  • Interactive: Accept user input during generation
  • Resettable: Clean state for next session

Step 1: Identify Model Behavior Parts

Look at your existing model and identify these three phases:

Phase A: Model Initialization (Heavy)

# HEAVY operations - should happen ONCE
device = torch.device("cuda")
pipeline = InferencePipeline(config, device=device)
pipeline.load_weights("path/to/weights.pt")  # This is SLOW
  • When: Once at startup
  • Duration: Minutes
  • Frequency: Once per runtime lifecycle

Phase B: Frame Generation (Per-Session)

# Per-session operations - run once per user
noise = torch.randn([1, 100, 16, 60, 80], device=device)

for latent in pipeline.generate(prompt, noise):
    frame = pipeline.vae.decode(latent)
    # This is where frames are produced
  • When: Every time a user connects
  • Duration: Seconds to minutes
  • Frequency: Once per user session

Phase C: Cleanup (Lightweight)

# Cleanup - should be FAST
frames.clear()
# Maybe reset some internal state
  • When: When a user disconnects
  • Duration: Milliseconds
  • Frequency: Once per user session

Step 2: Create the VideoModel Wrapper

Now we reorganize these phases into the VideoModel structure:
from reactor_runtime import VideoModel, command, get_ctx
from longlive.pipeline.inference import InferencePipeline
from pydantic import Field
import time
import torch

class LongLiveVideoModel(VideoModel):

    def __init__(self, fps=30, size=(480, 640), **kwargs):
        """
        Phase A: HEAVY initialization
        Called ONCE when runtime starts
        """
        self._fps = fps
        self._height, self._width = size
        self._device = torch.device("cuda")

        # Load config
        config = load_config("longlive/configs/inference.yaml")

        # Initialize pipeline (HEAVY - do this once!)
        self._pipeline = InferencePipeline(config, device=self._device)

        # Load weights from Model Registry
        # (or local path if you don't have S3 access yet)
        weights_path = VideoModel.weights("LongLive")
        state_dict = torch.load(weights_path / "models/longlive_base.pt")
        self._pipeline.load_state_dict(state_dict)

        # Move to GPU (HEAVY - do this once!)
        self._pipeline.to(self._device)

        # Store the user's current prompt (for interactive updates).
        # Decide whether to default to an initial prompt or wait for the user
        # to send one (start_session below waits while this is None).
        self._current_prompt = "a beautiful sunset"

        print("LongLive model loaded and ready")

    def start_session(self):
        """
        Phase B: Frame generation
        Called when each user connects
        """
        try:
            # Lightweight session setup
            noise = torch.randn(
                [1, 100, 16, self._height // 8, self._width // 8],
                device=self._device
            )

            while self._current_prompt is None:
                # Wait for user to send a prompt.
                time.sleep(0.01)
                # Cooperative cancellation: the model still needs to be reset,
                # so break out of the loop rather than returning.
                if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                    break

            # Generate frames and STREAM them in real-time
            for latent in self._pipeline.generate(self._current_prompt, noise):
                if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                    break

                # Decode latent to pixel space
                frame = self._pipeline.vae.decode(latent)

                # EMIT frame immediately (streaming!)
                get_ctx().emit_frame(frame)

        except Exception as e:
            # Reset pipeline to initial state before raising
            self._pipeline.reset()
            raise e  # Notify runtime of error

        finally:
            # Phase C: Cleanup for next user.
            self._pipeline.reset()  # Reset internal state
            torch.cuda.empty_cache()  # Free unused memory

            # Model stays loaded for next user!

Step 3: Making It Interactive

In traditional models, you set the prompt BEFORE generation starts:
# Traditional: prompt is fixed before generation
pipeline.generate(prompt="a sunset", ...)
But users want to change parameters during generation! Here’s how:

Find Where Conditioning is Used

Look inside your model’s generation loop:
# Inside your pipeline's generate() method
def generate(self, prompt, noise):
    # Encode prompt to conditioning
    conditioning = self.text_encoder.encode(prompt)  # ← Used here!

    for timestep in timesteps:
        # Model uses conditioning during denoising
        noise_pred = self.model(noise, timestep, conditioning)  # ← And here!
        noise = self.scheduler.step(noise_pred, noise, timestep)
        yield noise

Make Conditioning Dynamic

Instead of encoding the prompt once at the start, check for updates in the loop:
class LongLiveVideoModel(VideoModel):
    def __init__(self, fps=30, size=(480, 640), **kwargs):
        # ... initialization code ...

        # Store current prompt (can be updated by user)
        self._pipeline._current_prompt = "a beautiful sunset" # The last prompt sent by the user
        self._pipeline._last_encoded_prompt = None # The last actually encoded prompt
        self._pipeline._cached_conditioning = None # The encoded version of the prompt

    # ... start_session code ...


    # The name of the command in the decorator can be anything!
    # The frontend uses it as the identifier when calling that specific command.
    @command("set_prompt", description="Change generation prompt in real-time")
    def set_prompt_command(
        self,
        prompt: str = Field(..., description="New prompt to use")
    ):
        """User can call this from frontend during generation."""
        self._pipeline._current_prompt = prompt
        print(f"Prompt updated to: {prompt}")

Update Your Pipeline to Use Dynamic Conditioning

Modify your pipeline’s generation method:

# In your pipeline class
def get_current_conditioning(self):
    """
    Called during generation loop.
    Returns updated conditioning if prompt changed.
    """
    # Only re-encode if prompt changed
    if self._current_prompt != self._last_encoded_prompt:
        self._cached_conditioning = self.text_encoder.encode(
            self._current_prompt
        )
        self._last_encoded_prompt = self._current_prompt

    return self._cached_conditioning

def generate_dynamic(self, noise):
    """
    Generate frames with dynamic conditioning.
    """
    for timestep in self.timesteps:
        # Get CURRENT conditioning (may have changed!)
        conditioning = self.get_current_conditioning()

        # Use current conditioning
        noise_pred = self.model(noise, timestep, conditioning)
        noise = self.scheduler.step(noise_pred, noise, timestep)

        yield noise
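
With these pieces in place, start_session can drive the dynamic generator directly. The following is a minimal sketch, assuming generate_dynamic lives on your pipeline as shown above and that latents are decoded and emitted the same way as in Step 2:
def start_session(self):
    try:
        noise = torch.randn(
            [1, 100, 16, self._height // 8, self._width // 8],
            device=self._device
        )

        # Each step re-reads the current conditioning, so a set_prompt
        # command takes effect on the very next denoising step.
        for latent in self._pipeline.generate_dynamic(noise):
            if get_ctx()._stop_evt and get_ctx()._stop_evt.is_set():
                break

            frame = self._pipeline.vae.decode(latent)
            get_ctx().emit_frame(frame)

    finally:
        # Reset for the next user; the model itself stays loaded.
        self._pipeline.reset()
        torch.cuda.empty_cache()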

Generalize Beyond Prompts

This pattern works for any input type: you can add as many commands as you want, and each command can declare whatever parameters it needs.
class InteractiveModel(VideoModel):

    @command("set_starting_prompt", description="Update starting text prompt")
    def set_prompt_command(self, prompt: str = Field(...)):
        self._pipeline._current_prompt = prompt

    @command("set_reference_image", description="Upload reference image")
    def set_reference_image_command(
        self,
        base64_image: str = Field(..., description="Base64 encoded image")
    ):
        self._pipeline._current_reference_image = decode_base64_image(base64_image)

    @command("set_camera_angle", description="Adjust camera angle")
    def set_camera_angle_command(
        self,
        angle: float = Field(..., description="Camera angle in degrees")
    ):
        self._pipeline._current_camera_angle = angle
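
The decode_base64_image helper above is not part of the runtime; you provide it yourself. A minimal sketch using Pillow might look like this:
import base64
import io

from PIL import Image

def decode_base64_image(base64_image: str) -> Image.Image:
    """Decode a base64-encoded image string into an RGB PIL image."""
    image_bytes = base64.b64decode(base64_image)
    return Image.open(io.BytesIO(image_bytes)).convert("RGB")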

Step 4: Handling Weights

If You Have Model Registry Access

Use the automatic weight system:
def __init__(self, **kwargs):
    # Downloads from S3 automatically
    weights_path = VideoModel.weights("YourModel-Weights")
    self.model.load_state_dict(torch.load(weights_path / "model.pt"))

If You Don’t Have S3 Access Yet

Load weights normally from local paths during development:
from pathlib import Path

def __init__(self, **kwargs):
    # Use local paths for now
    local_weights_path = Path("./weights/model.pt")
    self.model.load_state_dict(torch.load(local_weights_path))
No S3 access? No problem! Develop with local weights. When you’re ready to deploy, the Reactor team will upload your weights to S3 and update your model to use VideoModel.weights(). Your code barely changes.
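
If you want the eventual switch to be a one-line change, you can branch on a constructor argument. The use_registry flag below is a hypothetical name used for illustration, not a runtime feature:
from pathlib import Path

def __init__(self, use_registry: bool = False, **kwargs):
    if use_registry:
        # Production: fetch weights from the Model Registry
        weights_path = VideoModel.weights("YourModel-Weights")
    else:
        # Development: use a local copy of the weights
        weights_path = Path("./weights")

    self.model.load_state_dict(torch.load(weights_path / "model.pt"))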

Step 5: Error Handling and Cleanup

Wrong approach:
def start_session(self):
    try:
        # generation...
    except Exception as e:
        # Just raise - NO!
        raise e
Correct approach:
def start_session(self):
    try:
        # Generation loop
        for frame in self.generate():
            get_ctx().emit_frame(frame)

    except Exception as e:
        # Reset model to initial state FIRST
        self._pipeline.reset()

        # Then notify runtime of error
        raise e

    finally:
        # Always reset for next user
        self._pipeline.reset()
        torch.cuda.empty_cache()
Why this matters:
  • If an error occurs mid-generation, the model might be in a bad state
  • Next user shouldn’t inherit that bad state
  • Always reset to clean state before raising
  • The runtime handles the error and cleans up the session
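
What reset() has to do depends on your pipeline. As a minimal sketch, assuming the per-session attributes introduced in Step 3, it only needs to clear state that belongs to the current session:
# In your pipeline class
def reset(self):
    """Return the pipeline to its post-__init__ state for the next user."""
    self._current_prompt = None
    self._last_encoded_prompt = None
    self._cached_conditioning = None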

Step 6: Create the Manifest

The manifest tells the runtime which class to load (module:ClassName), which arguments to pass to __init__, and which named weights the model needs:
{
  "reactor-runtime": "0.0.0",
  "class": "model_longlive:LongLiveVideoModel",
  "model_name": "longlive",
  "model_version": "1.0.0",
  "args": {
    "fps": 30,
    "size": [480, 640]
  },
  "weights": ["LongLive", "Wan2_1_VAE"]
}

Step 7: Test Your Model

# Start runtime
reactor run

# In another terminal, test with a frontend
npx create-reactor-app
cd test-app
pnpm run dev
What to verify:
  1. ✅ Model loads once at startup (check logs)
  2. ✅ First user connects quickly (model already loaded)
  3. ✅ Frames stream in real-time (not saved to file)
  4. ✅ User can disconnect and reconnect (model stays loaded)
  5. ✅ Interactive commands work during generation
  6. ✅ Second user connects instantly (no reload)

Complete Example: Matrix-2 Gaming Model

Here’s a production example showing these concepts in action:
from reactor_runtime import VideoModel, command
from matrix.inference_streaming import matrix_main
from pydantic import Field
from typing import Literal

class Matrix2VideoModel(VideoModel):
    def __init__(self, fps=15, size=(480, 640), **kwargs):
        """Heavy initialization - happens once."""
        self._fps = fps
        self._height, self._width = size

        # Get weights from Model Registry
        model_path = VideoModel.weights("Matrix-Game-2_0-base")

        # Initialize game engine (HEAVY - do once!)
        self._game = matrix_main(
            config_path="matrix/configs/inference_yaml/inference_universal.yaml",
            checkpoint_path=model_path / "base_distill.safetensors",
            mode="universal",
            compile=False
        )

        print("Matrix-2 model ready")

    def start_session(self):
        """Start game when user connects."""
        try:
            # Start generation loop
            # (internally emits frames and handles user input)
            self._game.start()

        except Exception as e:
            # Reset game to initial state
            self._game.reset()
            raise e

    @command("control", description="Control the game")
    def control_command(
        self,
        mouse_key: Literal['J', 'K', 'L', 'I', 'U'] = Field(
            ...,
            description="Mouse: J=left, K=down, L=right, I=up, U=none"
        ),
        keyboard_key: Literal['W', 'A', 'S', 'D', 'Q'] = Field(
            ...,
            description="Keyboard: W=forward, A=left, S=back, D=right, Q=none"
        )
    ):
        """User controls from the frontend - updates DURING generation."""
        if self._game.pipeline is not None:
            # Update game state dynamically
            self._game.pipeline.set_action(mouse_key, keyboard_key)

    @command("set_starting_image", description="Upload starting frame")
    def set_starting_image_command(
        self,
        base64_image: str = Field(..., description="Base64 encoded image"),
        image_id: str = Field(..., description="Image identifier")
    ):
        """Accept image input modality."""
        self._game.set_starting_image(base64_image, image_id)

    @command("reset", description="Reset to default state")
    def reset_command(self):
        """Manual reset command."""
        self._game.reset()
Key features demonstrated:
  • ✅ Heavy loading in __init__
  • ✅ Streaming frames during generation
  • ✅ Multiple interactive commands
  • ✅ Different input modalities (controls, images)
  • ✅ Proper reset on errors and between sessions

Best Practices Summary

1. Separate Heavy from Light Operations

def __init__(self):
    # HEAVY: Model loading, weight loading, compilation
    self.model = load_model()
    self.model.load_weights()
    self.model = torch.compile(self.model)

def start_session(self):
    # LIGHT: Session setup, noise generation, loop initialization
    noise = torch.randn(...)

2. Always Reset Before Raising

try:
    # generation...
except Exception as e:
    self._pipeline.reset()  # Clean state FIRST
    raise e  # Then notify
finally:
    self._pipeline.reset()  # Always reset

3. Make Conditioning Dynamic

# Don't: Encode once at start
conditioning = encode(prompt)
for step in steps:
    model(noise, conditioning)  # Fixed!

# Do: Check for updates each step
for step in steps:
    conditioning = self.get_current_conditioning()  # May have changed!
    model(noise, conditioning)

4. Stream Frames Immediately

# Don't: Collect then save
frames = []
for latent in generate():
    frames.append(decode(latent))
save_video(frames)

# Do: Emit immediately
for latent in generate():
    frame = decode(latent)
    get_ctx().emit_frame(frame)  # Real-time!

Next Steps