Monorepo for live camera perception (Ultralytics YOLO + MiDaS monocular depth) exposed over a FastAPI WebSocket, plus a Next.js client that draws detections and optionally runs a Gemini-based “household tidying” agent that proposes human-executable actions and gated text-to-speech.
This document is a technical reference for schemas, in-memory state, concurrency, environment variables, and how each module fits together.
```mermaid
flowchart LR
  subgraph browser [Browser / Next.js]
    Cam[Camera + canvas capture]
    WS[WebSocket client]
    HTTP[fetch POST agent/step]
    TTS[Web Speech or OpenAI TTS]
  end
  subgraph backend [Python backend]
    Dec[decode_image_bytes]
    VP[VisionPipeline YOLO + MiDaS]
    PP[postprocess_frame_for_agent]
    WSrv[/ws/infer]
    Pub[publish_latest LATEST_STATE]
    Agent[run_agent Gemini]
    Gate[compute_voice_gate]
    Step[/v1/agent/step]
  end
  Cam -->|binary WebP/JPEG + fmt byte| WS
  WS --> WSrv
  WSrv --> Dec --> VP --> PP
  PP --> Pub
  Step --> Agent
  Pub --> Step
  Step --> Gate
  HTTP --> Step
  Gate --> HTTP
  HTTP --> TTS
```
Separation of concerns

- Vision path is tuned for low latency: decode → YOLO + depth → JSON to the browser; in parallel, CPU postprocess builds the WebP + grounded JSON and atomically publishes `LATEST_STATE` for the session.
- Agent path is slow (a Gemini round-trip). It never blocks the WebSocket loop beyond copying a snapshot; rate limiting and "busy" flags prevent overlapping calls per session.
- TTS is a client concern; the server only returns a `voice` object and accepts `is_tts_playing` so the voice gate can defer non-urgent speech while audio plays.
| Path | Role |
|---|---|
| `frontend-remop/` | Next.js app: camera, WebSocket streaming, agent polling, overlay canvas, TTS (`app/components/CameraOverlay.tsx`, `app/lib/agentTts.ts`, `app/api/tts/route.ts`). |
| `backend/inference_server.py` | FastAPI app: `GET /health`, WebSocket `/ws/infer`, `POST /v1/agent/step`. |
| `backend/vision_pipeline.py` | Shared YOLO + MiDaS 3.1 (`DPT_SwinV2_T_256` from `isl-org/MiDaS:v3_1`); builds the detections JSON for the wire. |
| `backend/perception_postprocess.py` | After GPU work: resize frame → WebP for Gemini; run grounding. |
| `backend/grounding.py` | Map raw detections → compact grounded list for the LLM (`cz`, sectors, filters). |
| `backend/agent_attention.py` | When `task_anchor` is set, reorder/cap grounded objects (safety + focus). |
| `backend/agent_tools.py` | Pydantic `AgentResponse` / `Action` schema, action→API mapping, default TTS phrases from actions. |
| `backend/agent_gemini.py` | Google GenAI `generate_content` with `response_schema=AgentResponse`, system prompt, user text assembly, JSONL logging hook. |
| `backend/world_state.py` | Per-session `LATEST_STATE`, turn log, `task_anchor`, inferred held object, agent throttle/busy, voice-gate memory. |
| `backend/voice_gate.py` | `compute_voice_gate`: when to speak, supersede vs queue, phases, co-gating with `is_tts_playing`. |
| `backend/executors.py` | Dedicated `ThreadPoolExecutor` pools: `VISION` vs `AGENT`. |
| `backend/gemini_session_log.py` | Append JSON Lines under `backend/logs/` per session. |
| `backend/yolo26_depth_webcam.py` | Standalone OpenCV demo using the same `VisionPipeline`. |
- Path: `/ws/infer`
- Query parameters (read in `inference_server.py`):
  - `model`: `oiv7` (default) or `coco` → selects the YOLO weights preset.
  - `session_id`: opaque string; the server-side default session is `"default"` if missing or blank. Must match `POST /v1/agent/step?session_id=` so `LATEST_STATE` lines up.
The server accepts raw WebP/JPEG bytes or an optional leading format byte:
| First byte | Meaning |
|---|---|
| `0x01` | Remaining bytes are JPEG. |
| `0x02` | Remaining bytes are WebP. |
| (absent) | Auto-detect: `RIFF`+`WEBP` → WebP; `FF D8` → JPEG; else OpenCV `imdecode`. |
The Next.js client sends `[fmt_u8][image_bytes]` (`CameraOverlay.tsx`: `FMT_JPEG = 1`, `FMT_WEBP = 2`).
Decode implementation: `decode_image_bytes` in `inference_server.py` (TurboJPEG for JPEG when available; Pillow for WebP).
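The framing rules above can be sketched as a small helper. This is a hypothetical illustration, not the real `decode_image_bytes`; the function name `split_format_byte` is made up for the example:

```python
def split_format_byte(data: bytes) -> tuple[str, bytes]:
    # Hypothetical helper mirroring the documented framing.
    if data[:1] == b"\x01":
        return "jpeg", data[1:]          # explicit JPEG format byte
    if data[:1] == b"\x02":
        return "webp", data[1:]          # explicit WebP format byte
    if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
        return "webp", data              # bare WebP container
    if data[:2] == b"\xff\xd8":
        return "jpeg", data              # bare JPEG SOI marker
    return "unknown", data               # fall through to cv2.imdecode
```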
- A process-wide `asyncio.Lock()` (`_infer_lock`) ensures one GPU pipeline `infer()` at a time across all WebSocket clients (avoids GPU contention).
- Decode and CPU postprocess run in the `VISION` thread pool.
- Receive bytes → decode to BGR `numpy`.
- Under `_infer_lock`: `VisionPipeline.infer(frame)` → wire payload `out` (see §4).
- Without holding `_infer_lock`: `postprocess_frame_for_agent(frame, out)` → `(webp_bytes, grounded)`.
- `await publish_latest(session_id, webp_bytes, out, grounded)` — updates `LATEST_STATE` before replying on the socket (avoids client races where the agent POST returns 409).
- `await ws.send_json(out)` — same structure as §4.
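The per-frame ordering can be condensed into an asyncio sketch. All the collaborators are injected as parameters here so the example stays self-contained; the real handler in `inference_server.py` wires these up directly:

```python
import asyncio

# Hypothetical sketch of the documented per-frame ordering: GPU inference is
# serialized behind one lock, CPU postprocess runs outside it, and the state
# publish happens before the reply goes back on the socket.
_infer_lock = asyncio.Lock()

async def handle_frame(ws, session_id, data, pipeline, loop, vision_pool,
                       decode, postprocess, publish_latest):
    frame = await loop.run_in_executor(vision_pool, decode, data)
    async with _infer_lock:
        out = pipeline.infer(frame)          # GPU: one client at a time
    webp_bytes, grounded = await loop.run_in_executor(
        vision_pool, postprocess, frame, out)  # CPU, lock not held
    await publish_latest(session_id, webp_bytes, out, grounded)
    await ws.send_json(out)                  # reply only after publish
```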
Produced by _detections_payload in vision_pipeline.py. Sent to the browser as the WebSocket text JSON message.
```ts
// Conceptual TypeScript shape (matches frontend InferResponse)
type InferResponse = {
  w: number;
  h: number;
  detections: Detection[];
  error?: string; // only on decode/pipeline failure paths
};

type Detection = {
  label: string;     // class name from YOLO
  conf: number;      // 0..1
  x1: number; y1: number; x2: number; y2: number; // normalized 0..1
  cx: number; cy: number;  // box center, normalized
  rel_depth: number; // MiDaS scalar at integer pixel (u,v) of center
};
```

Depth semantics: MiDaS outputs are used as a relative ordering signal (see the YOLO webcam docstring: larger values ≈ closer in this setup). They are not metric depth. The agent prompt describes `cz` as a rough step-equivalent after scaling (§5).
On failure, the server may send `{ error, w: 0, h: 0, detections: [] }`.
Built by ground_detections_payload in grounding.py from the same out dict the client receives (not from WebP).
Each element:
```json
{
  "class": "cup",
  "cx": 0.512,
  "cy": 0.403,
  "cz": 1.234,
  "sector": "left" | "center" | "right",
  "v_band": "upper" | "middle" | "lower",
  "conf": 0.87
}
```

- `cz = rel_depth / K` where `K` is `AGENT_DEPTH_SCALE_K` (default `1.0`). There is no per-frame min/max normalization — calibration is your responsibility.
- `sector` / `v_band` use configurable thresholds: `AGENT_SECTOR_*` env vars (defaults split at ~1/3 and ~2/3 of normalized x/y).
- Filtering: drop detections with `conf < AGENT_DETECTION_MIN_CONF` (default `0.3`), sort by conf descending, keep up to `AGENT_TOP_N_DETECTIONS` (default `20`).
- Optional allowlist: `AGENT_CLASS_ALLOWLIST` — comma-separated lowercase names; if set, only those classes pass.
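The per-detection mapping can be sketched as follows. The thresholds and `k` are illustrative defaults, standing in for `AGENT_SECTOR_*` and `AGENT_DEPTH_SCALE_K`; the real logic lives in `backend/grounding.py`:

```python
def ground_detection(det: dict, k: float = 1.0,
                     lo: float = 1/3, hi: float = 2/3) -> dict:
    # Hypothetical sketch of one wire detection -> one grounded object.
    cx, cy = det["cx"], det["cy"]
    return {
        "class": det["label"],
        "cx": round(cx, 3),
        "cy": round(cy, 3),
        "cz": round(det["rel_depth"] / k, 3),  # k plays AGENT_DEPTH_SCALE_K
        "sector": "left" if cx < lo else ("center" if cx < hi else "right"),
        "v_band": "upper" if cy < lo else ("middle" if cy < hi else "lower"),
        "conf": det["conf"],
    }
```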
`prioritize_grounded_for_model` in `agent_attention.py`:

- If `task_anchor` is empty → returns a copy of the full grounded list (no reorder).
- Else:
  - `person`, `cat`, `dog` always appear first (by conf), regardless of anchor.
  - Up to `AGENT_ANCHOR_TOP_MATCH` detections whose class matches anchor tokens (substring/token overlap on the class string).
  - Up to `AGENT_ANCHOR_TOP_REST` other detections (by conf), with a twist: if `inferred_held_object` is set, prefer non-held classes first in the "rest" slots so surfaces (`table`, `bin`, …) surface before the held blob.

Deduping preserves order on `(class, cx, cy, cz)`.
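A simplified sketch of the reordering; the real token-overlap matching, held-object demotion, and dedupe live in `backend/agent_attention.py`, and `top_match` / `top_rest` stand in for the `AGENT_ANCHOR_TOP_*` env vars:

```python
def prioritize_grounded(grounded: list, anchor: str,
                        top_match: int = 3, top_rest: int = 5) -> list:
    # Hypothetical sketch: safety classes first, then anchor matches, then rest.
    if not anchor:
        return list(grounded)            # no anchor -> unmodified copy
    always = [g for g in grounded if g["class"] in ("person", "cat", "dog")]
    rest = [g for g in grounded if g not in always]
    tokens = anchor.lower().split()
    matches = [g for g in rest
               if any(t in g["class"].lower() for t in tokens)]
    others = [g for g in rest if g not in matches]
    by_conf = lambda xs: sorted(xs, key=lambda g: g["conf"], reverse=True)
    return (by_conf(always)
            + by_conf(matches)[:top_match]
            + by_conf(others)[:top_rest])
```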
All session data lives in `_sessions: dict[str, _SessionBrainState]` under a single `asyncio.Lock()` (`_lock`). There is no disk persistence of session brain state (except Gemini logs).
| Field | Type | Meaning |
|---|---|---|
| `frame_webp` | `bytes` | Resized WebP of the full-resolution frame (for Gemini multimodal input). |
| `detections` | `dict` | Copy of the wire payload (`w`, `h`, `detections[]`) at publish time. |
| `grounded` | `list[dict]` | Grounded objects list (§5) at publish time. |
| `version` | `int` | Monotonic per-session counter incremented on each successful publish. |
| Field | Purpose |
|---|---|
| `latest` | Current `LatestSnapshot`, or `None` until the first successful infer. |
| `recent_action_labels` | Rolling action labels (formatted `name` or `name({...json})`), last 5 by default. |
| `last_agent_ms` | Wall-clock time (ms) of the last successful agent completion; used for the rate limit. |
| `agent_busy` | True while a Gemini call is in flight for this session. |
| `task_anchor` | Session subgoal string; merged from model output on each successful step (§7). |
| `turn_log` | List of dicts: `thought`, `instruction`, `actions` (labels), `anchor` (post-merge). Trimmed to the `AGENT_MEMORY_TURNS` newest entries. |
| `last_step_monotonic` | Used to compute `seconds_since_last_step` for the prompt. |
| `last_issued_action_labels` | Exact labels from the previous successful step. |
| `voice_last_speak_text` | Last committed TTS line (for echo / dwell dedupe). |
| `voice_last_speak_monotonic` | When that line was committed. |
| `voice_last_motor_fingerprint` | Canonical fingerprint of non-wait actions from the last motor commit (§9). |
| `inferred_held_object` | Server-side inference from `pick_up` / `place` / `drop` (§6.3). |
`apply_inferred_held_after_actions` walks this tick's actions in order:

- `place` or `drop` → clears the held object.
- `pick_up` → if `args.target` is a non-empty string, sets held to that string (truncated to `AGENT_HELD_OBJECT_MAX_LEN`).

This is not vision-based truth; the prompt tells the model to verify visually.
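The walk above can be sketched as a pure function; `max_len` stands in for `AGENT_HELD_OBJECT_MAX_LEN`, and the dict-shaped actions are an assumption of this example:

```python
def apply_inferred_held_after_actions(held, actions, max_len=64):
    # Hypothetical sketch: later actions in the same tick win.
    for a in actions:
        if a["name"] in ("place", "drop"):
            held = None                          # anything held is released
        elif a["name"] == "pick_up":
            target = (a.get("args") or {}).get("target")
            if isinstance(target, str) and target.strip():
                held = target.strip()[:max_len]  # remember what was grabbed
    return held
```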
`merge_task_anchor_from_model` (`world_state.py`):

- Model output `""` → keep current anchor.
- `CLEAR` (case-insensitive; outer quotes stripped) → clear anchor.
- Otherwise → sanitize (alphanumerics + ASCII spaces only, collapse whitespace); if empty after sanitize, keep current; else replace, truncated to `AGENT_TASK_ANCHOR_MAX_LEN`.
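A minimal sketch of the tri-state merge, assuming `max_len` in place of `AGENT_TASK_ANCHOR_MAX_LEN` (the real function is `merge_task_anchor_from_model`):

```python
import re

def merge_task_anchor(current: str, model_out: str, max_len: int = 80) -> str:
    # Hypothetical sketch: "" keeps, CLEAR clears, anything else is sanitized.
    text = model_out.strip().strip('"').strip("'")
    if text == "":
        return current                   # "" -> keep current anchor
    if text.lower() == "clear":
        return ""                        # CLEAR -> clear anchor
    clean = re.sub(r"[^A-Za-z0-9 ]+", " ", model_out)   # ASCII alnum + spaces
    clean = re.sub(r"\s+", " ", clean).strip()          # collapse whitespace
    return current if not clean else clean[:max_len]
```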
`get_memory_for_prompt` builds:

- `turn_log_excerpt`: the last `AGENT_MEMORY_TURNS` turns, each as one JSON object per line (`thought`, `instruction`, `actions`, `anchor_after`), total chars capped by `AGENT_MEMORY_MAX_CHARS` (tail-truncated if needed).
- `seconds_since_last_step`: `time.monotonic() - last_step_monotonic` if a prior step exists.
`try_begin_agent_work`:

- Rejects if `agent_busy` → HTTP 429, reason `busy`.
- Rejects if `now_ms - last_agent_ms < AGENT_MIN_INTERVAL_MS` → 429, reason `throttle`.
- Else sets `agent_busy = True`.

`finish_agent_work`: clears busy; on success updates `last_agent_ms`.
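The busy/throttle pair can be condensed into one small class. This is a sketch, not the real `world_state.py` code; `min_interval_ms` stands in for `AGENT_MIN_INTERVAL_MS`:

```python
import time

class AgentThrottle:
    # Hypothetical per-session gate mirroring try_begin/finish_agent_work.
    def __init__(self, min_interval_ms: int = 4000):
        self.busy = False
        self.last_ms = 0
        self.min_interval_ms = min_interval_ms

    def try_begin(self, now_ms=None):
        now_ms = int(time.monotonic() * 1000) if now_ms is None else now_ms
        if self.busy:
            return False, "busy"         # -> HTTP 429
        if now_ms - self.last_ms < self.min_interval_ms:
            return False, "throttle"     # -> HTTP 429
        self.busy = True
        return True, ""

    def finish(self, success: bool, now_ms=None):
        self.busy = False
        if success:                      # only successes move the clock
            self.last_ms = (int(time.monotonic() * 1000)
                            if now_ms is None else now_ms)
```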
`POST /v1/agent/step` (`inference_server.py`):

1. `copy_snapshot(session_id)` — if `None` → 409 ("No perception state… send frames first").
2. `try_begin_agent_work` — may 429.
3. Missing `GEMINI_API_KEY` → 503, busy cleared.
4. `get_memory_for_prompt`, `prioritize_grounded_for_model` on `snap.grounded`.
5. `run_agent(snap.frame_webp, grounded_for_model, …)` in the `AGENT` thread pool (`executors.py`).
6. On success: `update_memory_after_agent_success`, `compute_voice_gate`, `commit_voice_gate_result`, `finish_agent_work(success=True)`.

Query params: `goal`, `last_outcome` (optional strings, forwarded into the user message).

Body (JSON, optional): `{ "is_tts_playing": boolean }` — forwarded to `compute_voice_gate`.
| Env | Default | Role |
|---|---|---|
| `GEMINI_API_KEY` | — | Required for the agent. |
| `GEMINI_AGENT_MODEL` | `gemini-2.5-flash-lite` | Model id. |
| `AGENT_MAX_OUTPUT_TOKENS` | `256` | Clamped 32..512. |
| `HIGH_LEVEL_INTENTION_PROMPT` | (built-in tidying text) | Overrides the mission paragraph. |
The system instruction is assembled in _system_instruction() in agent_gemini.py: mission, role, anchor/holding rules, ACTION_REGISTRY_PROMPT, and JSON-only output requirement.
Includes, in order (when applicable):

- `Grounded objects (JSON array):` — JSON dump of the attention-filtered grounded list.
- `Stored task_anchor`
- `Session tool-inference` — the inferred held object line.
- `Recent turn log` — excerpt.
- `Recent action labels` + `Last step action labels`
- Motion cooldown hint — if within `AGENT_MOTION_COOLDOWN_HINT_SEC` of the last step and the last labels included a motion action (`move_*`, `turn_*`), nudge toward `wait` / not repeating motion without evidence.
- `Current goal`, `Last outcome / human feedback`
- A closing instruction to respond with JSON.
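The section ordering above can be sketched as a simple joiner. This is a hypothetical condensation (function name and section labels are illustrative); the real assembly lives in `backend/agent_gemini.py`:

```python
import json

def build_user_text(grounded, anchor="", held=None, turn_excerpt="",
                    recent_labels=None, goal=None, last_outcome=None):
    # Hypothetical sketch: emit only the sections that apply, in order.
    parts = ["Grounded objects (JSON array): " + json.dumps(grounded)]
    if anchor:
        parts.append("Stored task_anchor: " + anchor)
    if held:
        parts.append("Session tool-inference: likely holding " + held)
    if turn_excerpt:
        parts.append("Recent turn log:\n" + turn_excerpt)
    if recent_labels:
        parts.append("Recent action labels: " + ", ".join(recent_labels))
    if goal:
        parts.append("Current goal: " + goal)
    if last_outcome:
        parts.append("Last outcome / human feedback: " + last_outcome)
    parts.append("Respond with JSON only, matching the response schema.")
    return "\n\n".join(parts)
```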
Image side: types.Part.from_bytes(..., mime_type="image/webp").
Defined in agent_tools.py (Pydantic, used as response_schema):
| Field | Type | Notes |
|---|---|---|
| `thought` | `str` | Dashboard / operator only; not read aloud by design. |
| `instruction` | `str` | Preferred TTS line when non-empty; constrained in the prompt (second-person imperatives; banned words). |
| `actions` | `list[Action]` | Each action: `name` + `args_json` string containing a JSON object. |
| `task_anchor` | `str` | Tri-state: `""` keep, `CLEAR` clear, else new anchor (§6.4). |
Legacy: if the model returns say instead of thought, a validator maps say → thought.
ActionName: move_forward, move_backward, turn_left, turn_right, pick_up, drop, place, look_around, wait.
`args_json` examples (from prompt/registry):

- `move_forward` / `move_backward`: `{"steps": 1}`
- `turn_left` / `turn_right`: `{"degrees": 30}`
- `pick_up`: `{"target": "<class string>"}` — must match a grounded `class`
- `place`: `{"target": "...", "near": "<class string>"}` — `near` must be a grounded class
- `drop`: `{}`
- `look_around`: `{}`
- `wait`: `{}` or `{"seconds": 2}`
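The schema's shape can be mirrored with plain dataclasses. This is a structural sketch only: the real schema in `backend/agent_tools.py` is Pydantic (so it can be passed as `response_schema` to Gemini); dataclasses keep the example dependency-free, and the `args()` helper is an invention of this example:

```python
import json
from dataclasses import dataclass, field

@dataclass
class Action:
    name: str        # one of the ActionName values above
    args_json: str   # JSON-encoded object, e.g. '{"steps": 1}'

    def args(self) -> dict:
        # Tolerate an empty string the same way "{}" would parse.
        return json.loads(self.args_json or "{}")

@dataclass
class AgentResponse:
    thought: str = ""
    instruction: str = ""
    actions: list = field(default_factory=list)
    task_anchor: str = ""   # "" keep, "CLEAR" clear, else new anchor
```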
Success JSON (inference_server.py):
| Key | Meaning |
|---|---|
| `say` | Dashboard line: `thought` if non-empty, else `spoken_line` fallback. |
| `instruction` | Raw model instruction. |
| `spoken_line` | Trimmed model instruction only (empty string means no speech this tick). |
| `actions` | `[{ "name", "args" }]` via `action_to_api_dict` (parsed `args_json`). |
| `state_version` | `snap.version` at time of read (perception snapshot used). |
| `task_anchor` | Stored anchor after merge. |
| `inferred_held_object` | After applying this tick's actions. |
| `voice` | `{ speak, should_speak, phase, supersede }` (§9). |
HTTP errors:

- 409 — no `LATEST_STATE` for the session.
- 429 — busy or throttle.
- 503 — no API key.
- 502 — Gemini failure (`Gemini error: …`).
append_gemini_log(session_id, entry) writes JSON Lines (.jsonl). Filename pattern: gemini_{sanitized_session}_{utc_timestamp}.jsonl.
Each line may include: UTC timestamp, model id, full user_text, frame_webp_bytes length, grounded snapshot, memory fields, gemini_response metadata, parsed agent_response, or error.
compute_voice_gate decides should_speak, supersede, and phase using anchor transitions, motor fingerprint, dwell, min interval, and is_tts_playing.
`motor_fingerprint_for_actions`: for every action except `wait`, canonicalize `args_json` (sorted-keys JSON) and build sorted `name:{...}` strings joined by `|`. Empty if only waits.
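The fingerprint described above can be sketched as follows; the dict shape of the actions is an assumption of this example, not the real Pydantic objects:

```python
import json

def motor_fingerprint_for_actions(actions: list) -> str:
    # Sketch of the documented behavior: canonicalize args_json with sorted
    # keys for every non-wait action, then join sorted "name:{...}" parts.
    parts = []
    for a in actions:
        if a["name"] == "wait":
            continue                      # waits never count as motor intent
        args = json.loads(a.get("args_json") or "{}")
        canon = json.dumps(args, sort_keys=True, separators=(",", ":"))
        parts.append(f'{a["name"]}:{canon}')
    return "|".join(sorted(parts))        # "" if only waits
```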
| Condition | Phase |
|---|---|
| No anchor after merge | `idle` |
| Non-empty fingerprint | `motion_pending` |
| All actions are `wait` | `waiting_scene` |
| Else | `anchored` |
- Anchor changed (`anchor_before != anchor_after`) → `should_speak`, `supersede = True`, commit reason `anchor`.
- Else if the fingerprint is non-empty and ≠ `voice_last_motor_fingerprint` → `should_speak`, `supersede = False`, commit reason `motor`.
- Else if dwell: same anchor, `voice_last_speak_monotonic > 0`, and elapsed ≥ `VOICE_MIN_DWELL_SEC` → `should_speak` (unless the normalized TTS line equals the last spoken line — then suppressed).
- `is_tts_playing` and `should_speak` and not `supersede` → force `should_speak = False` (urgent barge-in is still allowed via supersede).
`VOICE_MIN_INTERVAL_SEC` (default `2`): if `should_speak`, not `supersede`, and the last speak was too recent → suppress.
The speak string in the payload is the line intended for TTS when gating allows; when not speaking, the gate may echo voice_last_speak_text for UI stability.
commit_voice_gate_result (in world_state.py) runs only when should_speak; updates voice_last_speak_text, voice_last_speak_monotonic, and on commit_reason == "motor" stores voice_last_motor_fingerprint.
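The gate's priority order can be condensed into one function. This is a deliberately simplified sketch of `compute_voice_gate` (it omits the min-interval check and echo behavior); `min_dwell_sec` stands in for `VOICE_MIN_DWELL_SEC`:

```python
def decide_voice(anchor_before, anchor_after, fingerprint, last_fingerprint,
                 same_line_as_last, elapsed_since_speak, is_tts_playing,
                 min_dwell_sec=8.0):
    # Hypothetical condensation: anchor change > new motor fingerprint >
    # dwell; is_tts_playing only suppresses non-superseding speech.
    should_speak, supersede, reason = False, False, ""
    if anchor_before != anchor_after:
        should_speak, supersede, reason = True, True, "anchor"
    elif fingerprint and fingerprint != last_fingerprint:
        should_speak, reason = True, "motor"
    elif elapsed_since_speak >= min_dwell_sec and not same_line_as_last:
        should_speak, reason = True, "dwell"
    if is_tts_playing and should_speak and not supersede:
        should_speak = False             # defer; barge-in needs supersede
    return should_speak, supersede, reason
```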
CameraOverlayMount.tsx loads CameraOverlay with next/dynamic { ssr: false } so browser-only APIs stay client-only.
On mount, crypto.randomUUID() (fallback sid-{Date.now()}) — same id appended to /ws/infer and /v1/agent/step.
NEXT_PUBLIC_INFERENCE_WS_URL (default ws://127.0.0.1:8000/ws/infer). Query params added at connect: model, session_id.
`requestAnimationFrame` drives `tick()` when streaming:

- Skips when `document.visibilityState === 'hidden'`.
- Ping-pong with `busyRef`: one frame in flight at a time.
- Canvas `toBlob` WebP at quality `0.5` if supported, else JPEG at `0.72`.
- Prepends the `FMT_WEBP` / `FMT_JPEG` byte.
inferReady: set true after first JSON message with w > 0 (used to avoid agent POST before state exists).
When `streaming && sessionId && inferReady`:

- HTTP base: `NEXT_PUBLIC_AGENT_HTTP_URL`, or derived from the WebSocket URL (`ws` → `http`, `wss` → `https`).
- `POST .../v1/agent/step?session_id=` every `750` ms (`gapMs`), with at most one request in flight (`agentStepInFlightRef`).
- Body: `{ is_tts_playing }` from `subscribeTtsPlaying` / `getIsTtsPlaying()` in `agentTts.ts`.
- 429 is ignored silently (throttle).
- TTS: if `voice.should_speak`, uses `voice.speak` with `speakInstruction(line, { supersede })`; the non-supersede path delays slightly (`CLIENT_TTS_MIN_GAP_MS = 250`) to stagger from the server's `VOICE_MIN_INTERVAL_SEC`.
unlockAudioFromUserGesture() and resumeAudioContextAfterUserGesture() run on Start camera (same user gesture) for Safari/iOS: silent Web Speech utterance + AudioContext.resume().
| Engine | Behavior |
|---|---|
| `browser` (default) | `speechSynthesis` queue; supersede cancels the queue and clears the defer timer. |
| `openai` | `POST /api/tts` → OpenAI `tts-1`, `response_format: mp3`; playback via `HTMLAudioElement` first, Web Audio fallback. |
NEXT_PUBLIC_TTS_ENGINE, NEXT_PUBLIC_OPENAI_TTS_VOICE, server OPENAI_API_KEY for /api/tts. Optional NEXT_PUBLIC_TTS_DEBUG.
See backend/.env.example for the canonical list. Additional / noteworthy:
| Variable | Default | Module |
|---|---|---|
| `VISION_EXECUTOR_WORKERS` | `8` | `executors.py` |
| `AGENT_EXECUTOR_WORKERS` | `4` | `executors.py` |
| `VOICE_MIN_INTERVAL_SEC` | `2` | `voice_gate.py` (not in `.env.example`) |
| `YOLO_MODEL` | — | Legacy alias for the `oiv7` path when `YOLO_MODEL_OIV7` is unset |
| `MIDAS_EVERY_N` | `1` | Reuse the previous depth map every N frames |
See frontend-remop/.env.example.
You need two processes: Python inference server + Next.js dev server.
```bash
cd backend
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install -r requirements.txt
cp .env.example .env        # set GEMINI_API_KEY for agent; optional CORS, tuning
python -m uvicorn inference_server:app --host 0.0.0.0 --port 8000
```

Shortcut: `./serve.sh` (uses the venv python if present).
Health: http://127.0.0.1:8000/health
Troubleshooting ModuleNotFoundError: No module named 'torch': use python -m uvicorn inside the activated venv, not a global uvicorn binary.
PyTurboJPEG (optional JPEG speedup): requires libjpeg-turbo on the system; otherwise OpenCV decode is used.
```bash
cd frontend-remop
cp .env.example .env.local   # optional: NEXT_PUBLIC_INFERENCE_WS_URL, OPENAI_API_KEY
npm install
npm run dev
```

Open http://localhost:3000. Use Start camera (required for Safari / user-gesture rules).
Use wss:// and HTTPS in production for camera APIs outside localhost; align INFERENCE_CORS_ORIGINS.
```bash
cd backend && source .venv/bin/activate && python yolo26_depth_webcam.py
```

First run downloads YOLO and MiDaS weights; `timm` is pinned for MiDaS v3.1 compatibility.
This project is licensed under the MIT License—see LICENSE.