This guide covers the core concepts, best practices, and patterns for building Python applications with Reactor.
Understanding Reactor
Reactor enables you to build realtime AI applications by connecting your application to GPU-powered machines that stream interactive AI experiences. The Python SDK provides an async interface for connecting to models and processing video frames directly — ideal for scripts, servers, data pipelines, and any non-browser use case.
Connection Lifecycle
A Reactor connection goes through four states:
DISCONNECTED → CONNECTING → WAITING → READY
- DISCONNECTED: No active connection
- CONNECTING: Establishing connection to the Reactor coordinator
- WAITING: Connected to coordinator, waiting for GPU assignment
- READY: Connected to GPU machine, can send and receive messages
Basic Setup
Installation
Creating a Reactor Instance
from reactor_sdk import Reactor
reactor = Reactor(
model_name="livecore",
api_key="rk_your_api_key_here",
)
When you pass api_key, the SDK automatically fetches a JWT token during connect(). No separate token-fetching step is needed.
Connecting and Disconnecting
import asyncio
from reactor_sdk import Reactor
async def main():
reactor = Reactor(model_name="livecore", api_key="rk_your_api_key_here")
await reactor.connect()
print("Connected:", reactor.get_status())
# ... interact with the model ...
await reactor.disconnect()
asyncio.run(main())
Using the Context Manager
For automatic cleanup, use async with:
async def main():
async with Reactor(model_name="livecore", api_key="rk_your_api_key_here") as reactor:
print("Connected:", reactor.get_status())
# Connection is automatically cleaned up when leaving the block
Sending Commands
Once connected (READY state), send commands to the model:
from reactor_sdk import Reactor, ReactorStatus
async def main():
reactor = Reactor(model_name="livecore", api_key=api_key)
await reactor.connect()
if reactor.get_status() == ReactorStatus.READY:
await reactor.send_command("schedule_prompt", {
"new_prompt": "A sunset over the ocean",
"timestamp": 0,
})
await reactor.send_command("start", {})
The send_command method takes two arguments:
command: A string identifying the command type
data: A dictionary containing the command payload
Receiving Messages
There are two ways to handle messages from the model.
Using the Decorator
@reactor.on_message
def handle_message(message):
print("Received:", message)
if message.get("type") == "state_update":
update_state(message["data"])
Using the Event API
def handle_message(message):
print("Received:", message)
reactor.on("new_message", handle_message)
# Later, to unregister:
reactor.off("new_message", handle_message)
Receiving Video Frames
A key feature of the Python SDK is direct access to video frames as NumPy arrays. This is ideal for computer vision pipelines, recording, analysis, and any processing that doesn’t require a browser.
Using the Decorator
import numpy as np
@reactor.on_frame
def handle_frame(frame):
# frame is a numpy array with shape (H, W, 3), dtype uint8, RGB
print(f"Frame: {frame.shape}")
# Example: save with PIL
from PIL import Image
img = Image.fromarray(frame)
img.save("latest_frame.png")
Using set_frame_callback
def process_frame(frame):
print(f"Frame: {frame.shape}")
reactor.set_frame_callback(process_frame)
# To stop receiving frames:
reactor.set_frame_callback(None)
Frames are delivered as NumPy arrays with shape (H, W, 3), dtype uint8, in RGB color order.
Monitoring Connection Status
Using Decorators
from reactor_sdk import ReactorStatus
# Handle all status changes
@reactor.on_status
def handle_status(status):
print(f"Status: {status}")
# Handle a specific status
@reactor.on_status(ReactorStatus.READY)
def handle_ready(status):
print("Connected and ready!")
# Handle multiple statuses
@reactor.on_status([ReactorStatus.CONNECTING, ReactorStatus.WAITING])
def handle_pending(status):
print(f"In progress: {status}")
Using the Event API
def handle_status(status):
print(f"Status changed to: {status}")
reactor.on("status_changed", handle_status)
Checking Status Directly
status = reactor.get_status()
if status == ReactorStatus.READY:
await reactor.send_command("start", {})
Publishing Video Input
For video-to-video models, you can publish a video track to the model:
await reactor.publish_track(video_track)
# Stop publishing when done
await reactor.unpublish_track()
You can also access the remote video track from the model:
remote_track = reactor.get_remote_track()
Error Handling
Using the Decorator
from reactor_sdk import ReactorError
@reactor.on_error
def handle_error(error: ReactorError):
print(f"[{error.component}:{error.code}] {error.message}")
if error.recoverable:
print("Error is recoverable, will attempt reconnect")
Using the Event API
reactor.on("error", lambda error: print(f"Error: {error}"))
Error Recovery
import asyncio
@reactor.on_error
async def handle_error(error):
if error.recoverable:
delay = error.retry_after or 3
print(f"Retrying in {delay}s...")
await asyncio.sleep(delay)
await reactor.reconnect()
else:
print(f"Non-recoverable error: {error.message}")
Checking State
state = reactor.get_state()
print(f"Status: {state.status}")
if state.last_error:
print(f"Last error: {state.last_error}")
session_id = reactor.get_session_id()
if session_id:
print(f"Session: {session_id}")
Reconnecting
If a connection is interrupted, you can reconnect to an existing session:
# Disconnect but keep the session alive
await reactor.disconnect(recoverable=True)
# Later, reconnect to the same session
await reactor.reconnect()
Complete Example
Here’s a full example combining all the pieces:
import asyncio
from reactor_sdk import Reactor, ReactorStatus, ReactorError
async def main():
api_key = "rk_your_api_key_here"
reactor = Reactor(model_name="livecore", api_key=api_key)
# Handle frames
@reactor.on_frame
def on_frame(frame):
print(f"Frame: {frame.shape}")
# Handle connection
@reactor.on_status(ReactorStatus.READY)
def on_ready(status):
print("Connected to GPU!")
@reactor.on_status(ReactorStatus.DISCONNECTED)
def on_disconnected(status):
print("Disconnected")
# Handle messages
@reactor.on_message
def on_message(message):
print(f"Message: {message}")
# Handle errors
@reactor.on_error
def on_error(error: ReactorError):
print(f"Error: {error}")
# Connect and send commands
await reactor.connect()
if reactor.get_status() == ReactorStatus.READY:
await reactor.send_command("schedule_prompt", {
"new_prompt": "A sunset over the ocean",
"timestamp": 0,
})
await reactor.send_command("start", {})
# Keep running until disconnected
try:
while reactor.get_status() != ReactorStatus.DISCONNECTED:
await asyncio.sleep(0.1)
except KeyboardInterrupt:
pass
finally:
await reactor.disconnect()
asyncio.run(main())
Local Development
When running a model locally with the HTTP runtime, use local=True:
reactor = Reactor(model_name="my-model", local=True)
await reactor.connect()
This connects to localhost:8080 without authentication. See the Local Setup guide for details on running models locally.
Next Steps