Every Reactor model is a Python class that extends VideoModel and is decorated with @model.
```python
from reactor_runtime import VideoModel, model, command, get_ctx
from pydantic import Field

@model(name="my-model", config="config.yml")
class MyModel(VideoModel):
    def __init__(self, config):
        self.pipeline = load_pipeline(config)
        self._prompt = "a calm forest"

    @command("set_prompt", description="Change the generation prompt")
    def set_prompt(self, prompt: str):
        self._prompt = prompt

    def start_session(self):
        while not get_ctx().should_stop():
            frames = self.pipeline.generate(prompt=self._prompt)
            get_ctx().get_track().emit(frames)
```
## `@model`
Identifies your model class to the Reactor runtime.
```python
@model(name="my-model", config="config.yml", weights=["my-weights"])
```
| Parameter | Required | Description |
|---|---|---|
| `name` | Yes | Unique identifier used in logs, deployments, and client connections. Lowercase letters, numbers, and hyphens only. |
| `config` | No | Path to a YAML file with default configuration values. Passed to `__init__` as a DictConfig. |
| `weights` | No | List of weight folder names for Reactor's weight management system in production. |
One class per project should be decorated with @model — the CLI uses it to find your model’s entry point.
## `__init__`
Called once when the model loads, before any user connects. Load weights, initialize pipelines, and allocate GPU memory here.
```python
def __init__(self, config):
    self.pipeline = load_pipeline(device="cuda")
    self.num_steps = config.get("num_steps", 8)
    self._prompt = "a calm forest"
```
`config` is the merged result of your YAML file and any CLI overrides. Weights stay in memory across all sessions — `__init__` is not called again between users.
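The `num_steps` value read above could come from a YAML file like this (a sketch; only `num_steps` appears in the example, and the file name matches the `config` argument shown for `@model`):

```yaml
# config.yml: defaults merged with CLI overrides into the DictConfig
num_steps: 8
```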
## `start_session`
Called each time a user connects. This is your generation loop. When it returns, the session ends.
```python
def start_session(self):
    while not get_ctx().should_stop():
        frames = self.pipeline.generate(prompt=self._prompt)
        get_ctx().get_track().emit(frames)
```
`start_session` runs in a background thread. Inside it you do three things:
- `get_ctx().should_stop()` — returns `True` when the user disconnects or the runtime shuts down. Check it regularly. For slow forward passes, also check mid-operation so the session exits promptly:

  ```python
  def generate_block(self):
      for step in self.denoising_steps:
          if get_ctx().should_stop():
              return None
          self.latent = self.denoise_step(self.latent, step)
      return self.decode(self.latent)
  ```

- `get_ctx().get_track().emit(frame)` — sends a frame to the client. See Tracks for multi-track models and format details.

- `get_ctx().send(data)` — sends a JSON message to the frontend, useful for progress updates or custom events:

  ```python
  get_ctx().send({"type": "progress", "frames": self.frame_count})
  ```
Never call `get_ctx()` outside of an active session — i.e. not in `__init__`, not in class-level attributes. Only inside `start_session` or code called from there.
## Session cleanup
When start_session returns, your model must be ready for the next user. Reset session-specific state — prompts, inputs, conditioning, counters — while leaving weights and pipelines loaded.
Get this wrong and state leaks between users. User B starts with User A’s prompt, inputs, or accumulated conditioning.
Use try/finally so cleanup always runs, whether the session ends normally or errors:
```python
def start_session(self):
    try:
        self.pipeline.run()
    finally:
        self._reset_state()

def _reset_state(self):
    self._prompt = "a calm forest"
    self._speed = 1.0
    self.pipeline.reset()
```
Resetting at the end (not the start) means the incoming user never waits for cleanup — the model is ready immediately.
| Reset between sessions | Keep loaded |
|---|---|
| User prompts and inputs | Model weights |
| Conditioning tensors | Pipeline objects |
| Frame counters | GPU memory allocations |
| Accumulated buffers | Configuration values |
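The reset rule can be exercised with a standalone sketch (no Reactor imports; `FakeModel` is a stand-in for your model class) showing why the `finally` reset keeps User B from inheriting User A's prompt:

```python
class FakeModel:
    def __init__(self):
        self._prompt = "a calm forest"   # default, set once at load

    def set_prompt(self, prompt):
        self._prompt = prompt

    def start_session(self, run):
        try:
            run(self)                    # stand-in for the generation loop
        finally:
            self._reset_state()          # runs even if the session errors

    def _reset_state(self):
        self._prompt = "a calm forest"

m = FakeModel()
m.start_session(lambda s: s.set_prompt("user A's prompt"))
assert m._prompt == "a calm forest"      # User B starts from defaults
```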
## `@command`
Decorate methods with `@command` to accept real-time input from users during a session.
```python
@command("set_prompt", description="Change the generation prompt")
def set_prompt(self, prompt: str):
    self._prompt = prompt
```
| Parameter | Required | Description |
|---|---|---|
| `name` | Yes | Command name clients use to call this method. |
| `description` | No | Included in the schema for frontend auto-generated UI. |
Reactor uses type annotations to build a schema for each command, which drives validation and lets frontends auto-generate controls.
`Literal` for fixed options — renders as a dropdown or button group:

```python
from typing import Literal

@command("move")
def move(self, direction: Literal["forward", "back", "left", "right", "none"]):
    self._direction = direction
```
Pydantic `Field` for constraints — `ge`/`le` map naturally to sliders:

```python
from pydantic import Field

@command("set_speed")
def set_speed(self, speed: float = Field(0.5, ge=0.0, le=1.0)):
    self._speed = speed
```
A default value makes the parameter optional; `...` makes it required.
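The required/optional rule can be checked standalone with pydantic itself; this sketch assumes Reactor's schema builder mirrors pydantic's semantics for defaults:

```python
from pydantic import BaseModel, Field, ValidationError

class SetSpeed(BaseModel):
    speed: float = Field(0.5, ge=0.0, le=1.0)   # concrete default: optional

class SetPrompt(BaseModel):
    prompt: str = Field(...)                     # Ellipsis default: required

assert SetSpeed().speed == 0.5   # omitted, falls back to the default
try:
    SetPrompt()                  # missing a required field
    raised = False
except ValidationError:
    raised = True
assert raised
```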
Thread safety: command methods run in a different thread from `start_session`. Store inputs as instance variables and read them in your loop:
```python
@command("move")
def move(self, direction: str):
    self._direction = direction  # written from command thread

def start_session(self):
    while not get_ctx().should_stop():
        frame = self.pipeline.step(self._direction)  # read from session thread
        get_ctx().get_track().emit(frame)
```
Python’s GIL makes simple attribute reads and writes atomic. For multiple related values that must update together, use a threading.Lock.
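For example, a direction/speed pair read together should never mix one command's direction with another command's speed. A minimal sketch (class and method names are illustrative, not part of the Reactor API):

```python
import threading

class ControlState:
    # Guard related values that must change together with one lock.
    def __init__(self):
        self._lock = threading.Lock()
        self._direction = "none"
        self._speed = 0.0

    def update(self, direction: str, speed: float):
        # Called from the command thread: both values change atomically.
        with self._lock:
            self._direction = direction
            self._speed = speed

    def snapshot(self):
        # Called from the session loop: returns a consistent pair.
        with self._lock:
            return self._direction, self._speed

state = ControlState()
state.update("forward", 1.0)
direction, speed = state.snapshot()  # never a torn read
```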
## `on_media`

Override `on_media` to receive every inbound media frame from the client. It is called from the media thread each time new data arrives.
```python
from reactor_runtime.transports.media import MediaBundle

def on_media(self, bundle: MediaBundle):
    webcam_data = bundle.get_track("webcam")
    if webcam_data is not None:
        self.pipeline.push_frame(webcam_data.data)
```
For most models, polling with `get_ctx().get_track("webcam").latest()` inside `start_session` is simpler. Use `on_media` only when your pipeline must process every inbound frame without dropping any. See Tracks.
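The difference between the two intake patterns can be sketched standalone (no Reactor imports; a plain variable and a `queue.Queue` stand in for the track buffer):

```python
import queue

latest = None          # polling pattern: keep only the newest frame
every = queue.Queue()  # on_media pattern: buffer every frame

for frame in range(5):     # stand-in for five inbound webcam frames
    latest = frame         # overwrite: stale frames are dropped
    every.put(frame)       # enqueue: nothing is dropped

assert latest == 4                                          # poller sees only the newest
assert [every.get() for _ in range(5)] == [0, 1, 2, 3, 4]   # on_media saw them all
```

Polling keeps the session loop in control of its own pace; the queue pattern trades that for completeness, which is why it only pays off when every frame matters.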