Skip to main content

Decorators

The Python SDK provides decorators for registering event handlers. Decorators are defined on a Reactor instance and are the idiomatic Python way to handle events.
All decorators are equivalent to calling reactor.on(event, handler). Use whichever style you prefer.

@reactor.on_frame

Registers a callback to receive video frames as NumPy arrays.
@reactor.on_frame
def handle_frame(frame: NDArray[np.uint8]) -> None:
    ...
Handler receives:
  • frame — A NumPy array with shape (H, W, 3), dtype uint8, in RGB color order.
Example:
from reactor_sdk import Reactor

reactor = Reactor(model_name="livecore", api_key=api_key)

@reactor.on_frame
def handle_frame(frame):
    print(f"Frame: {frame.shape}")  # e.g. (720, 1280, 3)
    
    # Save with PIL
    from PIL import Image
    Image.fromarray(frame).save("frame.png")

    # Or use with OpenCV (note: convert RGB → BGR)
    import cv2
    bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imwrite("frame.png", bgr)
This is equivalent to calling reactor.set_frame_callback(handle_frame).

@reactor.on_message

Registers a handler for messages received from the model.
@reactor.on_message
def handle_message(message: Any) -> None:
    ...
Handler receives:
  • message — The message payload from the model (typically a dictionary)
Example:
@reactor.on_message
def handle_message(message):
    print("Received:", message)

    if message.get("type") == "state_update":
        process_state(message["data"])
    elif message.get("type") == "generation_complete":
        print("Generation finished")

@reactor.on_status

Registers a handler for connection status changes. Supports three usage patterns.

Handle all status changes

@reactor.on_status
def handle_status(status: ReactorStatus) -> None:
    ...
Example:
from reactor_sdk import ReactorStatus

@reactor.on_status
def handle_status(status):
    match status:
        case ReactorStatus.CONNECTING:
            print("Connecting...")
        case ReactorStatus.WAITING:
            print("Waiting for GPU...")
        case ReactorStatus.READY:
            print("Ready!")
        case ReactorStatus.DISCONNECTED:
            print("Disconnected")

Filter by a specific status

@reactor.on_status(ReactorStatus.READY)
def handle_ready(status: ReactorStatus) -> None:
    ...
Example:
@reactor.on_status(ReactorStatus.READY)
def on_ready(status):
    print("Connected! Can now send commands.")

Filter by multiple statuses

@reactor.on_status([ReactorStatus.CONNECTING, ReactorStatus.WAITING])
def handle_pending(status: ReactorStatus) -> None:
    ...
Example:
@reactor.on_status([ReactorStatus.CONNECTING, ReactorStatus.WAITING])
def on_pending(status):
    print(f"Connection in progress: {status}")

@reactor.on_error

Registers a handler for errors.
@reactor.on_error
def handle_error(error: ReactorError) -> None:
    ...
Handler receives:
  • error — A ReactorError dataclass with fields: code, message, timestamp, recoverable, component, and optional retry_after.
Example:
from reactor_sdk import ReactorError

@reactor.on_error
def handle_error(error: ReactorError):
    print(f"[{error.component}:{error.code}] {error.message}")

    if error.recoverable:
        delay = error.retry_after or 3
        print(f"Recoverable — retry suggested in {delay}s")
    else:
        print("Non-recoverable error")

@reactor.on_stream

Registers a handler for video stream/track changes.
@reactor.on_stream
def handle_stream(track: MediaStreamTrack) -> None:
    ...
Handler receives:
  • track — An aiortc.MediaStreamTrack object representing the video stream from the model
Example:
@reactor.on_stream
def handle_stream(track):
    print(f"New video track: {track.kind}")
For most use cases, @reactor.on_frame is more convenient since it gives you decoded NumPy frames directly. Use @reactor.on_stream when you need lower-level access to the raw MediaStreamTrack.

Complete Example

import asyncio
from reactor_sdk import Reactor, ReactorStatus, ReactorError

async def main():
    reactor = Reactor(model_name="livecore", api_key="rk_your_key")

    @reactor.on_frame
    def on_frame(frame):
        print(f"Frame: {frame.shape}")

    @reactor.on_status(ReactorStatus.READY)
    def on_ready(status):
        print("Connected!")

    @reactor.on_message
    def on_message(message):
        print(f"Message: {message}")

    @reactor.on_error
    def on_error(error: ReactorError):
        print(f"Error: {error}")

    await reactor.connect()

    try:
        while reactor.get_status() != ReactorStatus.DISCONNECTED:
            await asyncio.sleep(0.1)
    finally:
        await reactor.disconnect()

asyncio.run(main())

Next Steps