Decorators
The Python SDK provides decorators for registering event handlers. Decorators are defined on a Reactor instance and are the idiomatic Python way to handle events.
The event decorators are equivalent to calling reactor.on(event, handler); @reactor.on_frame is equivalent to calling reactor.set_frame_callback(handler). Use whichever style you prefer.
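To make the decorator/call equivalence concrete, here is a minimal sketch built on a hypothetical stand-in class, MiniReactor, which is not part of the SDK. It assumes only that on(event, handler) stores the handler and that a decorator such as on_message forwards to it and returns the function unchanged. The event name "message" and the _emit helper are assumptions made for illustration.

```python
from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniReactor:
    """Hypothetical stand-in for illustration; not the SDK's Reactor class."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)  # event name -> list of handlers

    def on(self, event: str, handler: Handler) -> None:
        # Imperative style: store the handler under the event name.
        self._handlers[event].append(handler)

    def on_message(self, handler: Handler) -> Handler:
        # Decorator style: forward to on() and return the function unchanged,
        # so the decorated name still refers to the original function.
        self.on("message", handler)
        return handler

    def _emit(self, event: str, payload: Any) -> None:
        # Assumed helper for the demo: call every handler for the event.
        for handler in self._handlers[event]:
            handler(payload)


reactor = MiniReactor()
received = []


@reactor.on_message
def via_decorator(message: Any) -> None:
    received.append(f"decorator: {message}")


def via_call(message: Any) -> None:
    received.append(f"call: {message}")


reactor.on("message", via_call)  # same effect as decorating via_call

reactor._emit("message", "hello")
print(received)
```

Both handlers end up in the same registry, which is why the two styles can be mixed freely in one program.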
@reactor.on_frame
Registers a callback to receive video frames as NumPy arrays.
@reactor.on_frame
def handle_frame(frame: NDArray[np.uint8]) -> None:
    ...
Handler receives:
frame — A NumPy array with shape (H, W, 3), dtype uint8, in RGB color order.
Example:
from reactor_sdk import Reactor

reactor = Reactor(model_name="livecore", api_key=api_key)

@reactor.on_frame
def handle_frame(frame):
    print(f"Frame: {frame.shape}")  # e.g. (720, 1280, 3)

    # Save with PIL
    from PIL import Image
    Image.fromarray(frame).save("frame.png")

    # Or use with OpenCV (note: convert RGB → BGR)
    import cv2
    bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite("frame.png", bgr)
This is equivalent to calling reactor.set_frame_callback(handle_frame).
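If the frame callback is invoked once per frame, as the description suggests, expensive work such as writing an image to disk on every call may not keep up with the stream. One option is to wrap the handler so that it runs only on every n-th frame. The every_nth helper below is a hypothetical utility, not part of the SDK; it is plain Python and makes no assumption about the frame type.

```python
import functools
from typing import Any, Callable

FrameHandler = Callable[[Any], None]


def every_nth(n: int) -> Callable[[FrameHandler], FrameHandler]:
    """Hypothetical helper: run the wrapped handler on frames 0, n, 2n, ..."""
    if n < 1:
        raise ValueError("n must be >= 1")

    def decorator(handler: FrameHandler) -> FrameHandler:
        count = 0  # number of frames seen so far

        @functools.wraps(handler)
        def wrapper(frame: Any) -> None:
            nonlocal count
            index = count
            count += 1
            if index % n == 0:
                handler(frame)

        return wrapper

    return decorator


seen = []


@every_nth(3)
def handle_frame(frame: Any) -> None:
    seen.append(frame)


# Integers stand in for frames so the sketch runs without NumPy.
for i in range(7):
    handle_frame(i)
print(seen)
```

With the SDK, the helper would sit beneath the registration decorator (@reactor.on_frame on top, @every_nth(30) below it), so that the wrapper is what gets registered. The counter is not thread-safe, which only matters if the callback can be invoked from several threads at once.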
@reactor.on_message
Registers a handler for messages received from the model.
@reactor.on_message
def handle_message(message: Any) -> None:
    ...
Handler receives:
message — The message payload from the model (typically a dictionary)
Example:
@reactor.on_message
def handle_message(message):
    print("Received:", message)

    if message.get("type") == "state_update":
        process_state(message["data"])
    elif message.get("type") == "generation_complete":
        print("Generation finished")
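As the number of message types grows, an if/elif chain can be replaced by a lookup table keyed on the "type" field. The sketch below is one way to do that in plain Python. The handles decorator, the dispatch function, and the log list are hypothetical names introduced here; the message shapes are taken from the example above, and the payload is treated as only typically a dictionary.

```python
from typing import Any, Callable, Dict, List

# Maps a message "type" value to the function that handles it.
MESSAGE_HANDLERS: Dict[str, Callable[[dict], None]] = {}
log: List[str] = []


def handles(message_type: str):
    """Register the decorated function as the handler for one message type."""
    def register(func: Callable[[dict], None]) -> Callable[[dict], None]:
        MESSAGE_HANDLERS[message_type] = func
        return func
    return register


@handles("state_update")
def on_state_update(message: dict) -> None:
    log.append(f"state: {message['data']}")


@handles("generation_complete")
def on_generation_complete(message: dict) -> None:
    log.append("Generation finished")


def dispatch(message: Any) -> bool:
    """Route a message by its "type" field; return False if nothing matched."""
    if not isinstance(message, dict):
        return False  # non-dictionary payloads are ignored in this sketch
    message_type = message.get("type")
    if not isinstance(message_type, str):
        return False
    handler = MESSAGE_HANDLERS.get(message_type)
    if handler is None:
        return False
    handler(message)
    return True


results = [
    dispatch({"type": "state_update", "data": {"x": 1}}),
    dispatch({"type": "generation_complete"}),
    dispatch({"type": "unknown"}),
    dispatch("raw string payload"),
]
print(results, log)
```

In an application, the body of the @reactor.on_message handler would then reduce to a single dispatch(message) call.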
@reactor.on_status
Registers a handler for connection status changes. Supports three usage patterns.
Handle all status changes
@reactor.on_status
def handle_status(status: ReactorStatus) -> None:
    ...
Example:
from reactor_sdk import ReactorStatus

@reactor.on_status
def handle_status(status):
    match status:
        case ReactorStatus.CONNECTING:
            print("Connecting...")
        case ReactorStatus.WAITING:
            print("Waiting for GPU...")
        case ReactorStatus.READY:
            print("Ready!")
        case ReactorStatus.DISCONNECTED:
            print("Disconnected")
Filter by a specific status
@reactor.on_status(ReactorStatus.READY)
def handle_ready(status: ReactorStatus) -> None:
    ...
Example:
@reactor.on_status(ReactorStatus.READY)
def on_ready(status):
    print("Connected! Can now send commands.")
Filter by multiple statuses
@reactor.on_status([ReactorStatus.CONNECTING, ReactorStatus.WAITING])
def handle_pending(status: ReactorStatus) -> None:
    ...
Example:
@reactor.on_status([ReactorStatus.CONNECTING, ReactorStatus.WAITING])
def on_pending(status):
    print(f"Connection in progress: {status}")
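A single decorator can support all three usage patterns by inspecting its argument: a function means bare use, a status or a list of statuses means filtered use. The sketch below shows one way this could work. Status and StatusHub are hypothetical stand-ins rather than the SDK's classes, and set_status is an assumed helper that simulates a status change.

```python
from enum import Enum
from typing import Callable, FrozenSet, List, Optional, Tuple


class Status(Enum):
    """Hypothetical stand-in for ReactorStatus."""
    CONNECTING = "connecting"
    WAITING = "waiting"
    READY = "ready"
    DISCONNECTED = "disconnected"


StatusHandler = Callable[[Status], None]


class StatusHub:
    """Hypothetical stand-in showing one decorator with three call shapes."""

    def __init__(self) -> None:
        # Each entry pairs an optional filter (None = all) with a handler.
        self._handlers: List[Tuple[Optional[FrozenSet[Status]], StatusHandler]] = []

    def on_status(self, arg):
        if isinstance(arg, Status):
            wanted = frozenset([arg])   # @hub.on_status(Status.READY)
        elif callable(arg):
            self._handlers.append((None, arg))  # bare @hub.on_status
            return arg
        else:
            wanted = frozenset(arg)     # @hub.on_status([A, B])

        def decorator(handler: StatusHandler) -> StatusHandler:
            self._handlers.append((wanted, handler))
            return handler

        return decorator

    def set_status(self, status: Status) -> None:
        # Assumed helper: notify handlers whose filter accepts the status.
        for wanted, handler in self._handlers:
            if wanted is None or status in wanted:
                handler(status)


hub = StatusHub()
calls = []


@hub.on_status
def on_any(status: Status) -> None:
    calls.append(("all", status.name))


@hub.on_status(Status.READY)
def on_ready(status: Status) -> None:
    calls.append(("ready", status.name))


@hub.on_status([Status.CONNECTING, Status.WAITING])
def on_pending(status: Status) -> None:
    calls.append(("pending", status.name))


for new_status in (Status.CONNECTING, Status.READY):
    hub.set_status(new_status)
print(calls)
```

Filtered handlers still receive the status as their argument, which matches the signatures shown above and lets one function serve several statuses.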
@reactor.on_error
Registers a handler for errors.
@reactor.on_error
def handle_error(error: ReactorError) -> None:
    ...
Handler receives:
error — A ReactorError dataclass with fields: code, message, timestamp, recoverable, component, and optional retry_after.
Example:
from reactor_sdk import ReactorError

@reactor.on_error
def handle_error(error: ReactorError):
    print(f"[{error.component}:{error.code}] {error.message}")

    if error.recoverable:
        delay = error.retry_after or 3
        print(f"Recoverable: retry suggested in {delay}s")
    else:
        print("Non-recoverable error")
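The recoverable and retry_after fields lend themselves to a small retry policy. The sketch below is one possible policy, not something the SDK prescribes: it honours retry_after when present and otherwise falls back to capped exponential backoff. FakeError is a hypothetical stand-in that carries only a subset of the documented fields, with assumed types; retry_delay, base, and cap are likewise names introduced here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeError:
    """Hypothetical stand-in with a subset of the ReactorError fields."""
    code: str                      # field type is an assumption
    message: str
    recoverable: bool
    retry_after: Optional[float] = None


def retry_delay(error: FakeError, attempt: int,
                base: float = 3.0, cap: float = 30.0) -> Optional[float]:
    """Return seconds to wait before retry number `attempt` (0-based),
    or None when the error should not be retried."""
    if not error.recoverable:
        return None
    if error.retry_after is not None:
        # Explicit None check so a hint of 0 ("retry now") is respected.
        return float(error.retry_after)
    # No hint: exponential backoff, capped so waits stay bounded.
    return min(cap, base * (2 ** attempt))


fatal = FakeError("E_AUTH", "bad key", recoverable=False)
hinted = FakeError("E_BUSY", "try again", recoverable=True, retry_after=0)
plain = FakeError("E_NET", "connection lost", recoverable=True)

delays = [retry_delay(plain, attempt) for attempt in range(5)]
print(retry_delay(fatal, 0), retry_delay(hinted, 0), delays)
```

Note that the expression error.retry_after or 3 used in the example above treats a hint of 0 the same as a missing hint; the explicit None check here keeps the two cases distinct.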
@reactor.on_stream
Registers a handler for video stream/track changes.
@reactor.on_stream
def handle_stream(track: MediaStreamTrack) -> None:
    ...
Handler receives:
track — An aiortc.MediaStreamTrack object representing the video stream from the model
Example:
@reactor.on_stream
def handle_stream(track):
    print(f"New video track: {track.kind}")
For most use cases, @reactor.on_frame is more convenient since it gives you decoded NumPy frames directly. Use @reactor.on_stream when you need lower-level access to the raw MediaStreamTrack.
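For lower-level access, frames are pulled from the track rather than pushed to a callback. In aiortc, MediaStreamTrack.recv() is a coroutine that returns the next frame and raises MediaStreamError once the track has ended. The sketch below shows that pull loop against a stub, FakeTrack, so it runs without aiortc installed; consume_track and FakeTrack are hypothetical names, and the broad except is a simplification made to avoid the aiortc import.

```python
import asyncio
from typing import Any, Callable


async def consume_track(track: Any, handle: Callable[[Any], None]) -> int:
    """Pull frames from a track until it ends; return how many were handled."""
    count = 0
    while True:
        try:
            frame = await track.recv()
        except Exception:
            # With aiortc this would be MediaStreamError (track ended).
            # Task cancellation is a BaseException and still propagates.
            break
        handle(frame)
        count += 1
    return count


class FakeTrack:
    """Stub with the same recv() shape as a media track, for the demo only."""
    kind = "video"

    def __init__(self, frames) -> None:
        self._frames = list(frames)

    async def recv(self) -> Any:
        if not self._frames:
            raise RuntimeError("track ended")
        return self._frames.pop(0)


handled = []
total = asyncio.run(consume_track(FakeTrack(["f0", "f1", "f2"]), handled.append))
print(total, handled)
```

With a real aiortc video track, each item returned by recv() is an av.VideoFrame, and frame.to_ndarray(format="rgb24") converts it to an (H, W, 3) uint8 array. How the coroutine should be scheduled from inside the on_stream handler depends on which thread and event loop the SDK uses to invoke it, which this page does not state.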
Complete Example
import asyncio

from reactor_sdk import Reactor, ReactorStatus, ReactorError


async def main():
    reactor = Reactor(model_name="livecore", api_key="rk_your_key")

    @reactor.on_frame
    def on_frame(frame):
        print(f"Frame: {frame.shape}")

    @reactor.on_status(ReactorStatus.READY)
    def on_ready(status):
        print("Connected!")

    @reactor.on_message
    def on_message(message):
        print(f"Message: {message}")

    @reactor.on_error
    def on_error(error: ReactorError):
        print(f"Error: {error}")

    await reactor.connect()

    try:
        # Keep running until the connection closes.
        while reactor.get_status() != ReactorStatus.DISCONNECTED:
            await asyncio.sleep(0.1)
    finally:
        await reactor.disconnect()


asyncio.run(main())
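The polling loop in the complete example could also be written as a wait on an asyncio.Event that a status handler sets. The sketch below shows the pattern in isolation, with loop.call_later standing in for the SDK invoking the handler; in real use the inner function would be registered with @reactor.on_status(ReactorStatus.DISCONNECTED). The function names and the timeout value are assumptions, and call_soon_threadsafe is used because this page does not say which thread handlers run on.

```python
import asyncio


async def wait_for_disconnect(timeout: float = 2.0) -> bool:
    """Return True once the handler fires, False if the timeout elapses."""
    loop = asyncio.get_running_loop()
    disconnected = asyncio.Event()

    def on_disconnected(status: str) -> None:
        # Safe whether the callback runs on the loop's thread or another one.
        loop.call_soon_threadsafe(disconnected.set)

    # Stand-in for the SDK calling the handler shortly after connecting.
    loop.call_later(0.05, on_disconnected, "disconnected")

    try:
        await asyncio.wait_for(disconnected.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False


finished = asyncio.run(wait_for_disconnect())
print(finished)
```

Compared with polling every 0.1 seconds, the coroutine wakes only when the status actually changes, at the cost of one extra handler.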
Next Steps