Skip to content

Hybrid Perception Architecture: YOLO + VLM Integration

Executive Summary

Problem: Pure YOLO is fast but limited to fixed classes. Pure VLM is flexible but too slow for real-time navigation.

Solution: Hybrid architecture that combines YOLO's speed (30 FPS) with VLM's semantic reasoning (0.2-1 FPS) using RxPY streaming operators.

Three Patterns: 1. Parallel Streams: YOLO and VLM run independently, merge results 2. Sequential Pipeline: YOLO filters candidates, VLM verifies semantics 3. Adaptive Switching: Change strategy based on mission phase

Impact: Enables nuanced queries ("Find the RED ball") while maintaining real-time tracking performance.


Perception System Overview

DIMOS ObjectDetectionStream Architecture

Pipeline:

Video Stream (30 FPS)
    ↓
Detector (YOLO/Detic/VLM) → Bounding boxes
    ↓
Depth Estimation (Metric3D) → 3D position
    ↓
Frame Transform (base_link → odom) → Navigation coordinates
    ↓
Local Planner → Goal setting

Key Code (object_detection_stream.py):

class ObjectDetectionStream:
    def __init__(
        self,
        camera_intrinsics,          # [fx, fy, cx, cy]
        detector,                   # YOLO | Detic | VLM (pluggable!)
        transform_to_map,           # Frame transformation function
        video_stream,               # Observable[frame]
    ):
        self.detector = detector
        self.depth_model = Metric3D()

    def create_stream(self, video_stream) -> Observable:
        """Create detection stream from video."""
        return video_stream.pipe(
            ops.map(self._process_frame)  # Detect + depth + transform
        )

    def _process_frame(self, frame):
        # 1. Detect objects
        bboxes, confidences, labels = self.detector.process_image(frame)

        # 2. Estimate depth
        depths = [calculate_depth(frame, bbox) for bbox in bboxes]

        # 3. Calculate 3D positions (in base_link)
        positions = [calculate_position(bbox, depth) for bbox, depth in zip(bboxes, depths)]

        # 4. Transform to odom frame
        positions_odom = [self.transform_to_map(pos, "base_link") for pos in positions]

        return {"objects": list(zip(labels, positions_odom, confidences))}

Detector Interface (pluggable):

class Detector(ABC):
    @abstractmethod
    def process_image(self, frame: np.ndarray) -> Tuple[List, List, List]:
        """
        Returns:
            bboxes: List of [x1, y1, x2, y2]
            confidences: List of float (0-1)
            labels: List of str
        """
        pass

Pattern 1: Parallel Streams

Architecture

Use Case: Run YOLO and VLM independently, merge results

When to Use: - Need both fast tracking (YOLO) and semantic understanding (VLM) - Different update rates appropriate (YOLO 10 Hz, VLM 0.2 Hz) - Want redundancy (if YOLO misses, VLM might catch)

Data Flow:

Video Stream (30 FPS)
    ├─→ YOLO Stream (10 FPS sampling) → "ball" detections
    └─→ VLM Stream (0.2 FPS sampling) → "red ball" detections
              ↓
        rx.merge() → Combined stream
              ↓
        Navigation (use first valid detection)

Implementation

import reactivex as rx
from reactivex import operators as ops

class ParallelPerceptionStream:
    """Runs YOLO and VLM in parallel at different rates."""

    def __init__(self, robot):
        self.robot = robot
        self.video_stream = robot.get_ros_video_stream()

        # Initialize detectors
        self.yolo_detector = Yolo2DDetector()
        self.vlm_detector = VLMDetector(model="qwen-vl")

        # Initialize depth model (shared)
        self.depth_model = Metric3D()

    def create_yolo_stream(self) -> Observable:
        """High-frequency YOLO detection stream."""
        return ObjectDetectionStream(
            detector=self.yolo_detector,
            camera_intrinsics=self.robot.camera_intrinsics,
            transform_to_map=self.robot.ros_control.transform_pose,
            video_stream=self.video_stream
        ).get_stream().pipe(
            ops.sample(0.1),  # 10 FPS (throttle from 30 FPS)
            ops.map(lambda det: {**det, "source": "yolo"})
        )

    def create_vlm_stream(self, query: str) -> Observable:
        """Low-frequency VLM detection stream."""
        return self.video_stream.pipe(
            ops.sample(5.0),  # Every 5 seconds (VLM is slow)
            ops.map(lambda frame: self._vlm_detect(frame, query)),
            ops.filter(lambda det: det is not None),  # Only valid detections
            ops.map(lambda det: {**det, "source": "vlm"})
        )

    def _vlm_detect(self, frame, query: str):
        """Query VLM for object detection."""
        # Query VLM (e.g., "Find the red ball in this image")
        bbox, confidence = self.vlm_detector.detect(frame, query)

        if bbox is None:
            return None

        # Calculate 3D position
        depth = calculate_depth_from_bbox(self.depth_model, frame, bbox)
        position, rotation = calculate_position_rotation_from_bbox(
            bbox, depth, self.robot.camera_intrinsics
        )

        # Transform to odom
        position_odom, _ = self.robot.ros_control.transform_pose(
            position, rotation, source_frame="base_link"
        )

        return {
            "label": query,
            "bbox": bbox,
            "position": position_odom,
            "confidence": confidence,
            "frame": frame
        }

    def create_merged_stream(self, query: str) -> Observable:
        """Merge YOLO and VLM streams."""
        yolo_stream = self.create_yolo_stream()
        vlm_stream = self.create_vlm_stream(query)

        # Merge streams (interleaved)
        return rx.merge(yolo_stream, vlm_stream).pipe(
            ops.filter(lambda det: self._is_relevant(det, query))
        )

    def _is_relevant(self, detection, query: str):
        """Filter detections based on query."""
        # YOLO: Check if label matches base class
        if detection["source"] == "yolo":
            base_class = self._extract_base_class(query)  # "red ball" → "ball"
            return any(obj["label"] == base_class for obj in detection.get("objects", []))

        # VLM: Already filtered by query
        return True

# Usage
perception = ParallelPerceptionStream(robot)
stream = perception.create_merged_stream(query="red ball")

# Navigate to first detection
stream.pipe(ops.take(1)).subscribe(
    on_next=lambda det: navigate_to_detection(det)
)

Pros and Cons

Pros: - ✅ Real-time tracking (YOLO maintains lock) - ✅ Semantic verification (VLM adds confidence) - ✅ Redundancy (multiple detection sources) - ✅ Independent tuning (adjust rates separately)

Cons: - ⚠️ Conflicting detections (need merge logic) - ⚠️ Duplicate processing (both run on same frames) - ⚠️ Complex synchronization

Best For: High-reliability missions where redundancy is valuable


Architecture

Use Case: YOLO finds candidates, VLM verifies semantics

When to Use: - Nuanced queries ("red ball" not just "ball") - Want efficiency (VLM only on candidates) - Prefer accuracy over latency

Data Flow:

Video Stream (30 FPS)
    ↓
YOLO Detector (10 FPS) → All "ball" detections
    ↓
Filter (only balls with confidence > 0.6)
    ↓
Sample (every 2 seconds) → Throttle VLM load
    ↓
VLM Verifier → "Is this ball red?"
    ↓
Filter (only confirmed red balls)
    ↓
Navigation

Implementation

class SequentialPerceptionStream:
    """YOLO finds candidates, VLM verifies semantics."""

    def __init__(self, robot):
        self.robot = robot
        self.video_stream = robot.get_ros_video_stream()
        self.yolo_detector = Yolo2DDetector()
        self.vlm_verifier = VLMVerifier()

    def create_stream(self, base_class: str, semantic_filter: str) -> Observable:
        """
        Args:
            base_class: YOLO class to detect (e.g., "ball")
            semantic_filter: VLM verification query (e.g., "Is this ball red?")
        """
        # Stage 1: YOLO candidate detection
        yolo_stream = ObjectDetectionStream(
            detector=self.yolo_detector,
            camera_intrinsics=self.robot.camera_intrinsics,
            transform_to_map=self.robot.ros_control.transform_pose,
            video_stream=self.video_stream
        ).get_stream()

        # Stage 2: Extract candidates
        candidate_stream = yolo_stream.pipe(
            # Flatten objects array
            ops.flat_map(lambda det: rx.from_(det.get("objects", []))),

            # Filter by class
            ops.filter(lambda obj: obj["label"] == base_class),

            # Keep frame reference for VLM
            ops.map(lambda obj: {**obj, "needs_verification": True})
        )

        # Stage 3: VLM verification (throttled)
        verified_stream = candidate_stream.pipe(
            ops.sample(2.0),  # Every 2 seconds (throttle VLM)
            ops.map(lambda obj: self._verify_with_vlm(obj, semantic_filter)),
            ops.filter(lambda obj: obj["verified"])
        )

        return verified_stream

    def _verify_with_vlm(self, detection, query: str):
        """Verify detection with VLM."""
        frame = detection["frame"]
        bbox = detection["bbox"]

        # Crop to object
        x1, y1, x2, y2 = map(int, bbox)
        roi = frame[y1:y2, x1:x2]

        # Query VLM
        response = self.vlm_verifier.query(roi, query)

        # Parse response (e.g., "Yes, this is a red ball" → True)
        verified = self._parse_vlm_response(response, query)

        return {
            **detection,
            "verified": verified,
            "vlm_response": response
        }

    def _parse_vlm_response(self, response: str, query: str) -> bool:
        """Parse VLM natural language response."""
        # Simple keyword matching
        positive_keywords = ["yes", "correct", "true", "is"]
        response_lower = response.lower()

        return any(keyword in response_lower for keyword in positive_keywords)

# Usage Example 1: Find red ball
perception = SequentialPerceptionStream(robot)
stream = perception.create_stream(
    base_class="ball",              # YOLO finds all balls
    semantic_filter="Is this ball red?"  # VLM verifies color
)

stream.subscribe(
    on_next=lambda det: navigate_to_detection(det)
)

# Usage Example 2: Find person in blue shirt
stream = perception.create_stream(
    base_class="person",
    semantic_filter="Is this person wearing a blue shirt?"
)

Advanced: Multi-Stage Verification

class MultiStagePerceptionStream(SequentialPerceptionStream):
    """Multiple VLM verification stages."""

    def create_stream(self, filters: List[Dict]) -> Observable:
        """
        Args:
            filters: List of {"yolo_class": str, "vlm_queries": List[str]}
        """
        # Stage 1: YOLO detection
        yolo_stream = self._create_yolo_stream()

        # Stage 2: First VLM filter
        first_filter = yolo_stream.pipe(
            ops.filter(lambda obj: obj["label"] == filters[0]["yolo_class"]),
            ops.sample(2.0),
            ops.map(lambda obj: self._verify(obj, filters[0]["vlm_queries"][0])),
            ops.filter(lambda obj: obj["verified"])
        )

        # Stage 3: Second VLM filter (more specific)
        second_filter = first_filter.pipe(
            ops.sample(5.0),  # Less frequent (more expensive query)
            ops.map(lambda obj: self._verify(obj, filters[0]["vlm_queries"][1])),
            ops.filter(lambda obj: obj["verified"])
        )

        return second_filter

# Usage: Find specific person
stream = MultiStagePerceptionStream(robot).create_stream([
    {
        "yolo_class": "person",
        "vlm_queries": [
            "Is this person wearing blue?",  # Fast check
            "Is this person wearing a blue shirt and black pants?"  # Detailed check
        ]
    }
])

Pros and Cons

Pros: - ✅ Efficient (VLM only on candidates) - ✅ Accurate (semantic verification) - ✅ Flexible (chain multiple filters) - ✅ Predictable latency (throttled VLM)

Cons: - ⚠️ Higher latency (multi-stage pipeline) - ⚠️ Can miss fast-moving objects (throttling) - ⚠️ YOLO must detect base class first

Best For: Nuanced queries where accuracy > speed


Pattern 3: Adaptive Switching

Architecture

Use Case: Change perception strategy based on mission phase

When to Use: - Mission has distinct phases (search → approach → verify) - Want to optimize for each phase - Complexity acceptable for performance gain

Phase Transitions:

Phase 1: SEARCH (VLM only, 0.3 Hz)
  "Where is the red ball?" → Find it
  ↓
Phase 2: APPROACH (YOLO only, 10 Hz)
  "Track that ball" → Navigate toward it
  ↓
Phase 3: VERIFY (VLM once)
  "Is this really a red ball?" → Confirm before grasping

Implementation

from enum import Enum
from typing import Callable

class MissionPhase(Enum):
    SEARCH = "search"      # VLM: Find object anywhere
    APPROACH = "approach"  # YOLO: Track object while navigating
    VERIFY = "verify"      # VLM: Final confirmation

class AdaptivePerceptionStream:
    """Switches perception strategy based on mission phase."""

    def __init__(self, robot):
        self.robot = robot
        self.video_stream = robot.get_ros_video_stream()
        self.current_phase = MissionPhase.SEARCH

        # Detectors
        self.yolo = Yolo2DDetector()
        self.vlm = VLMDetector()

        # Phase-specific streams
        self._search_stream = None
        self._approach_stream = None
        self._verify_stream = None

    def create_stream(self, mission_config: Dict) -> Observable:
        """
        Args:
            mission_config: {
                "search_query": "Find a red ball",
                "yolo_class": "ball",
                "verify_query": "Is this ball red and smooth?"
            }
        """
        # Create phase-specific streams
        self._search_stream = self._create_search_stream(
            mission_config["search_query"]
        )
        self._approach_stream = self._create_approach_stream(
            mission_config["yolo_class"]
        )
        self._verify_stream = self._create_verify_stream(
            mission_config["verify_query"]
        )

        # Merge streams with phase gating
        return rx.merge(
            self._search_stream.pipe(
                ops.filter(lambda _: self.current_phase == MissionPhase.SEARCH)
            ),
            self._approach_stream.pipe(
                ops.filter(lambda _: self.current_phase == MissionPhase.APPROACH)
            ),
            self._verify_stream.pipe(
                ops.filter(lambda _: self.current_phase == MissionPhase.VERIFY)
            )
        )

    def _create_search_stream(self, query: str) -> Observable:
        """VLM-based search (slow but thorough)."""
        return self.video_stream.pipe(
            ops.sample(3.0),  # Every 3 seconds
            ops.map(lambda frame: self._vlm_detect(frame, query)),
            ops.filter(lambda det: det is not None),
            ops.do_action(lambda det: self._on_object_found(det))
        )

    def _create_approach_stream(self, yolo_class: str) -> Observable:
        """YOLO-based tracking (fast)."""
        yolo_stream = ObjectDetectionStream(
            detector=self.yolo,
            camera_intrinsics=self.robot.camera_intrinsics,
            transform_to_map=self.robot.ros_control.transform_pose,
            video_stream=self.video_stream
        ).get_stream()

        return yolo_stream.pipe(
            ops.sample(0.1),  # 10 FPS
            ops.flat_map(lambda det: rx.from_(det.get("objects", []))),
            ops.filter(lambda obj: obj["label"] == yolo_class),
            ops.do_action(lambda obj: self._check_approach_complete(obj))
        )

    def _create_verify_stream(self, query: str) -> Observable:
        """VLM verification (once)."""
        return self.video_stream.pipe(
            ops.take(1),  # Only once
            ops.map(lambda frame: self._vlm_verify(frame, query)),
            ops.do_action(lambda result: self._on_verification_complete(result))
        )

    def _on_object_found(self, detection):
        """Callback when object found in search phase."""
        logger.info(f"Object found: {detection['label']}")
        self.switch_phase(MissionPhase.APPROACH)

    def _check_approach_complete(self, detection):
        """Check if close enough to object."""
        distance = detection["position"]["distance"]
        if distance < 1.0:  # Within 1 meter
            logger.info("Approach complete, verifying...")
            self.switch_phase(MissionPhase.VERIFY)

    def _on_verification_complete(self, result):
        """Callback when verification done."""
        if result["verified"]:
            logger.info("Object verified! Mission complete.")
        else:
            logger.warning("Verification failed, re-searching...")
            self.switch_phase(MissionPhase.SEARCH)

    def switch_phase(self, new_phase: MissionPhase):
        """Change mission phase."""
        logger.info(f"Phase transition: {self.current_phase.value} → {new_phase.value}")
        self.current_phase = new_phase

# Usage
perception = AdaptivePerceptionStream(robot)
stream = perception.create_stream({
    "search_query": "Find a red ball",
    "yolo_class": "ball",
    "verify_query": "Is this a smooth red ball?"
})

# Subscribe to stream
stream.subscribe(
    on_next=lambda det: handle_detection(det, perception.current_phase)
)

def handle_detection(detection, phase):
    if phase == MissionPhase.SEARCH:
        # VLM found object, start navigation
        navigate_to_detection(detection)

    elif phase == MissionPhase.APPROACH:
        # YOLO tracking, update goal continuously
        update_navigation_goal(detection)

    elif phase == MissionPhase.VERIFY:
        # VLM verification, stop and confirm
        if detection["verified"]:
            mission_success()
        else:
            restart_search()

Phase Transition Logic

class MissionStateMachine:
    """State machine for adaptive perception."""

    def __init__(self):
        self.phase = MissionPhase.SEARCH
        self.detection_history = []

    def transition(self, event: str, data: Dict):
        """Handle phase transitions."""
        if self.phase == MissionPhase.SEARCH:
            if event == "object_detected":
                # VLM found something
                self.phase = MissionPhase.APPROACH
                return "start_navigation"

        elif self.phase == MissionPhase.APPROACH:
            if event == "distance_threshold":
                # Close enough to object
                if data["distance"] < 1.5:
                    self.phase = MissionPhase.VERIFY
                    return "request_verification"
            elif event == "object_lost":
                # Lost tracking
                self.phase = MissionPhase.SEARCH
                return "restart_search"

        elif self.phase == MissionPhase.VERIFY:
            if event == "verification_complete":
                if data["verified"]:
                    return "mission_complete"
                else:
                    self.phase = MissionPhase.SEARCH
                    return "restart_search"

Pros and Cons

Pros: - ✅ Optimized for each phase - ✅ Efficient (right tool for right job) - ✅ Robust (handles failures gracefully) - ✅ Extensible (add more phases)

Cons: - ⚠️ Complex state management - ⚠️ Harder to debug - ⚠️ Phase transition logic required - ⚠️ Potential for state confusion

Best For: Complex missions with distinct phases


VLM Integration Details

VLM Detector Interface

class VLMDetector:
    """Vision-Language Model detector (Qwen, GPT-4V, LLaVA, etc.)."""

    def __init__(self, model: str = "qwen-vl", api_key: str = None):
        self.model = model
        self.api_key = api_key

    def detect(self, frame: np.ndarray, query: str) -> Tuple[List[float], float]:
        """
        Detect object based on natural language query.

        Args:
            frame: Image (H, W, 3) RGB
            query: Natural language query (e.g., "Find the red ball")

        Returns:
            bbox: [x1, y1, x2, y2] or None
            confidence: 0-1 score
        """
        # Convert frame to base64
        image_b64 = self._encode_image(frame)

        # Query VLM
        response = self._query_vlm(image_b64, query)

        # Parse response for bbox
        bbox = self._parse_bbox_from_response(response)
        confidence = self._estimate_confidence(response)

        return bbox, confidence

    def verify(self, frame: np.ndarray, query: str) -> Dict:
        """
        Verify object properties with yes/no question.

        Args:
            query: Yes/no question (e.g., "Is this ball red?")

        Returns:
            {"answer": bool, "confidence": float, "explanation": str}
        """
        image_b64 = self._encode_image(frame)
        response = self._query_vlm(image_b64, query)

        return {
            "answer": self._parse_yes_no(response),
            "confidence": self._estimate_confidence(response),
            "explanation": response
        }

    def _query_vlm(self, image_b64: str, query: str) -> str:
        """Query VLM API."""
        if self.model == "qwen-vl":
            return self._query_qwen(image_b64, query)
        elif self.model == "gpt-4v":
            return self._query_openai(image_b64, query)
        else:
            raise ValueError(f"Unknown VLM model: {self.model}")

    def _query_qwen(self, image_b64: str, query: str) -> str:
        """Query Alibaba Qwen VL."""
        # Use DIMOS existing implementation
        from dimos.models.qwen.video_query import get_bbox_from_qwen_frame
        # ... implementation

    def _parse_bbox_from_response(self, response: str) -> Optional[List[float]]:
        """Parse bbox from VLM response."""
        # VLMs may return bbox in various formats:
        # - "The object is at [100, 200, 300, 400]"
        # - "Box: x1=100, y1=200, x2=300, y2=400"
        # - JSON: {"bbox": [100, 200, 300, 400]}

        # Try regex patterns
        import re
        patterns = [
            r'\[(\d+),\s*(\d+),\s*(\d+),\s*(\d+)\]',  # [x1, y1, x2, y2]
            r'x1=(\d+).*y1=(\d+).*x2=(\d+).*y2=(\d+)',  # x1=... y1=... x2=... y2=...
        ]

        for pattern in patterns:
            match = re.search(pattern, response)
            if match:
                return [int(match.group(i)) for i in range(1, 5)]

        return None

DIMOS Qwen Integration (Already Exists!)

Existing Code (NavigateWithText skill, lines 110-120):

from dimos.models.qwen.video_query import get_bbox_from_qwen_frame

# Query Qwen for object
frame = robot.get_ros_video_stream().pipe(ops.take(1)).run()
bbox, object_size = get_bbox_from_qwen_frame(frame, object_name="red ball")

if bbox is not None:
    # Start tracking
    robot.object_tracker.track(bbox, frame=frame)

Can reuse this for VLM detector!


Frame Handling in Hybrid System

Coordinate Frame Pipeline

Stage 1: Image Space (YOLO/VLM output)
  bbox: [x1, y1, x2, y2] pixels
  ↓
Stage 2: Camera Frame (base_link)
  Calculate 3D position from bbox + depth
  position: (x, y, z) in base_link (x=forward, y=left, z=up)
  ↓
Stage 3: Odometry Frame (odom)
  Transform using ROS tf2
  position: (x, y, z) in odom (integrated wheel odometry)
  ↓
Stage 4: Local Planner
  Set goal in odom
  VFH navigates while avoiding obstacles

Frame Synchronization

Problem: Video and transforms have different timestamps

Solution: ROS tf2 buffer with time interpolation

class FrameSynchronizer:
    """Synchronize video frames with robot pose transforms."""

    def __init__(self, robot):
        self.robot = robot
        self.tf_buffer_duration = 10.0  # Keep 10 seconds of transforms

    def transform_detection_to_odom(self, detection, frame_timestamp):
        """
        Transform detection from base_link to odom at frame time.

        Args:
            detection: Detection with position in base_link
            frame_timestamp: ROS time when frame was captured

        Returns:
            Detection with position in odom
        """
        try:
            # Get transform at frame time (not current time!)
            position_odom, rotation_odom = self.robot.ros_control.transform_pose(
                detection["position"],
                detection["rotation"],
                source_frame="base_link",
                target_frame="odom",
                time=frame_timestamp  # Use frame time!
            )

            return {
                **detection,
                "position": position_odom,
                "rotation": rotation_odom,
                "frame": "odom"
            }
        except Exception as e:
            logger.warning(f"Transform failed: {e}")
            return None

Latency Budget

Component Latency Notes
Camera capture 33ms 30 FPS
YOLO inference 30ms GPU (RTX 4070)
Depth estimation 50ms Metric3D
Frame transform 5ms tf2 lookup
Local planner 50ms VFH computation
Total (YOLO) ~170ms Acceptable for 10 Hz
VLM inference 2-5s API call
Total (VLM) ~2-5s Why we throttle

RxPY Streaming Patterns

Useful Operators

# Sampling
stream.pipe(ops.sample(2.0))  # Every 2 seconds
stream.pipe(ops.throttle_first(1.0))  # First in each 1s window

# Filtering
stream.pipe(ops.filter(lambda x: x["confidence"] > 0.8))
stream.pipe(ops.distinct_until_changed())  # Suppress duplicates

# Transformation
stream.pipe(ops.map(lambda x: transform(x)))
stream.pipe(ops.flat_map(lambda x: rx.from_(x)))  # Flatten lists

# Combining
rx.merge(stream1, stream2)  # Interleave
rx.concat(stream1, stream2)  # Sequential
rx.zip(stream1, stream2)  # Pair items

# Buffering
stream.pipe(ops.buffer_with_time(5.0))  # Collect 5s worth
stream.pipe(ops.take(10))  # First 10 items

Backpressure Handling

Problem: VLM is slow, frames pile up

Solution: Sampling operators (drop frames)

# Bad: Process every frame (overwhelms VLM)
video_stream.pipe(
    ops.map(lambda frame: vlm.detect(frame, "red ball"))  # 30 FPS → 30 VLM calls!
)

# Good: Sample at VLM rate
video_stream.pipe(
    ops.sample(5.0),  # Every 5 seconds (0.2 Hz)
    ops.map(lambda frame: vlm.detect(frame, "red ball"))  # Manageable
)

# Better: Throttle + buffer
video_stream.pipe(
    ops.throttle_first(3.0),  # First frame in each 3s window
    ops.map(lambda frame: vlm.detect(frame, "red ball")),
    ops.buffer_with_count(5),  # Batch 5 results
    ops.map(lambda batch: select_best(batch))  # Pick highest confidence
)

Performance Optimization

YOLO Optimization

# Use smaller model for speed
yolo_detector = Yolo2DDetector(
    model="yolo11n.pt",  # Nano (fastest)
    # model="yolo11s.pt",  # Small
    # model="yolo11m.pt",  # Medium (more accurate)
)

# Reduce resolution
video_stream.pipe(
    ops.map(lambda frame: cv2.resize(frame, (640, 480))),  # Downscale
    ops.map(lambda frame: yolo_detector.process_image(frame))
)

# Use GPU
yolo_detector = Yolo2DDetector(device="cuda")

VLM Optimization

# Local VLM (no API latency)
vlm_detector = VLMDetector(
    model="llava-1.5-7b",  # Run on Thor GPU
    api_key=None  # Local inference
)

# Batch queries (if VLM supports it)
frames_batch = video_stream.pipe(
    ops.buffer_with_time(10.0),  # Collect 10s worth
    ops.map(lambda frames: vlm.batch_detect(frames, "red ball"))
)

# Cache results (avoid re-querying)
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_vlm_query(frame_hash, query):
    return vlm.detect(frame, query)

Testing Strategy

Unit Tests

def test_yolo_detection():
    detector = Yolo2DDetector()
    frame = load_test_image("ball.jpg")
    bboxes, confidences, labels = detector.process_image(frame)
    assert "ball" in labels

def test_vlm_detection():
    detector = VLMDetector(model="mock")  # Mock VLM
    frame = load_test_image("red_ball.jpg")
    bbox, confidence = detector.detect(frame, "red ball")
    assert bbox is not None
    assert confidence > 0.5

def test_frame_transformation():
    detection = {"position": (2.0, 0.0, 0.5), "frame": "base_link"}
    detection_odom = transform_to_odom(detection)
    assert detection_odom["frame"] == "odom"

Integration Tests

def test_sequential_pipeline():
    perception = SequentialPerceptionStream(robot)
    stream = perception.create_stream("ball", "Is this ball red?")

    # Collect detections for 30 seconds
    detections = stream.pipe(
        ops.buffer_with_time(30.0),
        ops.take(1)
    ).run()

    # Verify pipeline worked
    assert len(detections) > 0
    assert all(det["verified"] for det in detections)

def test_adaptive_switching():
    perception = AdaptivePerceptionStream(robot)

    # Start in search phase
    assert perception.current_phase == MissionPhase.SEARCH

    # Trigger object found
    perception._on_object_found(mock_detection)
    assert perception.current_phase == MissionPhase.APPROACH

Deployment Configuration

Config File Pattern

# config/perception.yaml
perception:
  mode: "sequential"  # parallel | sequential | adaptive

  yolo:
    model: "yolo11n.pt"
    confidence_threshold: 0.6
    device: "cuda"
    sample_rate_hz: 10.0

  vlm:
    model: "qwen-vl"
    api_key: "${QWEN_API_KEY}"
    sample_rate_hz: 0.2
    timeout_seconds: 10.0

  sequential:
    base_class: "ball"
    semantic_filter: "Is this ball red?"
    throttle_seconds: 2.0

  adaptive:
    search_query: "Find a red ball"
    approach_class: "ball"
    verify_query: "Is this a smooth red ball?"
    approach_distance_threshold: 1.5

Launch File

# shadowhound_bringup/launch/perception.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration

def generate_launch_description():
    return LaunchDescription([
        DeclareLaunchArgument('perception_mode', default_value='sequential'),
        DeclareLaunchArgument('yolo_model', default_value='yolo11n.pt'),

        Node(
            package='shadowhound_perception',
            executable='hybrid_perception_node',
            name='hybrid_perception',
            parameters=[{
                'mode': LaunchConfiguration('perception_mode'),
                'yolo_model': LaunchConfiguration('yolo_model'),
                'config_file': 'config/perception.yaml'
            }]
        )
    ])

Conclusion

Pattern Selection Guide

Use Case Pattern Reason
"Find any ball" YOLO only Fast, no semantics needed
"Find the RED ball" Sequential Efficient VLM verification
"Find ball OR person" Parallel Multiple object types
Complex mission Adaptive Optimize each phase

For MVP: Sequential Pipeline - Start with YOLO only - Add VLM verification when needed - Simplest to implement and debug - Good performance/accuracy tradeoff

Integration with Semantic Memory

Hybrid perception generates observations that should be stored in semantic memory:

Spatial Memory Integration:

from dimos.perception.spatial_perception import SpatialMemory

# Initialize semantic memory
spatial_memory = SpatialMemory(
    collection_name="shadowhound_observations",
    embedding_model="clip",
    db_path="/data/spatial_memory"
)

# Enhanced perception with memory
class MemoryAwarePerception:
    def detect_with_memory(self, query: str):
        # 1. Check memory first (fast)
        past_obs = self.spatial_memory.query_by_text(query, limit=3)

        if past_obs and self._is_recent(past_obs[0]):
            # Navigate to last known location
            location = past_obs[0]["metadata"]["location"]
            self.logger.info(f"Found {query} in memory at {location}")
            return location

        # 2. Use YOLO for real-time search
        detections = self.yolo_stream.get_latest()

        if detections:
            # 3. VLM verification if needed
            verified = self.vlm_stream.verify(detections, query)

            # 4. Store in memory for future
            if verified:
                self.spatial_memory.add_observation(
                    image=self.camera_frame,
                    location=self.robot.get_pose(),
                    label=query,
                    embedding=self.clip_model.encode(self.camera_frame)
                )

            return verified

        return None

Benefits: - ✅ Query memory before searching (faster) - ✅ Store verified detections (episodic memory) - ✅ CLIP embeddings enable scene similarity - ✅ RAG context for LLM planning

Memory-Guided Search Example:

Mission: "Find the red ball"

1. Query memory: "red ball"
   → Found at (3.2, 1.5) 10 minutes ago

2. Navigate to last location (local planner)
   → Arrive in 10 seconds

3. YOLO scan at location
   → Not found (object moved)

4. Query similar scenes in memory
   → "Bedroom has similar toys/carpet"
   → Location (5.0, 8.0)

5. Explore high-probability locations
   → Navigate to bedroom

6. Find ball, update memory
   → Success! 2x faster than blind search

See persistent_intelligence_mvp.md for complete semantic memory architecture.

Next Steps

  1. ✅ Implement VLMDetector class (wrap DIMOS Qwen)
  2. ✅ Create SequentialPerceptionStream
  3. Integrate SpatialMemory (enable episodic memory)
  4. ✅ Test on Go2 hardware
  5. ⏸️ Add parallel/adaptive if needed

Open Questions

  • [ ] Which VLM to use? (Qwen API vs local LLaVA)
  • [ ] VLM sample rate tuning? (balance latency vs accuracy)
  • [ ] How to handle conflicting detections? (YOLO says ball, VLM says no)
  • [ ] Frame synchronization tolerance? (how old can transforms be?)

References

  • RxPY Documentation: https://rxpy.readthedocs.io/
  • DIMOS ObjectDetectionStream: src/dimos-unitree/dimos/perception/object_detection_stream.py
  • DIMOS Qwen Integration: src/dimos-unitree/dimos/models/qwen/video_query.py
  • Related Docs:
  • local_planning_architecture.md - Navigation system
  • persistent_intelligence_mvp.md - Strategic roadmap with semantic memory
  • local_planning_quickstart.md - Implementation timeline