Skip to main content

Chapter 15: SLAM, VSLAM, and Navigation

15.1 SLAM Fundamentals

15.1.1 The SLAM Problem

Simultaneous Localization and Mapping (SLAM) is one of the fundamental challenges in robotics. The robot must build a map of an unknown environment while simultaneously estimating its position within that map.

SLAM addresses the chicken-and-egg problem: to build a map, you need to know your position; to know your position, you need a map. The solution lies in the probabilistic framework that estimates both pose and map simultaneously while maintaining uncertainty estimates.

15.1.2 SLAM Mathematical Framework

The SLAM problem can be formulated as a Bayesian estimation problem:

p(x1:t,mz1:t,u1:t)=ηp(ztxt,m)p(xtxt1,ut)p(x1:t1,mz1:t1,u1:t1)dxt1p(x_{1:t}, m | z_{1:t}, u_{1:t}) = \eta \cdot p(z_t | x_t, m) \cdot \int p(x_t | x_{t-1}, u_t) \cdot p(x_{1:t-1}, m | z_{1:t-1}, u_{1:t-1}) dx_{t-1}

Where:

  • x_{1:t} = Robot poses over time
  • m = Map (landmarks or occupancy grid)
  • z_{1:t} = Measurements over time
  • u_{1:t} = Controls over time
class SLAMFramework:
    def __init__(self, initial_pose, map_parameters):
        self.robot_pose = initial_pose
        self.map = Map(map_parameters)
        self.landmarks = []
        self.trajectory = [initial_pose.copy()]
        self.uncertainty = np.eye(3) * 0.1  # Initial pose uncertainty

    def predict(self, control, dt):
        """Predict robot pose using motion model"""
        # Update pose using control input
        self.robot_pose = self.motion_model(self.robot_pose, control, dt)

        # Propagate uncertainty
        self.uncertainty = self.propagate_uncertainty(control, dt)

        return self.robot_pose.copy(), self.uncertainty.copy()

    def update(self, measurements):
        """Update pose and map with new measurements"""
        # Data association - match measurements to landmarks
        associations = self.data_association(measurements)

        for measurement, landmark_id in associations:
            if landmark_id == 'new':
                # Initialize new landmark
                self.initialize_landmark(measurement)
            else:
                # Update existing landmark and robot pose
                self.update_with_known_landmark(measurement, landmark_id)

        # Store trajectory
        self.trajectory.append(self.robot_pose.copy())

    def motion_model(self, pose, control, dt):
        """Differential drive motion model"""
        x, y, theta = pose
        v, omega = control

        # Add control noise
        v_noisy = v + np.random.normal(0, 0.1)
        omega_noisy = omega + np.random.normal(0, 0.1)

        # Update pose
        if abs(omega_noisy) < 1e-6:
            # Straight line motion
            x_new = x + v_noisy * np.cos(theta) * dt
            y_new = y + v_noisy * np.sin(theta) * dt
            theta_new = theta
        else:
            # Circular motion
            R = v_noisy / omega_noisy
            x_new = x + R * (np.sin(theta + omega_noisy * dt) - np.sin(theta))
            y_new = y + R * (-np.cos(theta + omega_noisy * dt) + np.cos(theta))
            theta_new = theta + omega_noisy * dt

        return np.array([x_new, y_new, theta_new])

    def data_association(self, measurements):
        """Associate measurements with known landmarks"""
        associations = []

        for measurement in measurements:
            best_landmark = None
            best_distance = float('inf')

            # Find nearest landmark
            for i, landmark in enumerate(self.landmarks):
                predicted_measurement = self.predict_measurement(landmark)
                distance = np.linalg.norm(measurement - predicted_measurement)

                if distance < best_distance and distance < 2.0:  # Association threshold
                    best_distance = distance
                    best_landmark = i

            if best_landmark is not None:
                associations.append((measurement, best_landmark))
            else:
                associations.append((measurement, 'new'))

        return associations

    def predict_measurement(self, landmark):
        """Predict measurement from landmark position"""
        dx = landmark['position'][0] - self.robot_pose[0]
        dy = landmark['position'][1] - self.robot_pose[1]
        distance = np.sqrt(dx**2 + dy**2)

        # Bearing to landmark
        bearing = np.arctan2(dy, dx) - self.robot_pose[2]

        return np.array([distance, bearing])

15.2 Extended Kalman Filter SLAM

15.2.1 EKF-SLAM Implementation

Extended Kalman Filter SLAM maintains joint state of robot pose and landmark positions:

class EKFSLAM:
    def __init__(self, initial_pose, initial_covariance):
        # State: [x, y, theta, l1_x, l1_y, l2_x, l2_y, ...]
        self.state = np.zeros(3)
        self.state[:3] = initial_pose

        # Covariance matrix
        self.covariance = np.eye(3) * 0.1
        self.initial_covariance = initial_covariance

        # Process and measurement noise
        self.Q = np.diag([0.1, 0.1, 0.01])  # Process noise
        self.R = np.diag([0.5, 0.1])  # Measurement noise (range, bearing)

        # Landmarks
        self.landmarks = []
        self.next_landmark_id = 0

        # Data association parameters
        self.gating_threshold = 3.0  # Mahalanobis distance threshold

    def predict(self, control, dt):
        """Predict robot pose and covariance"""
        # Extract robot pose
        pose = self.state[:3]

        # Predict new pose
        new_pose = self.motion_model(pose, control, dt)
        self.state[:3] = new_pose

        # Jacobian of motion model
        F = self.motion_jacobian(pose, control, dt)

        # Augment Jacobian for landmarks (identity)
        n_landmarks = len(self.landmarks)
        F_full = np.eye(3 + 2*n_landmarks)
        F_full[:3, :3] = F

        # Update covariance
        Q_full = np.eye(3 + 2*n_landmarks)
        Q_full[:3, :3] = self.Q

        self.covariance = F_full @ self.covariance @ F_full.T + Q_full

    def update(self, measurements):
        """Update with new measurements"""
        # Data association
        associations = self.associate_observations(measurements)

        for meas, landmark_id in associations:
            if landmark_id == 'new':
                # Initialize new landmark
                self.initialize_landmark(meas)
            else:
                # Update with known landmark
                self.update_landmark(meas, landmark_id)

    def associate_observations(self, measurements):
        """Associate measurements with landmarks using gating"""
        associations = []
        used_landmarks = set()

        for meas in measurements:
            best_match = None
            best_nis = float('inf')

            # Check against all landmarks
            for i, landmark in enumerate(self.landmarks):
                if i in used_landmarks:
                    continue

                # Get landmark state index
                landmark_state_idx = 3 + 2*i

                # Predict measurement
                z_pred = self.predict_landmark_measurement(landmark_state_idx)

                # Innovation covariance
                H = self.measurement_jacobian(landmark_state_idx)
                S = H @ self.covariance[landmark_state_idx:landmark_state_idx+2,
                                          landmark_state_idx:landmark_state_idx+2] @ H.T + self.R

                # Normalized innovation squared
                innovation = meas - z_pred
                nis = innovation.T @ np.linalg.inv(S) @ innovation

                # Gating
                if nis < self.gating_threshold and nis < best_nis:
                    best_nis = nis
                    best_match = i

            if best_match is not None:
                associations.append((meas, best_match))
                used_landmarks.add(best_match)
            else:
                associations.append((meas, 'new'))

        return associations

    def predict_landmark_measurement(self, landmark_idx):
        """Predict measurement for landmark"""
        # Landmark position
        lx = self.state[landmark_idx]
        ly = self.state[landmark_idx + 1]

        # Robot pose
        x, y, theta = self.state[:3]

        # Relative position
        dx = lx - x
        dy = ly - y

        # Range and bearing
        range_pred = np.sqrt(dx**2 + dy**2)
        bearing_pred = np.arctan2(dy, dx) - theta

        return np.array([range_pred, bearing_pred])

    def measurement_jacobian(self, landmark_idx):
        """Jacobian of measurement function"""
        # Landmark position
        lx = self.state[landmark_idx]
        ly = self.state[landmark_idx + 1]

        # Robot pose
        x, y, theta = self.state[:3]

        # Relative position
        dx = lx - x
        dy = ly - y
        dist = np.sqrt(dx**2 + dy**2)

        # Jacobian w.r.t robot pose
        H_robot = np.zeros((2, 3))
        H_robot[0, 0] = -dx / dist  # ∂range/∂x
        H_robot[0, 1] = -dy / dist  # ∂range/∂y
        H_robot[1, 0] = -dy / (dist**2)  # ∂bearing/∂x
        H_robot[1, 1] =  dx / (dist**2)   # ∂bearing/∂y
        H_robot[1, 2] = -1              # ∂bearing/∂θ

        # Jacobian w.r.t landmark position
        H_landmark = np.zeros((2, 2))
        H_landmark[0, 0] = dx / dist   # ∂range/∂lx
        H_landmark[0, 1] = dy / dist   # ∂range/∂ly
        H_landmark[1, 0] = -dy / (dist**2)  # ∂bearing/∂lx
        H_landmark[1, 1] = dx / (dist**2)   # ∂bearing/∂ly

        # Full Jacobian
        state_size = len(self.state)
        H_full = np.zeros((2, state_size))
        H_full[:, :3] = H_robot
        H_full[:, landmark_idx:landmark_idx+2] = H_landmark

        return H_full

    def update_landmark(self, measurement, landmark_id):
        """Update landmark and robot pose using EKF"""
        landmark_idx = 3 + 2 * landmark_id

        # Predict measurement
        z_pred = self.predict_landmark_measurement(landmark_idx)

        # Jacobian
        H = self.measurement_jacobian(landmark_idx)

        # Innovation and covariance
        innovation = measurement - z_pred
        S = H @ self.covariance @ H.T + self.R

        # Kalman gain
        K = self.covariance @ H.T @ np.linalg.inv(S)

        # State update
        self.state += K @ innovation

        # Covariance update
        I_KH = np.eye(len(self.state)) - K @ H
        self.covariance = I_KH @ self.covariance @ I_KH.T + K @ self.R @ K.T

    def initialize_landmark(self, measurement):
        """Initialize new landmark from measurement"""
        # Robot pose
        x, y, theta = self.state[:3]

        # Landmark position in world frame
        range_obs, bearing_obs = measurement

        lx = x + range_obs * np.cos(bearing_obs + theta)
        ly = y + range_obs * np.sin(bearing_obs + theta)

        # Add to state
        self.state = np.append(self.state, [lx, ly])

        # Augment covariance
        n = len(self.state) - 2
        new_covariance = np.eye(n + 2)
        new_covariance[:n, :n] = self.covariance

        # Initial landmark uncertainty
        new_covariance[n:n+2, n:n+2] = self.initial_covariance

        # Cross-covariances
        G = self.landmark_initialization_jacobian(range_obs, bearing_obs, theta)
        new_covariance[:3, n:n+2] = G.T
        new_covariance[n:n+2, :3] = G

        self.covariance = new_covariance
        self.landmarks.append({
            'id': self.next_landmark_id,
            'position': np.array([lx, ly]),
            'observations': 1
        })

        self.next_landmark_id += 1

15.3 Visual SLAM (VSLAM)

15.3.1 Feature-Based VSLAM

Visual SLAM uses visual features from camera images:

class VisualSLAM:
    def __init__(self, camera_parameters):
        self.camera = PinholeCamera(camera_parameters)
        self.feature_detector = ORBDetector()
        self.feature_matcher = BFMatcher()

        # Keyframes and map points
        self.keyframes = []
        self.map_points = []
        self.current_pose = np.eye(4)

        # Tracking state
        self.tracking_state = 'NOT_INITIALIZED'
        self.min_matches_for_tracking = 15
        self.min_matches_for_keyframe = 50

        # Loop closure detection
        self.loop_closure_detector = LoopClosureDetector()
        self.loop_closure_threshold = 0.75

    def process_frame(self, image, timestamp):
        """Process new camera frame"""
        # Extract features
        keypoints, descriptors = self.feature_detector.detect_and_compute(image)

        if self.tracking_state == 'NOT_INITIALIZED':
            # Initialize map with first frame
            self.initialize_map(image, keypoints, descriptors, timestamp)
            self.tracking_state = 'OK'
            return self.current_pose

        # Track current frame
        tracked = self.track_frame(keypoints, descriptors, image, timestamp)

        if not tracked:
            # Tracking lost - try to relocalize
            self.relocalize(image, keypoints, descriptors)
            self.tracking_state = 'LOST'

        # Check for loop closure
        if self.tracking_state == 'OK':
            loop_closure = self.detect_loop_closure(image, keypoints, descriptors)
            if loop_closure:
                self.perform_loop_closure(loop_closure)

        return self.current_pose, self.map_points

    def initialize_map(self, image, keypoints, descriptors, timestamp):
        """Initialize map with first frame"""
        # Create initial keyframe
        initial_keyframe = KeyFrame(
            image=image.copy(),
            keypoints=keypoints,
            descriptors=descriptors,
            pose=np.eye(4),
            timestamp=timestamp
        )
        self.keyframes.append(initial_keyframe)

        # Initialize map points from keypoints
        for i, (kp, desc) in enumerate(zip(keypoints, descriptors)):
            map_point = MapPoint(
                position=self._triangulate_initial_point(kp, np.eye(4)),
                descriptor=desc,
                observations=[{
                    'keyframe_id': 0,
                    'keypoint_idx': i,
                    'pixel_coord': kp.pt,
                    'depth': None
                }]
            )
            self.map_points.append(map_point)

    def track_frame(self, keypoints, descriptors, image, timestamp):
        """Track current frame against map"""
        # Match features with last keyframe
        last_keyframe = self.keyframes[-1]
        matches = self.feature_matcher.match(last_keyframe.descriptors, descriptors)

        if len(matches) < self.min_matches_for_tracking:
            return False

        # Separate tracked and new features
        tracked_points = []
        new_keypoints = []
        new_descriptors = []

        tracked_map_points = []
        for match in matches:
            map_point_idx = match.queryIdx
            if map_point_idx < len(self.map_points):
                tracked_points.append({
                    'map_point_idx': map_point_idx,
                    'keypoint_idx': match.trainIdx,
                    'pixel_coord': keypoints[match.trainIdx].pt
                })
                tracked_map_points.append(map_point_idx)

        # Find unmatched keypoints
        matched_indices = {m.trainIdx for m in matches}
        for i, (kp, desc) in enumerate(zip(keypoints, descriptors)):
            if i not in matched_indices:
                new_keypoints.append(kp)
                new_descriptors.append(desc)

        # Estimate motion (PnP)
        if len(tracked_points) >= 4:
            pose_estimated = self.estimate_motion_pnp(tracked_points, image.shape)
            if pose_estimated is not None:
                self.current_pose = pose_estimated

                # Triangulate new points
                new_map_points = self.triangulate_new_points(
                    new_keypoints, new_descriptors, image.shape
                )
                self.map_points.extend(new_map_points)

                # Check for keyframe creation
                if self.should_create_keyframe(image, len(tracked_points)):
                    self.create_keyframe(image, keypoints, descriptors, timestamp)

                return True

        return False

    def estimate_motion_pnp(self, correspondences, image_shape):
        """Estimate camera pose using PnP"""
        if len(correspondences) < 4:
            return None

        # Prepare data for PnP
        object_points = []
        image_points = []

        for corr in correspondences:
            map_point = self.map_points[corr['map_point_idx']]
            object_points.append(map_point.position)
            image_points.append(corr['pixel_coord'])

        object_points = np.array(object_points, dtype=np.float32)
        image_points = np.array(image_points, dtype=np.float32)

        # Solve PnP
        success, rvec, tvec, inliers = cv2.solvePnPRansac(
            object_points,
            image_points,
            self.camera.intrinsic_matrix,
            None,
            iterationsCount=1000,
            reprojectionError=2.0,
            confidence=0.99
        )

        if success:
            # Convert rotation vector to matrix
            R, _ = cv2.Rodrigues(rvec)

            # Create pose matrix
            pose = np.eye(4)
            pose[:3, :3] = R
            pose[:3, 3] = tvec.flatten()

            return pose

        return None

    def detect_loop_closure(self, image, keypoints, descriptors):
        """Detect loop closure using visual place recognition"""
        # Extract current bag-of-words descriptor
        current_bow = self._extract_bag_of_words(descriptors)

        # Compare with previous keyframes
        best_match = None
        best_similarity = 0

        for i, keyframe in enumerate(self.keyframes[:-10]):  # Don't check recent frames
            similarity = self._compare_bow_descriptors(current_bow, keyframe.bow_descriptor)

            if similarity > best_similarity and similarity > self.loop_closure_threshold:
                best_similarity = similarity
                best_match = i

        if best_match is not None:
            return {
                'current_frame_idx': len(self.keyframes),
                'loop_frame_idx': best_match,
                'similarity': best_similarity,
                'current_keypoints': keypoints,
                'current_descriptors': descriptors
            }

        return None

    def perform_loop_closure(self, loop_closure):
        """Perform loop closure correction"""
        # Match features between current and loop frame
        loop_frame = self.keyframes[loop_closure['loop_frame_idx']]
        current_keypoints = loop_closure['current_keypoints']
        current_descriptors = loop_closure['current_descriptors']

        matches = self.feature_matcher.match(loop_frame.descriptors, current_descriptors)

        if len(matches) < 20:  # Minimum matches for loop closure
            return

        # Estimate relative pose
        relative_pose = self._estimate_relative_pose(loop_frame, current_keypoints, matches)

        if relative_pose is not None:
            # Correct poses using pose graph optimization
            self.correct_poses_with_loop_closure(
                loop_closure['loop_frame_idx'],
                len(self.keyframes),
                relative_pose
            )

    def _extract_bag_of_words(self, descriptors):
        """Extract bag-of-words descriptor for place recognition"""
        # In practice, use VLAD or Fisher Vector
        # Here, use simple histogram of visual words
        visual_words = self._quantize_to_visual_words(descriptors)
        histogram, _ = np.histogram(visual_words, bins=1000)
        return histogram / np.linalg.norm(histogram)

    def _quantize_to_visual_words(self, descriptors):
        """Quantize descriptors to visual words"""
        # In practice, use pre-trained visual vocabulary
        # Here, use simple quantization
        visual_words = []
        for desc in descriptors:
            word = int(np.sum(desc) * 100) % 1000
            visual_words.append(word)
        return visual_words

15.3.2 Dense SLAM with RGB-D

Dense SLAM using depth cameras provides complete 3D reconstruction:

class DenseSLAM:
    def __init__(self, camera_parameters):
        self.color_camera = PinholeCamera(camera_parameters['color'])
        self.depth_camera = PinholeCamera(camera_parameters['depth'])

        # Keyframe management
        self.keyframes = []
        self.max_keyframes = 100

        # Map representation
        self.volume = TSDFVolume(resolution=0.01, size=10.0)
        self.raycasting = RayCasting(self.depth_camera)

        # Pose tracking
        self.current_pose = np.eye(4)
        self.pose_graph = PoseGraph()

        # Dense tracking
        self.feature_tracker = DenseFeatureTracker()
        self.direct_method = DirectMethod()

    def process_rgbd_frame(self, color_image, depth_image, timestamp):
        """Process RGB-D frame"""
        # Track frame using direct method
        pose_delta, tracking_quality = self.direct_method.track_frame(
            color_image, depth_image, self.current_pose
        )

        if tracking_quality < 0.3:
            # Lost tracking - try global registration
            pose_estimate = self.global_registration(color_image, depth_image)
            if pose_estimate is not None:
                self.current_pose = pose_estimate
                tracking_quality = 0.8

        else:
            # Update pose
            self.current_pose = pose_delta @ self.current_pose

        # Update volume if keyframe
        if self.should_create_keyframe(tracking_quality, timestamp):
            self.create_dense_keyframe(color_image, depth_image, timestamp)

        # Raycast for depth prediction
        predicted_depth = self.raycasting.render_depth(
            color_image.shape[:2], self.current_pose
        )

        return self.current_pose, predicted_depth

    def create_dense_keyframe(self, color_image, depth_image, timestamp):
        """Create dense keyframe and update volume"""
        keyframe = DenseKeyFrame(
            color_image=color_image.copy(),
            depth_image=depth_image.copy(),
            pose=self.current_pose.copy(),
            timestamp=timestamp,
            pyramid=self._create_image_pyramid(color_image)
        )

        # Add to keyframe list
        self.keyframes.append(keyframe)

        # Update TSDF volume
        self.volume.integrate_frame(
            color_image, depth_image,
            self.current_pose,
            self.color_camera.intrinsic_matrix
        )

        # Add to pose graph
        if len(self.keyframes) > 1:
            last_keyframe = self.keyframes[-2]
            relative_pose = np.linalg.inv(last_keyframe.pose) @ self.current_pose
            self.pose_graph.add_edge(
                len(self.keyframes) - 2,
                len(self.keyframes) - 1,
                relative_pose
            )

        # Limit number of keyframes
        if len(self.keyframes) > self.max_keyframes:
            self.remove_oldest_keyframe()

    def global_registration(self, color_image, depth_image):
        """Global registration when tracking is lost"""
        best_match = None
        best_inliers = 0

        for i, keyframe in enumerate(self.keyframes):
            # Perform feature matching
            matches = self.feature_tracker.match_frames(
                keyframe.color_image, color_image
            )

            if len(matches) > 30:
                # Estimate pose using PnP with depth
                pose, inliers = self._estimate_pose_with_depth(
                    matches, keyframe, depth_image
                )

                if inliers > best_inliers:
                    best_inliers = inliers
                    best_match = (i, pose)

        if best_match:
            keyframe_idx, pose = best_match

            # Add loop closure edge to pose graph
            self.pose_graph.add_edge(
                keyframe_idx,
                len(self.keyframes),
                np.linalg.inv(self.keyframes[keyframe_idx].pose) @ pose,
                loop_closure=True
            )

            # Optimize pose graph
            optimized_poses = self.pose_graph.optimize()

            if len(optimized_poses) > len(self.keyframes):
                self.current_pose = optimized_poses[-1]

            return pose

        return None

    def extract_mesh(self):
        """Extract triangle mesh from TSDF volume"""
        vertices, faces = self.volume.extract_mesh()

        return {
            'vertices': vertices,
            'faces': faces,
            'colors': self._extract_vertex_colors(vertices)
        }

class TSDFVolume:
    def __init__(self, resolution, size):
        self.resolution = resolution
        self.size = size
        self.voxels_per_dim = int(size / resolution)

        # TSDF volume
        self.tsdf = np.ones((self.voxels_per_dim, self.voxels_per_dim, self.voxels_per_dim))
        self.weight = np.zeros((self.voxels_per_dim, self.voxels_per_dim, self.voxels_per_dim))

        # Color volume
        self.color = np.zeros((self.voxels_per_dim, self.voxels_per_dim, self.voxels_per_dim, 3))

        # Truncation distance
        self.truncation = 4 * resolution

    def integrate_frame(self, color_image, depth_image, pose, intrinsic_matrix):
        """Integrate new RGB-D frame into TSDF volume"""
        height, width = depth_image.shape

        # Create coordinate grids
        u, v = np.meshgrid(np.arange(width), np.arange(height))

        # Backproject depth to 3D points
        fx, fy = intrinsic_matrix[0, 0], intrinsic_matrix[1, 1]
        cx, cy = intrinsic_matrix[0, 2], intrinsic_matrix[1, 2]

        # Convert depth to meters if needed
        depth_meters = depth_image / 1000.0

        # Backproject to camera coordinates
        x_cam = (u - cx) * depth_meters / fx
        y_cam = (v - cy) * depth_meters / fy
        z_cam = depth_meters

        # Transform to world coordinates
        points_cam = np.stack([x_cam.flatten(), y_cam.flatten(), z_cam.flatten()], axis=1)
        ones = np.ones((points_cam.shape[0], 1))
        points_cam_homo = np.hstack([points_cam, ones])

        points_world = (pose @ points_cam_homo.T).T
        points_world = points_world[:, :3]

        # Filter valid points
        valid = (depth_meters.flatten() > 0.1) & (depth_meters.flatten() < 10.0)
        points_world = points_world[valid]
        colors = color_image.reshape(-1, 3)[valid]

        # Update TSDF for each point
        for i, (point, color) in enumerate(zip(points_world, colors)):
            voxel = self._world_to_voxel(point)
            if self._is_valid_voxel(voxel):
                # Calculate signed distance
                distance = np.linalg.norm(point - (pose @ np.array([0, 0, 0, 1]).T)[:3])

                # Get current TSDF value
                current_tsdf = self.tsdf[voxel[0], voxel[1], voxel[2]]
                current_weight = self.weight[voxel[0], voxel[1], voxel[2]]

                # Running average update
                new_tsdf = min(distance, self.truncation)
                new_weight = 1.0

                if current_weight + new_weight > 0:
                    updated_tsdf = (current_tsdf * current_weight + new_tsdf * new_weight) / (current_weight + new_weight)
                    self.tsdf[voxel[0], voxel[1], voxel[2]] = updated_tsdf
                    self.weight[voxel[0], voxel[1], voxel[2]] = current_weight + new_weight

                    # Update color
                    self.color[voxel[0], voxel[1], voxel[2]] = color

    def extract_mesh(self):
        """Extract mesh using marching cubes"""
        from skimage.measure import marching_cubes

        # Find zero-crossing surface
        vertices, faces, normals, values = marching_cubes(
            self.tsdf, level=0
        )

        # Scale voxel coordinates to world coordinates
        scale = self.size / self.voxels_per_dim
        vertices = vertices * scale - self.size / 2

        # Filter small triangles
        face_areas = self._calculate_face_areas(vertices, faces)
        valid_faces = face_areas > (scale ** 2) * 0.01  # Minimum triangle area

        vertices = vertices[valid_faces]
        faces = faces[valid_faces]

        return vertices, faces

15.4 Navigation with SLAM

15.4.1 Path Planning in SLAM Maps

Plan paths through SLAM-constructed environments:

class SLAMNavigation:
    def __init__(self, slam_system):
        self.slam = slam_system
        self.occupancy_grid = OccupancyGrid(resolution=0.05, size=100.0)

        # Path planning
        self.path_planner = AStarPlanner(self.occupancy_grid)
        self.trajectory_planner = TrajectoryPlanner()

        # Navigation state
        self.current_goal = None
        self.current_path = None
        self.navigation_state = 'IDLE'

    def update_navigation_map(self):
        """Update occupancy grid from SLAM map"""
        # Convert SLAM landmarks to occupancy grid
        self.occupancy_grid.clear()

        # Add landmarks as obstacles
        for landmark in self.slam.landmarks:
            x, y = landmark['position']
            self.occupancy_grid.set_occupancy(x, y, 1.0)

        # Add trajectory as free space
        for pose in self.slam.trajectory:
            x, y = pose[:2]
            self.occupancy_grid.set_occupancy(x, y, 0.0)

        # Inflate obstacles for robot footprint
        self.occupancy_grid.inflate_obstacles(robot_radius=0.3)

    def navigate_to_goal(self, goal_position):
        """Navigate to specified goal position"""
        self.current_goal = goal_position
        self.navigation_state = 'PLANNING'

        # Update navigation map
        self.update_navigation_map()

        # Get current position
        current_position = self.slam.robot_pose[:2]

        # Plan path
        path = self.path_planner.plan_path(current_position, goal_position)

        if path is not None:
            self.current_path = path
            self.navigation_state = 'EXECUTING'
            return True
        else:
            self.navigation_state = 'NO_PATH'
            return False

    def execute_path(self):
        """Execute planned path"""
        if self.navigation_state != 'EXECUTING' or self.current_path is None:
            return None

        # Get current position
        current_position = self.slam.robot_pose[:2]

        # Find target point on path
        target_point = self._get_target_point(current_position)

        if target_point is None:
            self.navigation_state = 'REACHED_GOAL'
            return None

        # Generate trajectory to target point
        trajectory = self.trajectory_planner.generate_trajectory(
            current_position,
            self.slam.robot_pose[2],  # Current heading
            target_point
        )

        # Calculate control command
        control = self._trajectory_to_control(trajectory)

        return control

    def _get_target_point(self, current_position):
        """Get target point along path"""
        if self.current_path is None or len(self.current_path) == 0:
            return None

        # Find closest point on path
        min_dist = float('inf')
        closest_idx = 0

        for i, point in enumerate(self.current_path):
            dist = np.linalg.norm(current_position - point)
            if dist < min_dist:
                min_dist = dist
                closest_idx = i

        # Look ahead on path
        look_ahead_distance = 2.0
        target_point = None

        for j in range(closest_idx, min(closest_idx + 20, len(self.current_path))):
            point = self.current_path[j]
            dist = np.linalg.norm(current_position - point)

            if dist >= look_ahead_distance:
                target_point = point
                break

        # If no point found ahead, use last point
        if target_point is None:
            target_point = self.current_path[-1]

        return target_point

class AStarPlanner:
    def __init__(self, occupancy_grid):
        self.grid = occupancy_grid
        self.heuristic_weight = 1.0

    def plan_path(self, start, goal):
        """Plan path using A* algorithm"""
        start_cell = self.grid.world_to_grid(start)
        goal_cell = self.grid.world_to_grid(goal)

        if not self.grid.is_valid_cell(start_cell) or not self.grid.is_valid_cell(goal_cell):
            return None

        # A* algorithm
        open_set = PriorityQueue()
        closed_set = set()

        # Start node
        start_node = Node(start_cell, None, 0, self._heuristic(start_cell, goal_cell))
        open_set.put(start_node)

        while not open_set.empty():
            current = open_set.get()

            if current.cell == goal_cell:
                # Reconstruct path
                path = []
                node = current
                while node is not None:
                    path.append(self.grid.grid_to_world(node.cell))
                    node = node.parent
                return path[::-1]

            closed_set.add(current.cell)

            # Explore neighbors
            for neighbor in self._get_neighbors(current.cell):
                if not self.grid.is_valid_cell(neighbor) or neighbor in closed_set:
                    continue

                g_cost = current.g_cost + self._distance(current.cell, neighbor)
                h_cost = self._heuristic(neighbor, goal_cell)
                f_cost = g_cost + self.heuristic_weight * h_cost

                neighbor_node = Node(neighbor, current, g_cost, h_cost)
                open_set.put(neighbor_node)

        return None  # No path found

    def _heuristic(self, cell1, cell2):
        """A* heuristic (Euclidean distance)"""
        return self._distance(cell1, cell2)

    def _distance(self, cell1, cell2):
        """Distance between grid cells"""
        return np.linalg.norm(np.array(cell1) - np.array(cell2))

    def _get_neighbors(self, cell):
        """Get valid neighbor cells"""
        neighbors = []
        directions = [(0, 1), (1, 0), (0, -1), (-1, 0),  # 4-connected
                       (1, 1), (1, -1), (-1, 1), (-1, -1)]  # 8-connected

        for dx, dy in directions:
            neighbor = (cell[0] + dx, cell[1] + dy)
            if self.grid.is_valid_cell(neighbor):
                neighbors.append(neighbor)

        return neighbors

class Node:
    def __init__(self, cell, parent, g_cost, h_cost):
        self.cell = cell
        self.parent = parent
        self.g_cost = g_cost  # Cost from start
        self.h_cost = h_cost  # Heuristic cost to goal
        self.f_cost = g_cost + h_cost  # Total cost

    def __lt__(self, other):
        return self.f_cost < other.f_cost

class TrajectoryPlanner:
    def __init__(self):
        self.max_velocity = 1.0
        self.max_angular_velocity = 1.0
        self.dt = 0.1

    def generate_trajectory(self, current_pos, current_heading, target_pos):
        """Generate smooth trajectory to target"""
        # Calculate relative position and angle
        dx = target_pos[0] - current_pos[0]
        dy = target_pos[1] - current_pos[1]
        target_heading = np.arctan2(dy, dx)

        # Calculate heading error
        heading_error = self._normalize_angle(target_heading - current_heading)

        # Simple P-controller for velocities
        kp_linear = 1.0
        kp_angular = 2.0

        # Distance to target
        distance = np.sqrt(dx**2 + dy**2)

        # Control velocities
        linear_velocity = kp_linear * distance
        angular_velocity = kp_angular * heading_error

        # Saturate velocities
        linear_velocity = np.clip(linear_velocity, -self.max_velocity, self.max_velocity)
        angular_velocity = np.clip(angular_velocity, -self.max_angular_velocity, self.max_angular_velocity)

        # Stop if very close
        if distance < 0.1:
            linear_velocity = 0
            angular_velocity = 0

        return {
            'linear_velocity': linear_velocity,
            'angular_velocity': angular_velocity,
            'trajectory': self._predict_trajectory(
                current_pos, current_heading, linear_velocity, angular_velocity, self.dt
            )
        }

    def _normalize_angle(self, angle):
        """Normalize angle to [-pi, pi]"""
        while angle > np.pi:
            angle -= 2 * np.pi
        while angle < -np.pi:
            angle += 2 * np.pi
        return angle

    def _predict_trajectory(self, pos, heading, v, omega, dt, horizon=1.0):
        """Predict future trajectory"""
        trajectory = [pos.copy()]
        current_pos = pos.copy()
        current_heading = heading

        steps = int(horizon / dt)
        for _ in range(steps):
            # Update position
            current_pos[0] += v * np.cos(current_heading) * dt
            current_pos[1] += v * np.sin(current_heading) * dt
            current_heading += omega * dt
            current_heading = self._normalize_angle(current_heading)

            trajectory.append(current_pos.copy())

        return trajectory

Chapter Summary

This chapter covered comprehensive SLAM and navigation systems for robotics:

Key Concepts Covered

  1. SLAM Fundamentals: Mathematical framework and uncertainty representation
  2. EKF-SLAM: Extended Kalman Filter for landmark-based SLAM
  3. Visual SLAM: Feature-based and dense RGB-D SLAM systems
  4. Loop Closure: Place recognition and pose graph optimization
  5. Navigation: Path planning and execution in SLAM maps
  6. Real-time Applications: Practical implementations for autonomous robots

Practical Implementations

  • Complete EKF-SLAM with data association and landmark management
  • Feature-based VSLAM with ORB features and loop closure detection
  • Dense SLAM with TSDF volume integration and mesh extraction
  • A* path planning with SLAM map integration
  • Trajectory planning with smooth control laws

Next Steps

With SLAM and navigation expertise, you're ready for:

  • Chapter 16: Path Planning Algorithms
  • Part V: Embodied Intelligence & VLA (Chapters 17-20)

Glossary Terms

Term: Data Association Definition: Process of matching sensor measurements to map landmarks, a critical challenge in SLAM Related: Nearest Neighbor, Joint Compatibility Test

Term: Loop Closure Definition: Detection when robot returns to previously visited location, enabling map correction and global consistency Related: Place Recognition, Pose Graph Optimization

Term: Pose Graph Definition: Graph representation of robot poses and constraints between them, optimized for global consistency Related: Bundle Adjustment, Graph SLAM

Term: TSDF (Truncated Signed Distance Function) Definition: Implicit surface representation storing distance to nearest surface with truncation for bandlimited updates Related: Volumetric Mapping, Marching Cubes

Term: Visual Odometry Definition: Estimation of camera motion by tracking visual features between consecutive frames Related: Feature Tracking, Essential Matrix


Exercises

Exercise 15.1: EKF-SLAM Implementation

Implement complete EKF-SLAM system:

  • Add landmark initialization and management
  • Implement robust data association with gating
  • Handle map management and pruning
  • Visualize uncertainty ellipses

Exercise 15.2: Feature-Based VSLAM

Build visual SLAM with loop closure:

  • Implement ORB feature extraction and matching
  • Create keyframe management system
  • Add loop closure detection with place recognition
  • Optimize pose graph for global consistency

Exercise 15.3: Dense RGB-D SLAM

Develop dense SLAM with TSDF volume:

  • Implement direct method tracking
  • Create TSDF volume integration
  • Extract and visualize mesh
  • Handle missing depth data

Exercise 15.4: SLAM Navigation

Create navigation system using SLAM map:

  • Build occupancy grid from SLAM landmarks
  • Implement A* path planning algorithm
  • Add trajectory planning with smooth control
  • Handle dynamic obstacles

Exercise 15.5: Multi-Robot SLAM

Design collaborative SLAM system:

  • Implement multi-robot pose graph optimization
  • Create inter-robot loop closure detection
  • Design distributed map merging
  • Evaluate system scalability