Skip to main content
Every Reactor model is a Python class that extends VideoModel and is decorated with @model.
from reactor_runtime import VideoModel, model, command, get_ctx
from pydantic import Field

@model(name="my-model", config="config.yml")
class MyModel(VideoModel):
    def __init__(self, config):
        self.pipeline = load_pipeline(config)
        self._prompt = "a calm forest"

    @command("set_prompt", description="Change the generation prompt")
    def set_prompt(self, prompt: str):
        self._prompt = prompt

    def start_session(self):
        while not get_ctx().should_stop():
            frames = self.pipeline.generate(prompt=self._prompt)
            get_ctx().get_track().emit(frames)

@model

Identifies your model class to the Reactor runtime.
@model(name="my-model", config="config.yml", weights=["my-weights"])
ParameterRequiredDescription
nameYesUnique identifier used in logs, deployments, and client connections. Lowercase letters, numbers, and hyphens only.
configNoPath to a YAML file with default configuration values. Passed to __init__ as a DictConfig.
weightsNoList of weight folder names for Reactor’s weight management system in production.
One class per project should be decorated with @model — the CLI uses it to find your model’s entry point.

__init__

Called once when the model loads, before any user connects. Load weights, initialize pipelines, and allocate GPU memory here.
def __init__(self, config):
    self.pipeline = load_pipeline(device="cuda")
    self.num_steps = config.get("num_steps", 8)
    self._prompt = "a calm forest"
config is the merged result of your YAML file and any CLI overrides. Weights stay in memory across all sessions — __init__ is not called again between users.

start_session

Called each time a user connects. This is your generation loop. When it returns, the session ends.
def start_session(self):
    while not get_ctx().should_stop():
        frames = self.pipeline.generate(prompt=self._prompt)
        get_ctx().get_track().emit(frames)
start_session runs in a background thread. The three things you do inside it: get_ctx().should_stop() — returns True when the user disconnects or the runtime shuts down. Check it regularly. For slow forward passes, also check mid-operation so the session exits promptly:
def generate_block(self):
    for step in self.denoising_steps:
        if get_ctx().should_stop():
            return None
        self.latent = self.denoise_step(self.latent, step)
    return self.decode(self.latent)
get_ctx().get_track().emit(frame) — sends a frame to the client. See Tracks for multi-track models and format details. get_ctx().send(data) — sends a JSON message to the frontend, useful for progress updates or custom events:
get_ctx().send({"type": "progress", "frames": self.frame_count})
Never call get_ctx() outside of an active session — i.e. not in __init__, not in class-level attributes. Only inside start_session or code called from there.

Session cleanup

When start_session returns, your model must be ready for the next user. Reset session-specific state — prompts, inputs, conditioning, counters — while leaving weights and pipelines loaded.
Get this wrong and state leaks between users. User B starts with User A’s prompt, inputs, or accumulated conditioning.
Use try/finally so cleanup always runs, whether the session ends normally or errors:
def start_session(self):
    try:
        self.pipeline.run()
    finally:
        self._reset_state()

def _reset_state(self):
    self._prompt = "a calm forest"
    self._speed = 1.0
    self.pipeline.reset()
Resetting at the end (not the start) means the incoming user never waits for cleanup — the model is ready immediately.
Reset between sessionsKeep loaded
User prompts and inputsModel weights
Conditioning tensorsPipeline objects
Frame countersGPU memory allocations
Accumulated buffersConfiguration values

@command

Decorate methods with @command to accept real-time input from users during a session.
@command("set_prompt", description="Change the generation prompt")
def set_prompt(self, prompt: str):
    self._prompt = prompt
ParameterRequiredDescription
nameYesCommand name clients use to call this method.
descriptionNoIncluded in the schema for frontend auto-generated UI.
Reactor uses type annotations to build a schema for each command, which drives validation and lets frontends auto-generate controls. Literal for fixed options — renders as a dropdown or button group:
from typing import Literal

@command("move")
def move(self, direction: Literal["forward", "back", "left", "right", "none"]):
    self._direction = direction
Pydantic Field for constraintsge/le map naturally to sliders:
from pydantic import Field

@command("set_speed")
def set_speed(self, speed: float = Field(0.5, ge=0.0, le=1.0)):
    self._speed = speed
A default value makes the parameter optional; ... makes it required. Thread safety: command methods run in a different thread from start_session. Store inputs as instance variables and read them in your loop:
@command("move")
def move(self, direction: str):
    self._direction = direction          # written from command thread

def start_session(self):
    while not get_ctx().should_stop():
        frame = self.pipeline.step(self._direction)  # read from session thread
        get_ctx().get_track().emit(frame)
Python’s GIL makes simple attribute reads and writes atomic. For multiple related values that must update together, use a threading.Lock.

on_media

Override to receive every inbound media frame from the client. Called from the media thread each time new data arrives.
from reactor_runtime.transports.media import MediaBundle

def on_media(self, bundle: MediaBundle):
    webcam_data = bundle.get_track("webcam")
    if webcam_data is not None:
        self.pipeline.push_frame(webcam_data.data)
For most models, polling with get_ctx().get_track("webcam").latest() inside start_session is simpler. Use on_media only when your pipeline must process every inbound frame without dropping any. See Tracks.