Chapter 15: SLAM, VSLAM, and Navigation
15.1 SLAM Fundamentals
15.1.1 The SLAM Problem
Simultaneous Localization and Mapping (SLAM) is one of the fundamental challenges in robotics. The robot must build a map of an unknown environment while simultaneously estimating its position within that map.
SLAM addresses a chicken-and-egg problem: to build a map, you need to know your position; to know your position, you need a map. The solution is a probabilistic framework that estimates the pose and the map jointly while maintaining uncertainty estimates over both.
15.1.2 SLAM Mathematical Framework
The SLAM problem can be formulated as a Bayesian estimation problem: compute the posterior

p(x_{1:t}, m | z_{1:t}, u_{1:t})

Where:
- x_{1:t} = robot poses over time
- m = map (landmarks or occupancy grid)
- z_{1:t} = measurements over time
- u_{1:t} = controls over time
import numpy as np

class SLAMFramework:
    """Sketch of a generic SLAM loop. Map and the helper methods referenced
    below (propagate_uncertainty, initialize_landmark,
    update_with_known_landmark, ...) are assumed to be provided elsewhere."""

    def __init__(self, initial_pose, map_parameters):
        self.robot_pose = initial_pose
        self.map = Map(map_parameters)
        self.landmarks = []
        self.trajectory = [initial_pose.copy()]
        self.uncertainty = np.eye(3) * 0.1  # Initial pose uncertainty

    def predict(self, control, dt):
        """Predict robot pose using the motion model."""
        # Update pose using the control input
        self.robot_pose = self.motion_model(self.robot_pose, control, dt)
        # Propagate uncertainty
        self.uncertainty = self.propagate_uncertainty(control, dt)
        return self.robot_pose.copy(), self.uncertainty.copy()

    def update(self, measurements):
        """Update pose and map with new measurements."""
        # Data association: match measurements to landmarks
        associations = self.data_association(measurements)
        for measurement, landmark_id in associations:
            if landmark_id == 'new':
                # Initialize a new landmark
                self.initialize_landmark(measurement)
            else:
                # Update existing landmark and robot pose
                self.update_with_known_landmark(measurement, landmark_id)
        # Store trajectory
        self.trajectory.append(self.robot_pose.copy())

    def motion_model(self, pose, control, dt):
        """Differential-drive motion model (samples control noise)."""
        x, y, theta = pose
        v, omega = control
        # Add control noise
        v_noisy = v + np.random.normal(0, 0.1)
        omega_noisy = omega + np.random.normal(0, 0.1)
        if abs(omega_noisy) < 1e-6:
            # Straight-line motion
            x_new = x + v_noisy * np.cos(theta) * dt
            y_new = y + v_noisy * np.sin(theta) * dt
            theta_new = theta
        else:
            # Circular-arc motion of radius R = v / omega
            R = v_noisy / omega_noisy
            x_new = x + R * (np.sin(theta + omega_noisy * dt) - np.sin(theta))
            y_new = y + R * (np.cos(theta) - np.cos(theta + omega_noisy * dt))
            theta_new = theta + omega_noisy * dt
        return np.array([x_new, y_new, theta_new])

    def data_association(self, measurements):
        """Associate measurements with known landmarks (nearest neighbor)."""
        associations = []
        for measurement in measurements:
            best_landmark = None
            best_distance = float('inf')
            # Find the nearest landmark in measurement space
            for i, landmark in enumerate(self.landmarks):
                predicted_measurement = self.predict_measurement(landmark)
                distance = np.linalg.norm(measurement - predicted_measurement)
                if distance < best_distance and distance < 2.0:  # Association gate
                    best_distance = distance
                    best_landmark = i
            if best_landmark is not None:
                associations.append((measurement, best_landmark))
            else:
                associations.append((measurement, 'new'))
        return associations

    def predict_measurement(self, landmark):
        """Predict a range-bearing measurement of a landmark."""
        dx = landmark['position'][0] - self.robot_pose[0]
        dy = landmark['position'][1] - self.robot_pose[1]
        distance = np.sqrt(dx**2 + dy**2)
        bearing = np.arctan2(dy, dx) - self.robot_pose[2]
        return np.array([distance, bearing])
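The differential-drive model above can be sanity-checked in isolation. Below is a minimal noise-free version as a standalone sketch; `motion_model` here is a free function, not the class method:

```python
import numpy as np

def motion_model(pose, control, dt):
    """Noise-free differential-drive (unicycle) motion model."""
    x, y, theta = pose
    v, omega = control
    if abs(omega) < 1e-6:
        # Straight-line motion
        return np.array([x + v * np.cos(theta) * dt,
                         y + v * np.sin(theta) * dt,
                         theta])
    # Circular-arc motion of radius R = v / omega
    R = v / omega
    return np.array([x + R * (np.sin(theta + omega * dt) - np.sin(theta)),
                     y + R * (np.cos(theta) - np.cos(theta + omega * dt)),
                     theta + omega * dt])

# Driving straight at 1 m/s for 1 s moves the robot 1 m along +x
pose = motion_model(np.array([0.0, 0.0, 0.0]), (1.0, 0.0), 1.0)

# A full turn (omega = 2*pi over 1 s) returns to the starting position
loop = motion_model(np.array([0.0, 0.0, 0.0]), (1.0, 2 * np.pi), 1.0)
```

Two properties worth checking: straight-line motion when omega is (near) zero, and closure of a full circular arc.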
15.2 Extended Kalman Filter SLAM
15.2.1 EKF-SLAM Implementation
Extended Kalman Filter SLAM maintains a joint state over the robot pose and all landmark positions:
class EKFSLAM:
    """EKF-SLAM with joint state [x, y, theta, l1_x, l1_y, l2_x, l2_y, ...].
    motion_model, motion_jacobian, and landmark_initialization_jacobian are
    assumed to be provided elsewhere."""

    def __init__(self, initial_pose, initial_covariance):
        self.state = np.array(initial_pose, dtype=float)
        self.covariance = np.eye(3) * 0.1
        self.initial_covariance = initial_covariance
        # Process and measurement noise
        self.Q = np.diag([0.1, 0.1, 0.01])  # Process noise (pose)
        self.R = np.diag([0.5, 0.1])        # Measurement noise (range, bearing)
        # Landmarks
        self.landmarks = []
        self.next_landmark_id = 0
        # Data association: gate on the normalized innovation squared (NIS)
        self.gating_threshold = 3.0

    def predict(self, control, dt):
        """Predict robot pose and covariance."""
        pose = self.state[:3]
        self.state[:3] = self.motion_model(pose, control, dt)
        # Jacobian of the motion model w.r.t. the pose
        F = self.motion_jacobian(pose, control, dt)
        # Augment for landmarks (landmarks are static: identity blocks)
        n = 3 + 2 * len(self.landmarks)
        F_full = np.eye(n)
        F_full[:3, :3] = F
        # Process noise affects only the pose block
        Q_full = np.zeros((n, n))
        Q_full[:3, :3] = self.Q
        self.covariance = F_full @ self.covariance @ F_full.T + Q_full

    def update(self, measurements):
        """Update with new measurements."""
        associations = self.associate_observations(measurements)
        for meas, landmark_id in associations:
            if landmark_id == 'new':
                self.initialize_landmark(meas)
            else:
                self.update_landmark(meas, landmark_id)

    def associate_observations(self, measurements):
        """Associate measurements with landmarks using NIS gating."""
        associations = []
        used_landmarks = set()
        for meas in measurements:
            best_match = None
            best_nis = float('inf')
            for i in range(len(self.landmarks)):
                if i in used_landmarks:
                    continue
                landmark_state_idx = 3 + 2 * i
                z_pred = self.predict_landmark_measurement(landmark_state_idx)
                # Innovation covariance uses the full joint covariance
                H = self.measurement_jacobian(landmark_state_idx)
                S = H @ self.covariance @ H.T + self.R
                # Normalized innovation squared (wrap the bearing residual)
                innovation = meas - z_pred
                innovation[1] = self._normalize_angle(innovation[1])
                nis = innovation.T @ np.linalg.inv(S) @ innovation
                if nis < self.gating_threshold and nis < best_nis:
                    best_nis = nis
                    best_match = i
            if best_match is not None:
                associations.append((meas, best_match))
                used_landmarks.add(best_match)
            else:
                associations.append((meas, 'new'))
        return associations

    def predict_landmark_measurement(self, landmark_idx):
        """Predict a range-bearing measurement for a landmark."""
        lx = self.state[landmark_idx]
        ly = self.state[landmark_idx + 1]
        x, y, theta = self.state[:3]
        dx, dy = lx - x, ly - y
        range_pred = np.sqrt(dx**2 + dy**2)
        bearing_pred = np.arctan2(dy, dx) - theta
        return np.array([range_pred, bearing_pred])

    def measurement_jacobian(self, landmark_idx):
        """Jacobian of the range-bearing measurement function."""
        lx = self.state[landmark_idx]
        ly = self.state[landmark_idx + 1]
        x, y, theta = self.state[:3]
        dx, dy = lx - x, ly - y
        dist = np.sqrt(dx**2 + dy**2)
        # Jacobian w.r.t. the robot pose
        H_robot = np.zeros((2, 3))
        H_robot[0, 0] = -dx / dist       # d(range)/dx
        H_robot[0, 1] = -dy / dist       # d(range)/dy
        H_robot[1, 0] = dy / dist**2     # d(bearing)/dx
        H_robot[1, 1] = -dx / dist**2    # d(bearing)/dy
        H_robot[1, 2] = -1               # d(bearing)/d(theta)
        # Jacobian w.r.t. the landmark position
        H_landmark = np.zeros((2, 2))
        H_landmark[0, 0] = dx / dist     # d(range)/d(lx)
        H_landmark[0, 1] = dy / dist     # d(range)/d(ly)
        H_landmark[1, 0] = -dy / dist**2 # d(bearing)/d(lx)
        H_landmark[1, 1] = dx / dist**2  # d(bearing)/d(ly)
        # Assemble the full Jacobian
        H_full = np.zeros((2, len(self.state)))
        H_full[:, :3] = H_robot
        H_full[:, landmark_idx:landmark_idx + 2] = H_landmark
        return H_full

    def update_landmark(self, measurement, landmark_id):
        """Joint EKF update of landmark and robot pose."""
        landmark_idx = 3 + 2 * landmark_id
        z_pred = self.predict_landmark_measurement(landmark_idx)
        H = self.measurement_jacobian(landmark_idx)
        # Innovation (with wrapped bearing) and innovation covariance
        innovation = measurement - z_pred
        innovation[1] = self._normalize_angle(innovation[1])
        S = H @ self.covariance @ H.T + self.R
        # Kalman gain
        K = self.covariance @ H.T @ np.linalg.inv(S)
        # State update
        self.state = self.state + K @ innovation
        # Joseph-form covariance update (numerically stable)
        I_KH = np.eye(len(self.state)) - K @ H
        self.covariance = I_KH @ self.covariance @ I_KH.T + K @ self.R @ K.T

    def initialize_landmark(self, measurement):
        """Initialize a new landmark from a range-bearing measurement."""
        x, y, theta = self.state[:3]
        range_obs, bearing_obs = measurement
        lx = x + range_obs * np.cos(bearing_obs + theta)
        ly = y + range_obs * np.sin(bearing_obs + theta)
        # Augment the state
        self.state = np.append(self.state, [lx, ly])
        # Augment the covariance
        n = len(self.state) - 2
        new_covariance = np.zeros((n + 2, n + 2))
        new_covariance[:n, :n] = self.covariance
        # Initial landmark uncertainty
        new_covariance[n:n + 2, n:n + 2] = self.initial_covariance
        # Cross-covariances between pose and the new landmark
        G = self.landmark_initialization_jacobian(range_obs, bearing_obs, theta)
        new_covariance[:3, n:n + 2] = G.T
        new_covariance[n:n + 2, :3] = G
        self.covariance = new_covariance
        self.landmarks.append({
            'id': self.next_landmark_id,
            'position': np.array([lx, ly]),
            'observations': 1,
        })
        self.next_landmark_id += 1

    @staticmethod
    def _normalize_angle(angle):
        """Wrap an angle to [-pi, pi]."""
        return (angle + np.pi) % (2 * np.pi) - np.pi
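The gating step above hinges on the normalized innovation squared (NIS). A small standalone illustration with toy numbers; the 5.99 gate used here is the 95% chi-square quantile for 2 degrees of freedom, an assumption rather than the class's threshold:

```python
import numpy as np

def nis(innovation, S):
    """Normalized innovation squared (squared Mahalanobis distance)."""
    return float(innovation.T @ np.linalg.inv(S) @ innovation)

# Toy innovation covariance for a range-bearing measurement
S = np.diag([0.5, 0.1])

close = nis(np.array([0.2, 0.05]), S)  # small residual: plausible association
far = nis(np.array([2.0, 0.5]), S)     # large residual: implausible

# 95% gate for 2 degrees of freedom: chi2(0.95, 2) is about 5.99
gate = 5.99
```

A residual inside the gate is associated; one outside spawns a new landmark (or is rejected as an outlier).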
15.3 Visual SLAM (VSLAM)
15.3.1 Feature-Based VSLAM
Visual SLAM uses visual features from camera images:
import cv2
import numpy as np

class VisualSLAM:
    """Feature-based VSLAM sketch. PinholeCamera, ORBDetector, BFMatcher,
    KeyFrame, MapPoint, and LoopClosureDetector are assumed to be provided
    elsewhere."""

    def __init__(self, camera_parameters):
        self.camera = PinholeCamera(camera_parameters)
        self.feature_detector = ORBDetector()
        self.feature_matcher = BFMatcher()
        # Keyframes and map points
        self.keyframes = []
        self.map_points = []
        self.current_pose = np.eye(4)
        # Tracking state
        self.tracking_state = 'NOT_INITIALIZED'
        self.min_matches_for_tracking = 15
        self.min_matches_for_keyframe = 50
        # Loop-closure detection
        self.loop_closure_detector = LoopClosureDetector()
        self.loop_closure_threshold = 0.75

    def process_frame(self, image, timestamp):
        """Process a new camera frame."""
        # Extract features
        keypoints, descriptors = self.feature_detector.detect_and_compute(image)
        if self.tracking_state == 'NOT_INITIALIZED':
            # Initialize the map with the first frame
            self.initialize_map(image, keypoints, descriptors, timestamp)
            self.tracking_state = 'OK'
            return self.current_pose, self.map_points
        # Track the current frame
        tracked = self.track_frame(keypoints, descriptors, image, timestamp)
        if not tracked:
            # Tracking lost: try to relocalize
            self.relocalize(image, keypoints, descriptors)
            self.tracking_state = 'LOST'
        # Check for loop closure
        if self.tracking_state == 'OK':
            loop_closure = self.detect_loop_closure(image, keypoints, descriptors)
            if loop_closure:
                self.perform_loop_closure(loop_closure)
        return self.current_pose, self.map_points

    def initialize_map(self, image, keypoints, descriptors, timestamp):
        """Initialize the map from the first frame."""
        initial_keyframe = KeyFrame(
            image=image.copy(),
            keypoints=keypoints,
            descriptors=descriptors,
            pose=np.eye(4),
            timestamp=timestamp,
        )
        self.keyframes.append(initial_keyframe)
        # Initialize map points from keypoints
        for i, (kp, desc) in enumerate(zip(keypoints, descriptors)):
            map_point = MapPoint(
                position=self._triangulate_initial_point(kp, np.eye(4)),
                descriptor=desc,
                observations=[{
                    'keyframe_id': 0,
                    'keypoint_idx': i,
                    'pixel_coord': kp.pt,
                    'depth': None,
                }],
            )
            self.map_points.append(map_point)

    def track_frame(self, keypoints, descriptors, image, timestamp):
        """Track the current frame against the map."""
        # Match features with the last keyframe
        last_keyframe = self.keyframes[-1]
        matches = self.feature_matcher.match(last_keyframe.descriptors, descriptors)
        if len(matches) < self.min_matches_for_tracking:
            return False
        # Separate tracked and new features
        tracked_points = []
        new_keypoints = []
        new_descriptors = []
        for match in matches:
            map_point_idx = match.queryIdx
            if map_point_idx < len(self.map_points):
                tracked_points.append({
                    'map_point_idx': map_point_idx,
                    'keypoint_idx': match.trainIdx,
                    'pixel_coord': keypoints[match.trainIdx].pt,
                })
        # Collect unmatched keypoints for later triangulation
        matched_indices = {m.trainIdx for m in matches}
        for i, (kp, desc) in enumerate(zip(keypoints, descriptors)):
            if i not in matched_indices:
                new_keypoints.append(kp)
                new_descriptors.append(desc)
        # Estimate motion (PnP needs at least 4 correspondences)
        if len(tracked_points) >= 4:
            pose_estimated = self.estimate_motion_pnp(tracked_points, image.shape)
            if pose_estimated is not None:
                self.current_pose = pose_estimated
                # Triangulate new points
                new_map_points = self.triangulate_new_points(
                    new_keypoints, new_descriptors, image.shape
                )
                self.map_points.extend(new_map_points)
                # Check for keyframe creation
                if self.should_create_keyframe(image, len(tracked_points)):
                    self.create_keyframe(image, keypoints, descriptors, timestamp)
                return True
        return False

    def estimate_motion_pnp(self, correspondences, image_shape):
        """Estimate the camera pose with RANSAC PnP."""
        if len(correspondences) < 4:
            return None
        # Prepare 3D-2D correspondences for PnP
        object_points = []
        image_points = []
        for corr in correspondences:
            map_point = self.map_points[corr['map_point_idx']]
            object_points.append(map_point.position)
            image_points.append(corr['pixel_coord'])
        object_points = np.array(object_points, dtype=np.float32)
        image_points = np.array(image_points, dtype=np.float32)
        # Solve PnP with RANSAC
        success, rvec, tvec, inliers = cv2.solvePnPRansac(
            object_points,
            image_points,
            self.camera.intrinsic_matrix,
            None,
            iterationsCount=1000,
            reprojectionError=2.0,
            confidence=0.99,
        )
        if not success:
            return None
        # Convert the rotation vector to a matrix and build the pose
        R, _ = cv2.Rodrigues(rvec)
        pose = np.eye(4)
        pose[:3, :3] = R
        pose[:3, 3] = tvec.flatten()
        return pose

    def detect_loop_closure(self, image, keypoints, descriptors):
        """Detect loop closure using visual place recognition."""
        # Extract the current bag-of-words descriptor
        current_bow = self._extract_bag_of_words(descriptors)
        best_match = None
        best_similarity = 0.0
        # Skip the most recent keyframes to avoid trivial matches
        for i, keyframe in enumerate(self.keyframes[:-10]):
            similarity = self._compare_bow_descriptors(current_bow, keyframe.bow_descriptor)
            if similarity > best_similarity and similarity > self.loop_closure_threshold:
                best_similarity = similarity
                best_match = i
        if best_match is not None:
            return {
                'current_frame_idx': len(self.keyframes),
                'loop_frame_idx': best_match,
                'similarity': best_similarity,
                'current_keypoints': keypoints,
                'current_descriptors': descriptors,
            }
        return None

    def perform_loop_closure(self, loop_closure):
        """Perform loop-closure correction."""
        # Match features between the current frame and the loop frame
        loop_frame = self.keyframes[loop_closure['loop_frame_idx']]
        current_keypoints = loop_closure['current_keypoints']
        current_descriptors = loop_closure['current_descriptors']
        matches = self.feature_matcher.match(loop_frame.descriptors, current_descriptors)
        if len(matches) < 20:  # Minimum matches for a reliable loop closure
            return
        # Estimate the relative pose between the two frames
        relative_pose = self._estimate_relative_pose(loop_frame, current_keypoints, matches)
        if relative_pose is not None:
            # Correct poses via pose-graph optimization
            self.correct_poses_with_loop_closure(
                loop_closure['loop_frame_idx'],
                len(self.keyframes),
                relative_pose,
            )

    def _extract_bag_of_words(self, descriptors):
        """Extract a bag-of-words descriptor for place recognition."""
        # In practice, quantize against a trained vocabulary (or use VLAD /
        # Fisher vectors); here, a simple histogram of visual words
        visual_words = self._quantize_to_visual_words(descriptors)
        histogram, _ = np.histogram(visual_words, bins=1000)
        return histogram / np.linalg.norm(histogram)

    def _quantize_to_visual_words(self, descriptors):
        """Quantize descriptors to visual words."""
        # In practice, use a pre-trained visual vocabulary; here, a toy hash
        return [int(np.sum(desc) * 100) % 1000 for desc in descriptors]
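Loop-closure candidates are scored by comparing normalized bag-of-words histograms. Below is a minimal sketch of that similarity score on toy word counts; the `bow_similarity` helper is illustrative, not part of the class:

```python
import numpy as np

def bow_similarity(hist_a, hist_b):
    """Cosine similarity between two bag-of-words histograms."""
    a = hist_a / np.linalg.norm(hist_a)
    b = hist_b / np.linalg.norm(hist_b)
    return float(a @ b)

h1 = np.array([4.0, 1.0, 0.0, 3.0])  # word counts for frame 1
h2 = np.array([4.0, 0.0, 1.0, 3.0])  # similar scene
h3 = np.array([0.0, 5.0, 5.0, 0.0])  # different scene

sim_same = bow_similarity(h1, h2)
sim_diff = bow_similarity(h1, h3)
```

With a gate like the 0.75 threshold used above, the similar pair passes while the dissimilar one is rejected.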
15.3.2 Dense SLAM with RGB-D
Dense SLAM using depth cameras provides complete 3D reconstruction:
class DenseSLAM:
    """Dense RGB-D SLAM sketch. PinholeCamera, RayCasting, PoseGraph,
    DenseFeatureTracker, DirectMethod, and DenseKeyFrame are assumed to be
    provided elsewhere."""

    def __init__(self, camera_parameters):
        self.color_camera = PinholeCamera(camera_parameters['color'])
        self.depth_camera = PinholeCamera(camera_parameters['depth'])
        # Keyframe management
        self.keyframes = []
        self.max_keyframes = 100
        # Map representation
        self.volume = TSDFVolume(resolution=0.01, size=10.0)
        self.raycasting = RayCasting(self.depth_camera)
        # Pose tracking
        self.current_pose = np.eye(4)
        self.pose_graph = PoseGraph()
        # Dense tracking
        self.feature_tracker = DenseFeatureTracker()
        self.direct_method = DirectMethod()

    def process_rgbd_frame(self, color_image, depth_image, timestamp):
        """Process an RGB-D frame."""
        # Track the frame using the direct (photometric) method
        pose_delta, tracking_quality = self.direct_method.track_frame(
            color_image, depth_image, self.current_pose
        )
        if tracking_quality < 0.3:
            # Tracking lost: try global registration
            pose_estimate = self.global_registration(color_image, depth_image)
            if pose_estimate is not None:
                self.current_pose = pose_estimate
                tracking_quality = 0.8
        else:
            # Apply the incremental pose update
            self.current_pose = pose_delta @ self.current_pose
        # Integrate into the volume if this is a keyframe
        if self.should_create_keyframe(tracking_quality, timestamp):
            self.create_dense_keyframe(color_image, depth_image, timestamp)
        # Raycast the volume to predict depth for the next tracking step
        predicted_depth = self.raycasting.render_depth(
            color_image.shape[:2], self.current_pose
        )
        return self.current_pose, predicted_depth

    def create_dense_keyframe(self, color_image, depth_image, timestamp):
        """Create a dense keyframe and update the volume."""
        keyframe = DenseKeyFrame(
            color_image=color_image.copy(),
            depth_image=depth_image.copy(),
            pose=self.current_pose.copy(),
            timestamp=timestamp,
            pyramid=self._create_image_pyramid(color_image),
        )
        self.keyframes.append(keyframe)
        # Integrate into the TSDF volume
        self.volume.integrate_frame(
            color_image, depth_image,
            self.current_pose,
            self.color_camera.intrinsic_matrix,
        )
        # Add an odometry edge to the pose graph
        if len(self.keyframes) > 1:
            last_keyframe = self.keyframes[-2]
            relative_pose = np.linalg.inv(last_keyframe.pose) @ self.current_pose
            self.pose_graph.add_edge(
                len(self.keyframes) - 2,
                len(self.keyframes) - 1,
                relative_pose,
            )
        # Bound the number of keyframes
        if len(self.keyframes) > self.max_keyframes:
            self.remove_oldest_keyframe()

    def global_registration(self, color_image, depth_image):
        """Global registration against all keyframes when tracking is lost."""
        best_match = None
        best_inliers = 0
        for i, keyframe in enumerate(self.keyframes):
            # Feature matching against each keyframe
            matches = self.feature_tracker.match_frames(
                keyframe.color_image, color_image
            )
            if len(matches) > 30:
                # Estimate the pose using PnP with depth
                pose, inliers = self._estimate_pose_with_depth(
                    matches, keyframe, depth_image
                )
                if inliers > best_inliers:
                    best_inliers = inliers
                    best_match = (i, pose)
        if best_match:
            keyframe_idx, pose = best_match
            # Add a loop-closure edge to the pose graph
            self.pose_graph.add_edge(
                keyframe_idx,
                len(self.keyframes),
                np.linalg.inv(self.keyframes[keyframe_idx].pose) @ pose,
                loop_closure=True,
            )
            # Optimize the pose graph
            optimized_poses = self.pose_graph.optimize()
            if optimized_poses:
                self.current_pose = optimized_poses[-1]
            return pose
        return None

    def extract_mesh(self):
        """Extract a triangle mesh from the TSDF volume."""
        vertices, faces = self.volume.extract_mesh()
        return {
            'vertices': vertices,
            'faces': faces,
            'colors': self._extract_vertex_colors(vertices),
        }
class TSDFVolume:
    def __init__(self, resolution, size):
        self.resolution = resolution
        self.size = size
        self.voxels_per_dim = int(size / resolution)
        shape = (self.voxels_per_dim,) * 3
        # TSDF values (1 = unobserved/free) and integration weights
        self.tsdf = np.ones(shape)
        self.weight = np.zeros(shape)
        # Color volume
        self.color = np.zeros(shape + (3,))
        # Truncation distance
        self.truncation = 4 * resolution

    def integrate_frame(self, color_image, depth_image, pose, intrinsic_matrix):
        """Integrate a new RGB-D frame into the TSDF volume."""
        height, width = depth_image.shape
        u, v = np.meshgrid(np.arange(width), np.arange(height))
        fx, fy = intrinsic_matrix[0, 0], intrinsic_matrix[1, 1]
        cx, cy = intrinsic_matrix[0, 2], intrinsic_matrix[1, 2]
        # Convert depth from millimeters to meters if needed
        depth_meters = depth_image / 1000.0
        # Backproject pixels to camera coordinates
        x_cam = (u - cx) * depth_meters / fx
        y_cam = (v - cy) * depth_meters / fy
        z_cam = depth_meters
        # Transform to world coordinates
        points_cam = np.stack([x_cam.flatten(), y_cam.flatten(), z_cam.flatten()], axis=1)
        ones = np.ones((points_cam.shape[0], 1))
        points_world = (pose @ np.hstack([points_cam, ones]).T).T[:, :3]
        # Keep points within a valid depth range
        valid = (depth_meters.flatten() > 0.1) & (depth_meters.flatten() < 10.0)
        points_world = points_world[valid]
        colors = color_image.reshape(-1, 3)[valid]
        # Simplified per-point update: walk each viewing ray through the
        # truncation band around the observed surface point. (A production
        # implementation iterates over voxels and projects them into the image.)
        cam_center = pose[:3, 3]
        for point, color in zip(points_world, colors):
            ray = point - cam_center
            depth = np.linalg.norm(ray)
            ray_dir = ray / depth
            for d in np.arange(depth - self.truncation, depth + self.truncation,
                               self.resolution):
                voxel = self._world_to_voxel(cam_center + d * ray_dir)
                if not self._is_valid_voxel(voxel):
                    continue
                # Signed distance: positive in front of the surface
                sdf = np.clip(depth - d, -self.truncation, self.truncation)
                i, j, k = voxel
                w = self.weight[i, j, k]
                # Weighted running-average update
                self.tsdf[i, j, k] = (self.tsdf[i, j, k] * w + sdf) / (w + 1.0)
                self.weight[i, j, k] = w + 1.0
                self.color[i, j, k] = color

    def extract_mesh(self):
        """Extract a mesh with marching cubes at the zero crossing."""
        from skimage.measure import marching_cubes
        vertices, faces, normals, values = marching_cubes(self.tsdf, level=0)
        # Scale voxel coordinates to world coordinates (volume centered at origin)
        scale = self.size / self.voxels_per_dim
        vertices = vertices * scale - self.size / 2
        # Drop degenerate (tiny) triangles; the mask applies to faces only
        face_areas = self._calculate_face_areas(vertices, faces)
        faces = faces[face_areas > (scale ** 2) * 0.01]
        return vertices, faces
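The heart of TSDF integration is the weighted running average applied per voxel. A standalone single-voxel sketch; the `fuse` helper and the truncation value are illustrative assumptions:

```python
import numpy as np

trunc = 0.04  # truncation distance in meters (assumed)

def fuse(tsdf, weight, sdf_obs, w_obs=1.0):
    """Weighted running average used for TSDF integration."""
    sdf_obs = np.clip(sdf_obs, -trunc, trunc)
    new_tsdf = (tsdf * weight + sdf_obs * w_obs) / (weight + w_obs)
    return new_tsdf, weight + w_obs

# Two noisy observations of the same voxel average toward the true SDF
t, w = 1.0, 0.0           # uninitialized voxel (weight 0)
t, w = fuse(t, w, 0.01)   # first observation replaces the dummy value
t, w = fuse(t, w, 0.03)   # second observation is averaged in
```

Because the weight starts at zero, the first observation dominates; subsequent frames refine the estimate, which is what makes TSDF fusion robust to per-frame depth noise.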
15.4 Navigation with SLAM
15.4.1 Path Planning in SLAM Maps
Plan paths through SLAM-constructed environments:
class SLAMNavigation:
    """Navigation on top of a SLAM system. OccupancyGrid is assumed to be
    provided elsewhere."""

    def __init__(self, slam_system):
        self.slam = slam_system
        self.occupancy_grid = OccupancyGrid(resolution=0.05, size=100.0)
        # Path planning
        self.path_planner = AStarPlanner(self.occupancy_grid)
        self.trajectory_planner = TrajectoryPlanner()
        # Navigation state
        self.current_goal = None
        self.current_path = None
        self.navigation_state = 'IDLE'

    def update_navigation_map(self):
        """Rebuild the occupancy grid from the SLAM map."""
        self.occupancy_grid.clear()
        # Mark landmarks as obstacles
        for landmark in self.slam.landmarks:
            x, y = landmark['position']
            self.occupancy_grid.set_occupancy(x, y, 1.0)
        # Mark the traveled trajectory as free space
        for pose in self.slam.trajectory:
            x, y = pose[:2]
            self.occupancy_grid.set_occupancy(x, y, 0.0)
        # Inflate obstacles by the robot footprint
        self.occupancy_grid.inflate_obstacles(robot_radius=0.3)

    def navigate_to_goal(self, goal_position):
        """Plan a path to the specified goal position."""
        self.current_goal = goal_position
        self.navigation_state = 'PLANNING'
        # Update the navigation map
        self.update_navigation_map()
        # Plan a path from the current position
        current_position = self.slam.robot_pose[:2]
        path = self.path_planner.plan_path(current_position, goal_position)
        if path is not None:
            self.current_path = path
            self.navigation_state = 'EXECUTING'
            return True
        self.navigation_state = 'NO_PATH'
        return False

    def execute_path(self):
        """Compute the next control command along the planned path."""
        if self.navigation_state != 'EXECUTING' or self.current_path is None:
            return None
        current_position = self.slam.robot_pose[:2]
        # Find the target point on the path
        target_point = self._get_target_point(current_position)
        if target_point is None:
            self.navigation_state = 'REACHED_GOAL'
            return None
        # Generate a trajectory toward the target point
        trajectory = self.trajectory_planner.generate_trajectory(
            current_position,
            self.slam.robot_pose[2],  # Current heading
            target_point,
        )
        # Convert the trajectory into a control command
        return self._trajectory_to_control(trajectory)

    def _get_target_point(self, current_position):
        """Pick a look-ahead target point along the path."""
        if self.current_path is None or len(self.current_path) == 0:
            return None
        # Find the closest point on the path
        min_dist = float('inf')
        closest_idx = 0
        for i, point in enumerate(self.current_path):
            dist = np.linalg.norm(current_position - point)
            if dist < min_dist:
                min_dist = dist
                closest_idx = i
        # Look ahead along the path
        look_ahead_distance = 2.0
        for j in range(closest_idx, min(closest_idx + 20, len(self.current_path))):
            point = self.current_path[j]
            if np.linalg.norm(current_position - point) >= look_ahead_distance:
                return point
        # No point far enough ahead: aim at the last point
        return self.current_path[-1]
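The look-ahead logic in `_get_target_point` can be exercised on its own. Below is a standalone sketch; the `lookahead_target` helper is illustrative:

```python
import numpy as np

def lookahead_target(path, position, lookahead=2.0):
    """First path point at least `lookahead` meters away, searching from
    the closest point onward."""
    dists = [np.linalg.norm(position - p) for p in path]
    closest = int(np.argmin(dists))
    for p in path[closest:]:
        if np.linalg.norm(position - p) >= lookahead:
            return p
    return path[-1]  # near the goal: aim at the final point

path = [np.array([float(i), 0.0]) for i in range(6)]  # straight path along +x

target = lookahead_target(path, np.array([0.2, 0.0]))       # mid-path
near_goal = lookahead_target(path, np.array([4.9, 0.0]))    # end of path
```

Searching from the closest point onward (rather than from the start) keeps the robot from chasing stale waypoints behind it.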
from queue import PriorityQueue

class AStarPlanner:
    def __init__(self, occupancy_grid):
        self.grid = occupancy_grid
        self.heuristic_weight = 1.0

    def plan_path(self, start, goal):
        """Plan a path with A*; returns a list of world points or None."""
        start_cell = self.grid.world_to_grid(start)
        goal_cell = self.grid.world_to_grid(goal)
        if not self.grid.is_valid_cell(start_cell) or not self.grid.is_valid_cell(goal_cell):
            return None
        # A* search
        open_set = PriorityQueue()
        closed_set = set()
        start_node = Node(start_cell, None, 0, self._heuristic(start_cell, goal_cell))
        open_set.put(start_node)
        while not open_set.empty():
            current = open_set.get()
            if current.cell in closed_set:
                continue  # Stale duplicate queue entry
            if current.cell == goal_cell:
                # Reconstruct the path by walking parents back to the start
                path = []
                node = current
                while node is not None:
                    path.append(self.grid.grid_to_world(node.cell))
                    node = node.parent
                return path[::-1]
            closed_set.add(current.cell)
            # Explore neighbors
            for neighbor in self._get_neighbors(current.cell):
                if neighbor in closed_set:
                    continue
                g_cost = current.g_cost + self._distance(current.cell, neighbor)
                h_cost = self.heuristic_weight * self._heuristic(neighbor, goal_cell)
                open_set.put(Node(neighbor, current, g_cost, h_cost))
        return None  # No path found

    def _heuristic(self, cell1, cell2):
        """A* heuristic (Euclidean distance, admissible on a grid)."""
        return self._distance(cell1, cell2)

    def _distance(self, cell1, cell2):
        """Euclidean distance between grid cells."""
        return np.linalg.norm(np.array(cell1) - np.array(cell2))

    def _get_neighbors(self, cell):
        """Valid 8-connected neighbor cells."""
        directions = [(0, 1), (1, 0), (0, -1), (-1, 0),    # 4-connected
                      (1, 1), (1, -1), (-1, 1), (-1, -1)]  # diagonals
        neighbors = []
        for dx, dy in directions:
            neighbor = (cell[0] + dx, cell[1] + dy)
            if self.grid.is_valid_cell(neighbor):
                neighbors.append(neighbor)
        return neighbors
class Node:
    def __init__(self, cell, parent, g_cost, h_cost):
        self.cell = cell
        self.parent = parent
        self.g_cost = g_cost           # Cost from start
        self.h_cost = h_cost           # Heuristic cost to goal
        self.f_cost = g_cost + h_cost  # Total estimated cost

    def __lt__(self, other):
        # Ordering used by the priority queue
        return self.f_cost < other.f_cost
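The planner above depends on an OccupancyGrid, but the search itself can be shown end-to-end on a small boolean grid. Below is a minimal self-contained A* sketch (4-connected, Manhattan heuristic, `heapq` in place of the Node/PriorityQueue pair):

```python
import heapq

def astar(grid, start, goal):
    """Minimal 4-connected A* on a boolean grid (True = obstacle)."""
    rows, cols = len(grid), len(grid[0])
    def h(c):  # Manhattan distance: admissible on a 4-connected grid
        return abs(c[0] - goal[0]) + abs(c[1] - goal[1])
    open_set = [(h(start), 0, start)]
    came_from = {start: None}
    g_cost = {start: 0}
    while open_set:
        f, g, cell = heapq.heappop(open_set)
        if cell == goal:
            path = []  # Reconstruct by walking parents back to the start
            while cell is not None:
                path.append(cell)
                cell = came_from[cell]
            return path[::-1]
        if g > g_cost[cell]:
            continue  # Stale queue entry
        for dx, dy in ((0, 1), (1, 0), (0, -1), (-1, 0)):
            nxt = (cell[0] + dx, cell[1] + dy)
            if (0 <= nxt[0] < rows and 0 <= nxt[1] < cols
                    and not grid[nxt[0]][nxt[1]]
                    and g + 1 < g_cost.get(nxt, float('inf'))):
                g_cost[nxt] = g + 1
                came_from[nxt] = cell
                heapq.heappush(open_set, (g + 1 + h(nxt), g + 1, nxt))
    return None  # No path found

# A wall with a single gap at (2, 1): any path must route through it
grid = [[False, True, False, False],
        [False, True, False, False],
        [False, False, False, False]]
path = astar(grid, (0, 0), (0, 3))
```

Tracking the best known `g_cost` per cell and skipping stale heap entries plays the same role as the `closed_set` in the class above, while also guaranteeing an optimal path under an admissible heuristic.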
class TrajectoryPlanner:
    def __init__(self):
        self.max_velocity = 1.0          # m/s
        self.max_angular_velocity = 1.0  # rad/s
        self.dt = 0.1

    def generate_trajectory(self, current_pos, current_heading, target_pos):
        """Generate a velocity command and predicted trajectory to the target."""
        # Relative position and desired heading
        dx = target_pos[0] - current_pos[0]
        dy = target_pos[1] - current_pos[1]
        target_heading = np.arctan2(dy, dx)
        heading_error = self._normalize_angle(target_heading - current_heading)
        # Simple P-controller gains
        kp_linear = 1.0
        kp_angular = 2.0
        # Distance to the target
        distance = np.sqrt(dx**2 + dy**2)
        # Control velocities, saturated to the robot's limits
        linear_velocity = np.clip(kp_linear * distance,
                                  -self.max_velocity, self.max_velocity)
        angular_velocity = np.clip(kp_angular * heading_error,
                                   -self.max_angular_velocity,
                                   self.max_angular_velocity)
        # Stop when very close to the target
        if distance < 0.1:
            linear_velocity = 0.0
            angular_velocity = 0.0
        return {
            'linear_velocity': linear_velocity,
            'angular_velocity': angular_velocity,
            'trajectory': self._predict_trajectory(
                current_pos, current_heading, linear_velocity, angular_velocity, self.dt
            ),
        }

    def _normalize_angle(self, angle):
        """Normalize an angle to [-pi, pi]."""
        while angle > np.pi:
            angle -= 2 * np.pi
        while angle < -np.pi:
            angle += 2 * np.pi
        return angle

    def _predict_trajectory(self, pos, heading, v, omega, dt, horizon=1.0):
        """Roll the unicycle model forward to predict the trajectory."""
        current_pos = np.array(pos, dtype=float)
        current_heading = heading
        trajectory = [current_pos.copy()]
        for _ in range(int(horizon / dt)):
            current_pos[0] += v * np.cos(current_heading) * dt
            current_pos[1] += v * np.sin(current_heading) * dt
            current_heading = self._normalize_angle(current_heading + omega * dt)
            trajectory.append(current_pos.copy())
        return trajectory
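The proportional control law above reduces to a few lines. Below is a standalone sketch (helper names are illustrative) that also shows a one-step angle wrap which can replace the while-loops:

```python
import numpy as np

def normalize_angle(angle):
    """Wrap an angle to [-pi, pi] in one step."""
    return (angle + np.pi) % (2 * np.pi) - np.pi

def p_control(position, heading, target,
              kp_lin=1.0, kp_ang=2.0, v_max=1.0, w_max=1.0):
    """P-controller toward a target point; returns (linear, angular) velocity."""
    dx, dy = target[0] - position[0], target[1] - position[1]
    heading_error = normalize_angle(np.arctan2(dy, dx) - heading)
    v = np.clip(kp_lin * np.hypot(dx, dy), -v_max, v_max)
    w = np.clip(kp_ang * heading_error, -w_max, w_max)
    return float(v), float(w)

# Target straight ahead: drive forward at the saturated speed, no turning
v1, w1 = p_control((0.0, 0.0), 0.0, (2.0, 0.0))
# Target directly behind: the turn command saturates
v2, w2 = p_control((0.0, 0.0), 0.0, (-2.0, 0.0))
```

Saturating both velocities before sending them to the base keeps the commands within the robot's kinematic limits regardless of how far away the target is.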
Chapter Summary
This chapter covered comprehensive SLAM and navigation systems for robotics:
Key Concepts Covered
- SLAM Fundamentals: Mathematical framework and uncertainty representation
- EKF-SLAM: Extended Kalman Filter for landmark-based SLAM
- Visual SLAM: Feature-based and dense RGB-D SLAM systems
- Loop Closure: Place recognition and pose graph optimization
- Navigation: Path planning and execution in SLAM maps
- Real-time Applications: Practical implementations for autonomous robots
Practical Implementations
- Complete EKF-SLAM with data association and landmark management
- Feature-based VSLAM with ORB features and loop closure detection
- Dense SLAM with TSDF volume integration and mesh extraction
- A* path planning with SLAM map integration
- Trajectory planning with smooth control laws
Next Steps
With SLAM and navigation expertise, you're ready for:
- Chapter 16: Path Planning Algorithms
- Part V: Embodied Intelligence & VLA (Chapters 17-20)
Glossary Terms
- Data Association: Matching sensor measurements to map landmarks; a critical challenge in SLAM. Related: Nearest Neighbor, Joint Compatibility Test
- Loop Closure: Detecting that the robot has returned to a previously visited location, enabling map correction and global consistency. Related: Place Recognition, Pose Graph Optimization
- Pose Graph: A graph of robot poses and the constraints between them, optimized for global consistency. Related: Bundle Adjustment, Graph SLAM
- TSDF (Truncated Signed Distance Function): An implicit surface representation storing the distance to the nearest surface, truncated to bound updates. Related: Volumetric Mapping, Marching Cubes
- Visual Odometry: Estimating camera motion by tracking visual features between consecutive frames. Related: Feature Tracking, Essential Matrix
Exercises
Exercise 15.1: EKF-SLAM Implementation
Implement complete EKF-SLAM system:
- Add landmark initialization and management
- Implement robust data association with gating
- Handle map management and pruning
- Visualize uncertainty ellipses
Exercise 15.2: Feature-Based VSLAM
Build visual SLAM with loop closure:
- Implement ORB feature extraction and matching
- Create keyframe management system
- Add loop closure detection with place recognition
- Optimize pose graph for global consistency
Exercise 15.3: Dense RGB-D SLAM
Develop dense SLAM with TSDF volume:
- Implement direct method tracking
- Create TSDF volume integration
- Extract and visualize mesh
- Handle missing depth data
Exercise 15.4: SLAM Navigation
Create navigation system using SLAM map:
- Build occupancy grid from SLAM landmarks
- Implement A* path planning algorithm
- Add trajectory planning with smooth control
- Handle dynamic obstacles
Exercise 15.5: Multi-Robot SLAM
Design collaborative SLAM system:
- Implement multi-robot pose graph optimization
- Create inter-robot loop closure detection
- Design distributed map merging
- Evaluate system scalability