Chapter 5: Nodes, Topics, Services, and Actions
Introduction
While Chapter 4 introduced the fundamental concepts of ROS 2, this chapter delves deeper into the advanced communication patterns that enable complex robotic systems. We'll explore sophisticated node architectures, efficient topic design patterns, robust service implementations, and complex action workflows. Understanding these patterns is essential for building scalable, maintainable, and performant robotic applications.
The choice between topics, services, and actions fundamentally impacts your system's performance, reliability, and maintainability. Mastering these communication patterns is key to becoming a proficient ROS 2 developer.
5.1 Advanced Node Architectures
5.1.1 Node Composition and Lifecycle
Modern ROS 2 applications often use node composition to reduce computational overhead and improve communication efficiency:
Diagram: Node Composition vs Multiple Nodes
Traditional Multi-Node Architecture:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Sensor │ │ Processor │ │Controller │
│ Node │ │ Node │ │ Node │
│ │ │ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │
│ │Publisher│──┼─→│Subscriber│──┼─→│Subscriber│ │
│ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │
└─────────────┘ └─────────────┘ └─────────────┘
↕ ↕ ↕
Network transport with serialization overhead
Composed Node Architecture:
┌─────────────────────────────────────────────────────────┐
│ Composed Node │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Sensor │ │ Processor │ │Controller │ │
│ │Component │ │Component │ │Component │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ Shared Memory Buffer │ │
│ │ Zero-Copy Communication │ │
│ └─────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
Example: Composed Node Implementation
#include <std_msgs/msg/string.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <geometry_msgs/msg/twist.hpp>
class ComposedRobotNode : public rclcpp::Node {
public:
ComposedRobotNode() : Node("composed_robot_node") {
// Create components
setup_sensor_component();
setup_processor_component();
setup_controller_component();
// Create shared data structures
shared_data_ = std::make_shared<SharedData>();
RCLCPP_INFO(this->get_logger(), "Composed robot node initialized");
}
private:
struct SharedData {
sensor_msgs::msg::Image::SharedPtr latest_image;
geometry_msgs::msg::Twist::SharedPtr control_command;
bool processing_active = false;
std::mutex mutex;
};
std::shared_ptr<SharedData> shared_data_;
void setup_sensor_component() {
// Image subscriber
image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
"/camera/image_raw", 10,
[this](const sensor_msgs::msg::Image::SharedPtr msg) {
std::lock_guard<std::mutex> lock(shared_data_->mutex);
shared_data_->latest_image = msg;
});
// Timer for simulated sensor processing
sensor_timer_ = this->create_wall_timer(
std::chrono::milliseconds(30),
[this]() { process_sensor_data(); });
}
void setup_processor_component() {
// Timer for image processing
processor_timer_ = this->create_wall_timer(
std::chrono::milliseconds(100),
[this]() { process_image_data(); });
}
void setup_controller_component() {
// Publisher for control commands
cmd_vel_pub_ = this->create_publisher<geometry_msgs::msg::Twist>(
"/cmd_vel", 10);
// Timer for control loop
control_timer_ = this->create_wall_timer(
std::chrono::milliseconds(50),
[this]() { execute_control_loop(); });
}
void process_sensor_data() {
// Simulate sensor processing
// In real implementation, this would process raw sensor data
}
void process_image_data() {
std::lock_guard<std::mutex> lock(shared_data_->mutex);
if (shared_data_->latest_image && !shared_data_->processing_active) {
shared_data_->processing_active = true;
// Process image (simulation)
RCLCPP_INFO(this->get_logger(),
"Processing image %dx%d",
shared_data_->latest_image->width,
shared_data_->latest_image->height);
// Generate control command based on image processing
auto cmd = std::make_shared<geometry_msgs::msg::Twist>();
cmd->linear.x = 0.5; // Example: move forward
cmd->angular.z = 0.1; // Example: slight turn
shared_data_->control_command = cmd;
shared_data_->processing_active = false;
}
}
void execute_control_loop() {
std::lock_guard<std::mutex> lock(shared_data_->mutex);
if (shared_data_->control_command) {
cmd_vel_pub_->publish(*shared_data_->control_command);
shared_data_->control_command.reset();
}
}
// Component members
rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr cmd_vel_pub_;
rclcpp::TimerBase::SharedPtr sensor_timer_;
rclcpp::TimerBase::SharedPtr processor_timer_;
rclcpp::TimerBase::SharedPtr control_timer_;
};
int main(int argc, char** argv) {
rclcpp::init(argc, argv);
auto node = std::make_shared<ComposedRobotNode>();
rclcpp::spin(node);
rclcpp::shutdown();
return 0;
}
5.1.2 Multi-threaded Node Design
ROS 2 nodes can leverage multiple threads for concurrent processing:
Example: Multi-threaded Node Example
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
import threading
import time
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist
class MultiThreadedRobotNode(Node):
def __init__(self):
super().__init__('multithreaded_robot_node')
# Create executor for this node
self.executor = MultiThreadedExecutor()
# Shared state
self.image_data = None
self.processing_lock = threading.Lock()
self.control_command = None
# Setup components in separate threads
self.setup_sensor_thread()
self.setup_processor_thread()
self.setup_control_thread()
self.get_logger().info('Multi-threaded robot node initialized')
def setup_sensor_thread(self):
"""Setup sensor processing in separate thread"""
def sensor_worker():
# Create subscription
image_sub = self.create_subscription(
Image, '/camera/image_raw', self.image_callback, 10)
# Add this node to executor
self.executor.add_node(self)
# Spin in thread
self.executor.spin()
# Start thread
self.sensor_thread = threading.Thread(target=sensor_worker, daemon=True)
self.sensor_thread.start()
def setup_processor_thread(self):
"""Setup image processing in separate thread"""
def processor_worker():
while rclpy.ok():
time.sleep(0.1) # 10 Hz processing
with self.processing_lock:
if self.image_data is not None:
self.process_image()
self.processor_thread = threading.Thread(target=processor_worker, daemon=True)
self.processor_thread.start()
def setup_control_thread(self):
"""Setup control loop in separate thread"""
def control_worker():
# Create publisher
cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
while rclpy.ok():
time.sleep(0.05) # 20 Hz control loop
with self.processing_lock:
if self.control_command is not None:
cmd_vel_pub.publish(self.control_command)
self.control_command = None
self.control_thread = threading.Thread(target=control_worker, daemon=True)
self.control_thread.start()
def image_callback(self, msg):
"""Handle incoming image messages"""
with self.processing_lock:
self.image_data = msg
def process_image(self):
"""Process latest image data"""
if self.image_data is None:
return
# Simulate image processing
self.get_logger().info(
f'Processing image: {self.image_data.width}x{self.image_data.height}')
# Generate control command
cmd = Twist()
cmd.linear.x = 0.5
cmd.angular.z = 0.1
self.control_command = cmd
self.image_data = None
def main():
rclpy.init()
node = MultiThreadedRobotNode()
try:
# Keep main thread alive
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
5.1.3 Node Lifecycle Management
ROS 2 provides a lifecycle management system for controlled startup and shutdown:
Example: Lifecycle Node Implementation
#include <lifecycle_msgs/msg/state.hpp>
#include <lifecycle_msgs/msg/transition.hpp>
class LifecycleRobotNode : public rclcpp_lifecycle::LifecycleNode {
public:
explicit LifecycleRobotNode(const std::string& node_name)
: LifecycleNode(node_name) {}
using CallbackReturn =
rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn;
// Lifecycle state transitions
CallbackReturn on_configure(const rclcpp_lifecycle::State&) {
// Configure hardware and resources
configure_hardware();
setup_subscribers();
setup_publishers();
RCLCPP_INFO(get_logger(), "Node configured successfully");
return CallbackReturn::SUCCESS;
}
CallbackReturn on_activate(const rclcpp_lifecycle::State&) {
// Activate publishers and start processing
activate_publishers();
start_processing();
RCLCPP_INFO(get_logger(), "Node activated successfully");
return CallbackReturn::SUCCESS;
}
CallbackReturn on_deactivate(const rclcpp_lifecycle::State&) {
// Deactivate publishers and stop processing
stop_processing();
deactivate_publishers();
RCLCPP_INFO(get_logger(), "Node deactivated successfully");
return CallbackReturn::SUCCESS;
}
CallbackReturn on_cleanup(const rclcpp_lifecycle::State&) {
// Cleanup resources
cleanup_hardware();
RCLCPP_INFO(get_logger(), "Node cleaned up successfully");
return CallbackReturn::SUCCESS;
}
CallbackReturn on_shutdown(const rclcpp_lifecycle::State&) {
// Final shutdown operations
RCLCPP_INFO(get_logger(), "Node shutting down successfully");
return CallbackReturn::SUCCESS;
}
private:
void configure_hardware() {
// Initialize hardware components
RCLCPP_INFO(get_logger(), "Configuring hardware...");
}
void setup_subscribers() {
// Setup subscribers
RCLCPP_INFO(get_logger(), "Setting up subscribers...");
}
void setup_publishers() {
// Setup publishers (but don't activate yet)
cmd_vel_pub_ = create_publisher<geometry_msgs::msg::Twist>(
"/cmd_vel", rclcpp::SystemDefaultsQoS());
RCLCPP_INFO(get_logger(), "Setting up publishers...");
}
void activate_publishers() {
// Activate publishers
if (cmd_vel_pub_) {
cmd_vel_pub_->on_activate();
}
RCLCPP_INFO(get_logger(), "Activating publishers...");
}
void deactivate_publishers() {
// Deactivate publishers
if (cmd_vel_pub_) {
cmd_vel_pub_->on_deactivate();
}
RCLCPP_INFO(get_logger(), "Deactivating publishers...");
}
void start_processing() {
// Start main processing loop
processing_active_ = true;
RCLCPP_INFO(get_logger(), "Starting processing...");
}
void stop_processing() {
// Stop main processing loop
processing_active_ = false;
RCLCPP_INFO(get_logger(), "Stopping processing...");
}
void cleanup_hardware() {
// Cleanup hardware resources
RCLCPP_INFO(get_logger(), "Cleaning up hardware...");
}
rclcpp_lifecycle::LifecyclePublisher<geometry_msgs::msg::Twist>::SharedPtr cmd_vel_pub_;
bool processing_active_ = false;
};
5.2 Advanced Topic Patterns
5.2.1 Bandwidth Optimization
For high-frequency data streams, optimization is crucial:
Example: Optimized Topic Publisher
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from std_msgs.msg import Header
class OptimizedImagePublisher(Node):
def __init__(self):
super().__init__('optimized_image_publisher')
# Publisher with optimized QoS
self.image_pub = self.create_publisher(
Image, '/camera/image_compressed', 10)
# Timer for high-frequency publishing
self.timer = self.create_timer(1.0/30.0, self.publish_image) # 30 Hz
# Image data
self.image_width = 640
self.image_height = 480
self.frame_id = 0
# Compression settings
self.compression_quality = 80
self.publish_compressed = True
self.get_logger().info('Optimized image publisher started')
def generate_test_image(self):
"""Generate test image data"""
# Create numpy array representing image
image_data = np.random.randint(
0, 256,
(self.image_height, self.image_width, 3),
dtype=np.uint8
)
return image_data
def compress_image(self, image_data):
"""Compress image to reduce bandwidth"""
# In real implementation, use image compression library
# like OpenCV: cv2.imencode('.jpg', image_data, params)
return image_data.tobytes() # Simplified
def publish_image(self):
"""Publish optimized image"""
# Generate test image
image_data = self.generate_test_image()
# Create message
msg = Image()
msg.header = Header()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = f"camera_frame_{self.frame_id}"
msg.height = self.image_height
msg.width = self.image_width
msg.encoding = "rgb8"
msg.is_bigendian = False
msg.step = self.image_width * 3 # bytes per row
# Optimize based on settings
if self.publish_compressed:
# Compressed image (simplified)
compressed_data = self.compress_image(image_data)
msg.data = compressed_data
else:
# Raw image
msg.data = image_data.tobytes()
# Publish
self.image_pub.publish(msg)
self.frame_id += 1
# Log bandwidth usage
data_size = len(msg.data)
self.get_logger().debug(
f'Published frame {self.frame_id}, size: {data_size} bytes',
throttle_duration_sec=1.0
)
# Advanced optimization with selective publishing
class SmartImagePublisher(OptimizedImagePublisher):
def __init__(self):
super().__init__()
# Previous image for difference calculation
self.previous_image = None
self.threshold = 0.1 # 10% change threshold
self.frame_skip = 2 # Publish every 2nd frame
self.frame_counter = 0
def calculate_difference(self, image1, image2):
"""Calculate percentage difference between images"""
if image1 is None or image2 is None:
return 1.0
# Calculate absolute difference
diff = np.abs(image1.astype(float) - image2.astype(float))
return np.mean(diff) / 255.0
def publish_image(self):
"""Publish image only if significant change"""
self.frame_counter += 1
# Skip frames based on frame_skip setting
if self.frame_counter % self.frame_skip != 0:
return
# Generate current image
current_image = self.generate_test_image()
# Calculate difference with previous image
diff = self.calculate_difference(self.previous_image, current_image)
# Only publish if significant change
if diff > self.threshold or self.previous_image is None:
super().publish_image()
self.previous_image = current_image.copy()
self.get_logger().debug(
f'Published frame due to {diff:.1%} change')
else:
self.get_logger().debug(
f'Skipped frame, only {diff:.1%} change')
5.2.2 Message Type Design
Effective message type design is crucial for performance and maintainability:
Diagram: Message Type Hierarchy
Base Message
├── Header (timestamp, frame_id)
├── Core Data Fields
├── Metadata Fields
└── Optional Extensions
Good Design:
robot_control_msg/Command
├── header (std_msgs/Header)
├── command_id (uint32)
├── command_type (uint8)
├── target_pose (geometry_msgs/PoseStamped)
├── velocity (geometry_msgs/Twist)
├── gripper_command (GripperCommand)
└── parameters (CustomParameters)
Avoid: Large monolithic messages with many optional fields
Example: Custom Message Definition
std_msgs/Header header
uint32 command_id
uint8 type=0
uint8 STOP=0
uint8 MOVE=1
uint8 GRASP=2
uint8 RELEASE=3
# Required fields based on command type
geometry_msgs/PoseStamped target_pose
geometry_msgs/Twist velocity
GripperCommand gripper_command
# Additional parameters
CustomParameters[] parameters
# robot_control_msg/GripperCommand.msg
float64 position # 0.0 (open) to 1.0 (closed)
float64 force # Maximum closing force
bool relative # True for relative positioning
# robot_control_msg/CustomParameters.msg
string key
string value
5.2.3 Topic Naming Conventions
Follow ROS 2 topic naming conventions: use descriptive names, include hierarchy, and follow lower_case_with_underscores naming pattern.
Example: Topic Naming Examples
/camera/image_raw
/camera/image_rect_color
/camera/camera_info
/robot_base/joint_states
/robot_base/odom
/arm_controller/command
/arm_controller/state
/gripper_controller/command
/mobile_base/cmd_vel
/mobile_base/odom
/mobile_base/joint_states
# Multi-robot systems
/robot_1/camera/image_raw
/robot_1/mobile_base/cmd_vel
/robot_2/camera/image_raw
/robot_2/mobile_base/cmd_vel
# Sensor specific naming
/front_lidar/scan
/rear_lidar/scan
/left_camera/image_raw
/right_camera/image_raw
/imu/data
/gps/fix
5.3 Service Design Patterns
5.3.1 Service Chain Patterns
Services can be chained together for complex operations:
Example: Service Chain Implementation
from rclpy.node import Node
from example_interfaces.srv import AddTwoInts, SetBool
from std_srvs.srv import Empty
class ServiceChainNode(Node):
def __init__(self):
super().__init__('service_chain_node')
# Create service chain
self.setup_service_chain()
# Create master service
self.master_service = self.create_service(
Empty, 'master_operation', self.master_operation_callback)
self.get_logger().info('Service chain node initialized')
def setup_service_chain(self):
"""Setup chain of services"""
# Service clients
self.add_client = self.create_client(AddTwoInts, 'add_two_ints')
self.multiply_client = self.create_client(AddTwoInts, 'multiply')
self.validate_client = self.create_client(SetBool, 'validate_result')
# Wait for services
while not all([
self.add_client.wait_for_service(timeout_sec=1.0),
self.multiply_client.wait_for_service(timeout_sec=1.0),
self.validate_client.wait_for_service(timeout_sec=1.0)
]):
self.get_logger().info('Waiting for services...')
async def master_operation_callback(self, request, response):
"""Master service that chains multiple services"""
try:
# Step 1: Add two numbers
add_request = AddTwoInts.Request()
add_request.a = 5
add_request.b = 3
add_future = self.add_client.call_async(add_request)
rclpy.spin_until_future_complete(self, add_future)
add_result = add_future.result()
if add_result.sum is None:
response.success = False
response.message = "Add service failed"
return response
# Step 2: Multiply result by 2
multiply_request = AddTwoInts.Request()
multiply_request.a = add_result.sum
multiply_request.b = 2
multiply_future = self.multiply_client.call_async(multiply_request)
rclpy.spin_until_future_complete(self, multiply_future)
multiply_result = multiply_future.result()
if multiply_result.sum is None:
response.success = False
response.message = "Multiply service failed"
return response
# Step 3: Validate result
validate_request = SetBool.Request()
validate_request.data = multiply_result.sum < 20
validate_future = self.validate_client.call_async(validate_request)
rclpy.spin_until_future_complete(self, validate_future)
validate_result = validate_future.result()
if validate_result.success:
response.success = True
response.message = f"Operation successful: final result = {multiply_result.sum}"
else:
response.success = False
response.message = "Validation failed"
except Exception as e:
response.success = False
response.message = f"Service chain error: {str(e)}"
return response
# Enhanced master service response
from example_interfaces.msg import String
class EnhancedServiceChainNode(ServiceChainNode):
def __init__(self):
super().__init__()
# Create enhanced master service
self.enhanced_service = self.create_service(
String, 'enhanced_operation', self.enhanced_operation_callback)
async def enhanced_operation_callback(self, request, response):
"""Enhanced service with detailed response"""
response.output = ""
try:
# Execute the same chain
base_response = await self.master_operation_callback(None, None)
if base_response.success:
response.output = "Service chain completed successfully:\n"
response.output += f"- Initial addition: 5 + 3 = 8\n"
response.output += f"- Multiplication: 8 * 2 = 16\n"
response.output += f"- Validation: 16 < 20 = True\n"
response.output += f"- Final result: {base_response.message}"
else:
response.output = f"Service chain failed: {base_response.message}"
except Exception as e:
response.output = f"Enhanced service error: {str(e)}"
return response
# Synchronous service chain for better performance
class SyncServiceChainNode(Node):
def __init__(self):
super().__init__('sync_service_chain_node')
# Create synchronized services
self.add_service = self.create_service(
AddTwoInts, 'fast_add', self.add_callback)
self.multiply_service = self.create_service(
AddTwoInts, 'fast_multiply', self.multiply_callback)
# Cache for performance
self.result_cache = {}
self.get_logger().info('Sync service chain node initialized')
def add_callback(self, request, response):
"""Fast addition with caching"""
cache_key = f"add_{request.a}_{request.b}"
if cache_key in self.result_cache:
response.sum = self.result_cache[cache_key]
self.get_logger().debug(f"Cache hit for {cache_key}")
else:
response.sum = request.a + request.b
self.result_cache[cache_key] = response.sum
self.get_logger().debug(f"Cache miss for {cache_key}")
return response
def multiply_callback(self, request, response):
"""Fast multiplication"""
cache_key = f"mult_{request.a}_{request.b}"
if cache_key in self.result_cache:
response.sum = self.result_cache[cache_key]
else:
response.sum = request.a * request.b
self.result_cache[cache_key] = response.sum
return response
5.3.2 Error Handling in Services
Example: Robust Service Implementation
#include <example_interfaces/srv/add_two_ints.hpp>
#include <std_srvs/srv/trigger.hpp>
class RobustServiceNode : public rclcpp::Node {
public:
RobustServiceNode() : Node("robust_service_node") {
// Create robust calculator service
calculator_service_ = create_service<example_interfaces::srv::AddTwoInts>(
"robust_calculator",
std::bind(&RobustServiceNode::calculator_callback, this,
std::placeholders::_1, std::placeholders::_2));
// Create health check service
health_service_ = create_service<std_srvs::srv::Trigger>(
"health_check",
std::bind(&RobustServiceNode::health_check_callback, this,
std::placeholders::_1, std::placeholders::_2));
// Service statistics
request_count_ = 0;
error_count_ = 0;
last_request_time_ = now();
RCLCPP_INFO(get_logger(), "Robust service node initialized");
}
private:
void calculator_callback(
const std::shared_ptr<example_interfaces::srv::AddTwoInts::Request> request,
std::shared_ptr<example_interfaces::srv::AddTwoInts::Response> response) {
request_count_++;
last_request_time_ = now();
try {
// Input validation
if (request->a < 0 || request->a > 1000 ||
request->b < 0 || request->b > 1000) {
response->success = false;
response->message = "Input values must be between 0 and 1000";
error_count_++;
return;
}
// Check for potential overflow
if (request->a > std::numeric_limits<int32_t>::max() - request->b) {
response->success = false;
response->message = "Integer overflow detected";
error_count_++;
return;
}
// Perform calculation
response->sum = request->a + request->b;
response->success = true;
response->message = "Calculation completed successfully";
RCLCPP_INFO(get_logger(),
"Successfully calculated %d + %d = %d",
request->a, request->b, response->sum);
} catch (const std::exception& e) {
RCLCPP_ERROR(get_logger(),
"Exception in calculator service: %s", e.what());
response->success = false;
response->message = std::string("Internal error: ") + e.what();
error_count_++;
}
}
void health_check_callback(
const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
std::shared_ptr<std_srvs::srv::Trigger::Response> response) {
try {
// Check service health
auto time_since_last_request = (now() - last_request_time_).seconds();
response->success = true;
std::ostringstream status;
status << "Service Status:\n";
status << " - Total requests: " << request_count_ << "\n";
status << " - Error count: " << error_count_ << "\n";
status << " - Error rate: "
<< (request_count_ > 0 ?
(100.0 * error_count_ / request_count_) : 0.0) << "%\n";
status << " - Time since last request: "
<< time_since_last_request << " seconds\n";
status << " - Service uptime: "
<< (now().seconds() - start_time_.seconds()) << " seconds";
response->message = status.str();
} catch (const std::exception& e) {
RCLCPP_ERROR(get_logger(),
"Exception in health check: %s", e.what());
response->success = false;
response->message = "Health check failed";
}
}
// Service clients
rclcpp::Service<example_interfaces::srv::AddTwoInts>::SharedPtr calculator_service_;
rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr health_service_;
// Statistics
uint64_t request_count_;
uint64_t error_count_;
rclcpp::Time last_request_time_;
rclcpp::Time start_time_;
};
// Service with timeout and retry mechanisms
class AdvancedServiceNode : public rclcpp::Node {
public:
AdvancedServiceNode() : Node("advanced_service_node") {
// Create service with timeout capability
timeout_service_ = create_service<std_srvs::srv::Trigger>(
"timeout_service",
std::bind(&AdvancedServiceNode::timeout_callback, this,
std::placeholders::_1, std::placeholders::_2));
// Create service with retry mechanism
retry_service_ = create_service<std_srvs::srv::Trigger>(
"retry_service",
std::bind(&AdvancedServiceNode::retry_callback, this,
std::placeholders::_1, std::placeholders::_2));
RCLCPP_INFO(get_logger(), "Advanced service node initialized");
}
private:
void timeout_callback(
const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
std::shared_ptr<std_srvs::srv::Trigger::Response> response) {
// Service that demonstrates timeout handling
auto start_time = now();
try {
// Simulate long operation (5 seconds)
rclcpp::sleep_for(std::chrono::seconds(5));
auto elapsed = (now() - start_time).seconds();
response->success = true;
response->message = "Timeout service completed in " +
std::to_string(elapsed) + " seconds";
} catch (const std::exception& e) {
response->success = false;
response->message = "Timeout service failed: " + std::string(e.what());
}
}
void retry_callback(
const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
std::shared_ptr<std_srvs::srv::Trigger::Response> response) {
// Service that demonstrates retry mechanism
int max_attempts = 3;
int attempt = 0;
bool success = false;
while (attempt < max_attempts && !success) {
attempt++;
try {
// Simulate operation that might fail
double random_value = (double)rand() / RAND_MAX;
if (random_value < 0.7) { // 70% success rate
success = true;
response->success = true;
response->message = "Retry service succeeded after " +
std::to_string(attempt) + " attempts";
break;
} else {
// Simulate failure
if (attempt < max_attempts) {
RCLCPP_INFO(get_logger(),
"Attempt %d failed, retrying...", attempt);
rclcpp::sleep_for(std::chrono::milliseconds(100));
}
}
} catch (const std::exception& e) {
RCLCPP_ERROR(get_logger(),
"Exception in attempt %d: %s", attempt, e.what());
}
}
if (!success) {
response->success = false;
response->message = "Retry service failed after " +
std::to_string(max_attempts) + " attempts";
}
}
rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr timeout_service_;
rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr retry_service_;
};
5.4 Advanced Action Implementation
5.4.1 Complex Action Workflows
Example: Multi-Stage Action Server
from rclpy.action import ActionServer
from rclpy.node import Node
from rclpy.duration import Duration
from example_interfaces.action import Fibonacci
from std_msgs.msg import String
class MultiStageActionServer(Node):
def __init__(self):
super().__init__('multi_stage_action_server')
# Create action server
self.action_server = ActionServer(
self,
Fibonacci,
'fibonacci',
self.execute_callback,
cancel_callback=self.cancel_callback)
# Event publisher for stage tracking
self.stage_pub = self.create_publisher(String, '/action_stage', 10)
# Stage definitions
self.stages = [
"initialization",
"calculation",
"validation",
"finalization"
]
self.current_stage = 0
self.action_active = False
self.get_logger().info('Multi-stage action server initialized')
async def execute_callback(self, goal_handle):
"""Execute multi-stage action"""
self.action_active = True
self.current_stage = 0
# Get goal
order = goal_handle.request.order
self.get_logger().info(f'Executing Fibonacci sequence of order {order}')
# Publish initial stage
self.publish_stage("initialization")
# Stage 1: Initialization
if not await self.stage_initialization(order, goal_handle):
goal_handle.abort()
return Fibonacci.Result()
# Stage 2: Calculation
sequence = await self.stage_calculation(order, goal_handle)
if sequence is None:
goal_handle.abort()
return Fibonacci.Result()
# Stage 3: Validation
if not await self.stage_validation(sequence, goal_handle):
goal_handle.abort()
return Fibonacci.Result()
# Stage 4: Finalization
if not await self.stage_finalization(sequence, goal_handle):
goal_handle.abort()
return Fibonacci.Result()
# Complete action
goal_handle.succeed()
result = Fibonacci.Result()
result.sequence = sequence
self.get_logger().info(f'Action completed: sequence = {sequence}')
self.action_active = False
return result
async def stage_initialization(self, order, goal_handle):
"""Stage 1: Initialize resources"""
self.publish_stage("initialization")
feedback_msg = Fibonacci.Feedback()
feedback_msg.stage = "initialization"
feedback_msg.progress = 0.0
try:
# Validate input
if order < 0 or order > 100:
self.get_logger().error(f'Invalid order: {order}')
return False
# Initialize resources
# Simulate initialization time
for i in range(10):
if goal_handle.is_cancel_requested:
return False
progress = (i + 1) / 10.0 * 25.0 # 0-25% for initialization
feedback_msg.progress = progress
goal_handle.publish_feedback(feedback_msg)
self.get_logger().debug(f'Initialization progress: {progress:.1f}%')
rclpy.spin_once(self, timeout_sec=0.1)
await asyncio.sleep(0.1)
self.get_logger().info('Initialization completed')
return True
except Exception as e:
self.get_logger().error(f'Initialization failed: {e}')
return False
async def stage_calculation(self, order, goal_handle):
"""Stage 2: Calculate Fibonacci sequence"""
self.publish_stage("calculation")
feedback_msg = Fibonacci.Feedback()
feedback_msg.stage = "calculation"
try:
sequence = []
a, b = 0, 1
for i in range(order + 1):
# Check for cancellation
if goal_handle.is_cancel_requested:
return None
# Calculate next number
if i == 0:
sequence.append(a)
else:
sequence.append(b)
a, b = b, a + b
# Update feedback
progress = 25.0 + (i + 1) / (order + 1) * 50.0 # 25-75% for calculation
feedback_msg.partial_sequence = sequence
feedback_msg.progress = progress
goal_handle.publish_feedback(feedback_msg)
self.get_logger().debug(f'Calculation progress: {progress:.1f}%, current value: {b}')
rclpy.spin_once(self, timeout_sec=0.1)
# Add delay to make stage visible
if i % 10 == 0:
await asyncio.sleep(0.1)
self.get_logger().info('Calculation completed')
return sequence
except Exception as e:
self.get_logger().error(f'Calculation failed: {e}')
return None
async def stage_validation(self, sequence, goal_handle):
"""Stage 3: Validate results"""
self.publish_stage("validation")
feedback_msg = Fibonacci.Feedback()
feedback_msg.stage = "validation"
try:
# Validate Fibonacci sequence
for i in range(2, len(sequence)):
expected = sequence[i-1] + sequence[i-2]
if sequence[i] != expected:
self.get_logger().error(f'Validation failed at index {i}: '
f'{sequence[i]} != {expected}')
return False
# Update progress
for i in range(10):
if goal_handle.is_cancel_requested:
return False
progress = 75.0 + (i + 1) / 10.0 * 20.0 # 75-95% for validation
feedback_msg.progress = progress
feedback_msg.partial_sequence = sequence
goal_handle.publish_feedback(feedback_msg)
rclpy.spin_once(self, timeout_sec=0.1)
await asyncio.sleep(0.05)
self.get_logger().info('Validation completed')
return True
except Exception as e:
self.get_logger().error(f'Validation failed: {e}')
return False
async def stage_finalization(self, sequence, goal_handle):
"""Stage 4: Finalize and cleanup"""
self.publish_stage("finalization")
feedback_msg = Fibonacci.Feedback()
feedback_msg.stage = "finalization"
try:
# Finalize results
result_info = {
'sequence_length': len(sequence),
'final_value': sequence[-1] if sequence else 0,
'sum': sum(sequence) if sequence else 0
}
# Update progress to 100%
for i in range(5):
if goal_handle.is_cancel_requested:
return False
progress = 95.0 + (i + 1) / 5.0 * 5.0 # 95-100% for finalization
feedback_msg.progress = progress
feedback_msg.partial_sequence = sequence
goal_handle.publish_feedback(feedback_msg)
rclpy.spin_once(self, timeout_sec=0.1)
await asyncio.sleep(0.05)
self.get_logger().info(f'Finalization completed: {result_info}')
return True
except Exception as e:
self.get_logger().error(f'Finalization failed: {e}')
return False
def cancel_callback(self, goal_handle):
"""Handle action cancellation"""
self.get_logger().info('Received cancel request')
self.action_active = False
return True
def publish_stage(self, stage):
"""Publish current stage information"""
msg = String()
msg.data = f'Current stage: {stage}'
self.stage_pub.publish(msg)
self.current_stage = self.stages.index(stage) if stage in self.stages else 0
# Extended feedback message definition (would be in .msg file)
# example_interfaces/action/FibonacciExtended.action
# int32 order
# ---
# int32[] sequence
# string stage
# float64 progress
# ---
# int32[] sequence
# string status
# float64 execution_time
5.4.2 Action Client with Retry and Timeout
Example: Advanced Action Client
#include <rclcpp_action/rclcpp_action.hpp>
#include <example_interfaces/action/fibonacci.hpp>
#include <chrono>
#include <memory>
class AdvancedActionClient : public rclcpp::Node {
public:
AdvancedActionClient() : Node("advanced_action_client") {
// Create action client
action_client_ = rclcpp_action::create_client<Fibonacci>(
this, "fibonacci");
// Wait for action server
while (!action_client_->wait_for_action_server(std::chrono::seconds(1))) {
if (!rclcpp::ok()) {
RCLCPP_ERROR(get_logger(), "Interrupted while waiting for action server");
return;
}
RCLCPP_INFO(get_logger(), "Waiting for action server...");
}
RCLCPP_INFO(get_logger(), "Advanced action client initialized");
}
void send_goal_with_retry(int order, int max_retries = 3) {
int retry_count = 0;
while (retry_count < max_retries) {
if (send_goal(order)) {
RCLCPP_INFO(get_logger(), "Goal completed successfully");
return;
} else {
retry_count++;
RCLCPP_WARN(get_logger(),
"Goal failed, retrying (%d/%d)...",
retry_count, max_retries);
if (retry_count < max_retries) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
RCLCPP_ERROR(get_logger(), "Goal failed after %d retries", max_retries);
}
void send_goal_with_timeout(int order, double timeout_seconds = 10.0) {
auto goal_msg = Fibonacci::Goal();
goal_msg.order = order;
RCLCPP_INFO(get_logger(), "Sending goal with timeout %.1f seconds", timeout_seconds);
auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
send_goal_options.goal_response_callback =
std::bind(&AdvancedActionClient::goal_response_callback, this, std::placeholders::_1);
send_goal_options.feedback_callback =
std::bind(&AdvancedActionClient::feedback_callback, this, std::placeholders::_1, std::placeholders::_2);
send_goal_options.result_callback =
std::bind(&AdvancedActionClient::result_callback, this, std::placeholders::_1);
// Send goal
auto goal_handle_future = action_client_->async_send_goal(goal_msg, send_goal_options);
// Wait for goal completion with timeout
if (rclcpp::spin_until_future_complete(
this->get_node_base_interface(),
goal_handle_future,
std::chrono::duration<double>(timeout_seconds)) ==
rclcpp::FutureReturnCode::SUCCESS) {
auto goal_handle = goal_handle_future.get();
if (goal_handle) {
RCLCPP_INFO(get_logger(), "Goal completed within timeout");
} else {
RCLCPP_ERROR(get_logger(), "Goal was rejected");
}
} else {
RCLCPP_ERROR(get_logger(), "Goal timed out after %.1f seconds", timeout_seconds);
// Cancel the goal
action_client_->async_cancel_all_goals();
}
}
void send_concurrent_goals() {
RCLCPP_INFO(get_logger(), "Sending multiple concurrent goals");
// Send multiple goals concurrently
std::vector<std::shared_future<rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr>> futures;
for (int i = 0; i < 3; ++i) {
auto goal_msg = Fibonacci::Goal();
goal_msg.order = 10 + i * 2; // 10, 12, 14
auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
send_goal_options.result_callback =
[this, i](const rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult & result) {
switch (result.code) {
case rclcpp_action::ResultCode::SUCCEEDED:
RCLCPP_INFO(get_logger(), "Goal %d succeeded", i);
break;
case rclcpp_action::ResultCode::ABORTED:
RCLCPP_WARN(get_logger(), "Goal %d was aborted", i);
break;
case rclcpp_action::ResultCode::CANCELED:
RCLCPP_INFO(get_logger(), "Goal %d was canceled", i);
break;
default:
RCLCPP_ERROR(get_logger(), "Goal %d failed with unknown result code", i);
break;
}
};
auto future = action_client_->async_send_goal(goal_msg, send_goal_options);
futures.push_back(future);
}
// Wait for all goals to complete
for (auto& future : futures) {
rclcpp::spin_until_future_complete(this->get_node_base_interface(), future);
}
RCLCPP_INFO(get_logger(), "All concurrent goals completed");
}
private:
bool send_goal(int order) {
auto goal_msg = Fibonacci::Goal();
goal_msg.order = order;
RCLCPP_INFO(get_logger(), "Sending goal: order = %d", order);
auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
send_goal_options.goal_response_callback =
std::bind(&AdvancedActionClient::goal_response_callback, this, std::placeholders::_1);
send_goal_options.feedback_callback =
std::bind(&AdvancedActionClient::feedback_callback, this, std::placeholders::_1, std::placeholders::_2);
send_goal_options.result_callback =
std::bind(&AdvancedActionClient::result_callback, this, std::placeholders::_1);
// Send goal and wait for result
auto goal_handle_future = action_client_->async_send_goal(goal_msg, send_goal_options);
if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), goal_handle_future) !=
rclcpp::FutureReturnCode::SUCCESS) {
RCLCPP_ERROR(get_logger(), "Failed to send goal");
return false;
}
auto goal_handle = goal_handle_future.get();
if (!goal_handle) {
RCLCPP_ERROR(get_logger(), "Goal was rejected");
return false;
}
// Wait for result
auto result_future = goal_handle->async_get_result();
if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), result_future) !=
rclcpp::FutureReturnCode::SUCCESS) {
RCLCPP_ERROR(get_logger(), "Failed to get result");
return false;
}
auto result = result_future.get();
return result.code == rclcpp_action::ResultCode::SUCCEEDED;
}
void goal_response_callback(const rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr & goal_handle) {
if (!goal_handle) {
RCLCPP_ERROR(get_logger(), "Goal was rejected");
} else {
RCLCPP_INFO(get_logger(), "Goal accepted by server");
}
}
void feedback_callback(
rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr,
const std::shared_ptr<const Fibonacci::Feedback> feedback) {
RCLCPP_INFO(get_logger(),
"Received feedback: partial_sequence length = %zu",
feedback->partial_sequence.size());
}
void result_callback(const rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult & result) {
switch (result.code) {
case rclcpp_action::ResultCode::SUCCEEDED:
RCLCPP_INFO(get_logger(), "Goal succeeded");
RCLCPP_INFO(get_logger(), "Final sequence:");
for (auto number : result.result->sequence) {
RCLCPP_INFO(get_logger(), " %d", number);
}
break;
case rclcpp_action::ResultCode::ABORTED:
RCLCPP_ERROR(get_logger(), "Goal was aborted");
break;
case rclcpp_action::ResultCode::CANCELED:
RCLCPP_ERROR(get_logger(), "Goal was canceled");
break;
default:
RCLCPP_ERROR(get_logger(), "Unknown result code");
break;
}
}
rclcpp_action::Client<Fibonacci>::SharedPtr action_client_;
};
5.5 Real-world Communication Patterns
5.5.1 Sensor Processing Pipeline
Diagram: Sensor Processing Architecture
Raw Sensor Data
↓
┌─────────────────┐
│ Sensor Driver │
└─────────────────┘
↓
┌─────────────────┐
│ Preprocessing │
│ - Noise Filter │
│ - Calibration │
│ - Validation │
└─────────────────┘
↓
┌─────────────────┐
│ Feature │
│ Extraction │
│ - Detection │
│ - Recognition │
│ - Tracking │
└─────────────────┘
↓
┌─────────────────┐
│ Sensor Fusion │
│ - Kalman Filter │
│ - Weighting │
│ - Validation │
└─────────────────┘
↓
┌─────────────────┐
│ Publishing │
└─────────────────┘
Example: Sensor Processing Pipeline
from rclpy.node import Node
from sensor_msgs.msg import Image, LaserScan
from geometry_msgs.msg import PointStamped
from nav_msgs.msg import Odometry
import numpy as np
class SensorProcessingPipeline(Node):
def __init__(self):
super().__init__('sensor_processing_pipeline')
# Raw sensor subscribers
self.image_sub = self.create_subscription(
Image, '/camera/image_raw', self.image_callback, 10)
self.lidar_sub = self.create_subscription(
LaserScan, '/lidar/scan', self.lidar_callback, 10)
self.odom_sub = self.create_subscription(
Odometry, '/odom', self.odom_callback, 10)
# Processed data publishers
self.obstacle_pub = self.create_publisher(
PointStamped, '/obstacles', 10)
self.trajectory_pub = self.create_publisher(
PointStamped, '/trajectory', 10)
# Processing state
self.latest_image = None
self.latest_scan = None
latest_odom = None
# Processing parameters
self.obstacle_threshold = 2.0 # meters
self.publish_rate = 10.0 # Hz
# Processing timer
self.processing_timer = self.create_timer(
1.0 / self.publish_rate, self.process_sensor_data)
self.get_logger().info('Sensor processing pipeline initialized')
def image_callback(self, msg):
"""Handle raw image data"""
# Preprocess image
processed_image = self.preprocess_image(msg)
# Extract features
features = self.extract_image_features(processed_image)
# Store for fusion
self.latest_image = {
'header': msg.header,
'features': features,
'timestamp': self.get_clock().now()
}
def lidar_callback(self, msg):
"""Handle raw LiDAR data"""
# Preprocess scan
processed_scan = self.preprocess_lidar(msg)
# Extract obstacles
obstacles = self.extract_obstacles(processed_scan)
# Store for fusion
self.latest_scan = {
'header': msg.header,
'obstacles': obstacles,
'timestamp': self.get_clock().now()
}
def odom_callback(self, msg):
"""Handle odometry data"""
self.latest_odom = {
'header': msg.header,
'pose': msg.pose.pose,
'twist': msg.twist.twist,
'timestamp': self.get_clock().now()
}
def preprocess_image(self, msg):
"""Preprocess raw image"""
# In real implementation, use OpenCV or similar
# Convert to grayscale, apply filters, etc.
return msg
def extract_image_features(self, image_msg):
"""Extract features from preprocessed image"""
# Simulate feature extraction
features = []
# Create test features
for i in range(5):
feature = {
'id': i,
'position': (np.random.rand() * image_msg.width,
np.random.rand() * image_msg.height),
'confidence': np.random.rand(),
'type': 'corner'
}
features.append(feature)
return features
def preprocess_lidar(self, msg):
"""Preprocess raw LiDAR scan"""
# Remove invalid readings
valid_ranges = []
for i, r in enumerate(msg.ranges):
if msg.range_min <= r <= msg.range_max:
valid_ranges.append(r)
else:
valid_ranges.append(float('inf'))
# Smooth the data
smoothed_ranges = self.smooth_ranges(valid_ranges)
return {
'ranges': smoothed_ranges,
'angle_min': msg.angle_min,
'angle_max': msg.angle_max,
'angle_increment': msg.angle_increment
}
def smooth_ranges(self, ranges):
"""Smooth LiDAR range data"""
if len(ranges) < 3:
return ranges
smoothed = []
for i in range(len(ranges)):
if i == 0 or i == len(ranges) - 1:
smoothed.append(ranges[i])
else:
# Simple moving average
avg = (ranges[i-1] + ranges[i] + ranges[i+1]) / 3.0
smoothed.append(avg)
return smoothed
def extract_obstacles(self, scan_data):
"""Extract obstacle positions from LiDAR scan"""
obstacles = []
for i, r in enumerate(scan_data['ranges']):
if r < self.obstacle_threshold:
# Convert polar to Cartesian
angle = scan_data['angle_min'] + i * scan_data['angle_increment']
x = r * np.cos(angle)
y = r * np.sin(angle)
obstacles.append({
'position': (x, y),
'distance': r,
'angle': angle,
'confidence': 1.0 / (1.0 + r) # Closer obstacles more confident
})
return obstacles
def process_sensor_data(self):
"""Main processing function"""
if not self.latest_scan or not self.latest_odom:
return
# Publish detected obstacles
self.publish_obstacles()
# Publish trajectory (example)
self.publish_trajectory()
def publish_obstacles(self):
"""Publish detected obstacles"""
for obstacle in self.latest_scan['obstacles']:
msg = PointStamped()
msg.header = self.latest_scan['header']
msg.header.frame_id = 'base_link'
msg.point.x = obstacle['position'][0]
msg.point.y = obstacle['position'][1]
msg.point.z = 0.0
self.obstacle_pub.publish(msg)
def publish_trajectory(self):
"""Publish trajectory point"""
if self.latest_odom:
msg = PointStamped()
msg.header = self.latest_odom['header']
msg.header.frame_id = 'base_link'
msg.point.x = self.latest_odom['pose'].position.x
msg.point.y = self.latest_odom['pose'].position.y
msg.point.z = self.latest_odom['pose'].position.z
self.trajectory_pub.publish(msg)
# Advanced sensor fusion with Kalman filtering
class AdvancedSensorFusion(SensorProcessingPipeline):
def __init__(self):
super().__init__()
# Kalman filter for sensor fusion
self.kf_state = np.zeros([6, 1]) # [x, y, z, vx, vy, vz]
self.kf_covariance = np.eye(6) * 0.1
self.last_fusion_time = self.get_clock().now()
# Fused data publisher
self.fused_odom_pub = self.create_publisher(
Odometry, '/fused_odom', 10)
def process_sensor_data(self):
"""Advanced processing with sensor fusion"""
super().process_sensor_data()
# Perform sensor fusion
self.sensor_fusion()
def sensor_fusion(self):
"""Perform Kalman filter-based sensor fusion"""
current_time = self.get_clock().now()
dt = (current_time - self.last_fusion_time).nanoseconds / 1e9
if dt > 0:
# Prediction step
self.predict_step(dt)
# Update step with measurements
if self.latest_odom:
self.update_step_with_odom()
# Publish fused odometry
self.publish_fused_odometry()
self.last_fusion_time = current_time
def predict_step(self, dt):
"""Kalman filter prediction step"""
# State transition matrix
F = np.array([
[1, 0, 0, dt, 0, 0], # x
[0, 1, 0, 0, dt, 0], # y
[0, 0, 1, 0, 0, dt], # z
[0, 0, 0, 1, 0, 0], # vx
[0, 0, 0, 0, 1, 0], # vy
[0, 0, 0, 0, 0, 1] # vz
])
# Process noise
Q = np.eye(6) * 0.01
# Predict
self.kf_state = F @ self.kf_state
self.kf_covariance = F @ self.kf_covariance @ F.T + Q
def update_step_with_odom(self):
"""Update step with odometry measurement"""
# Measurement matrix (we only measure position)
H = np.array([
[1, 0, 0, 0, 0, 0],
[0, 1, 0, 0, 0, 0],
[0, 0, 1, 0, 0, 0]
])
# Measurement noise
R = np.eye(3) * 0.1
# Measurement
z = np.array([
[self.latest_odom['pose'].position.x],
[self.latest_odom['pose'].position.y],
[self.latest_odom['pose'].position.z]
])
# Kalman gain
S = H @ self.kf_covariance @ H.T + R
K = self.kf_covariance @ H.T @ np.linalg.inv(S)
# Update
y = z - H @ self.kf_state
self.kf_state = self.kf_state + K @ y
self.kf_covariance = (np.eye(6) - K @ H) @ self.kf_covariance
def publish_fused_odometry(self):
"""Publish fused odometry"""
msg = Odometry()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'odom'
msg.child_frame_id = 'base_link'
# Set position
msg.pose.pose.position.x = self.kf_state[0, 0]
msg.pose.pose.position.y = self.kf_state[1, 0]
msg.pose.pose.position.z = self.kf_state[2, 0]
# Set velocity
msg.twist.twist.linear.x = self.kf_state[3, 0]
msg.twist.twist.linear.y = self.kf_state[4, 0]
msg.twist.twist.linear.z = self.kf_state[5, 0]
self.fused_odom_pub.publish(msg)
Summary
This chapter explored advanced ROS 2 communication patterns and architectural approaches essential for building complex, robust robotic systems. We covered sophisticated node designs, optimized topic patterns, robust service implementations, and complex action workflows.
Key takeaways:
- Node composition and multi-threading improve performance and reduce overhead
- Lifecycle management provides controlled startup and shutdown procedures
- Bandwidth optimization and proper message design are crucial for high-frequency data
- Robust error handling and retry mechanisms ensure system reliability
- Multi-stage actions with detailed feedback enable complex workflows
- Sensor processing pipelines demonstrate real-world integration patterns
- Advanced patterns like sensor fusion with Kalman filtering enhance system capabilities
Exercises
Exercise 5.1: Multi-threaded Node Implementation
Create a multi-threaded node that:
- Processes sensor data in one thread
- Performs planning in another thread
- Executes control in a third thread
- Demonstrates proper thread synchronization
- Measures and reports performance
Exercise 5.2: Optimized Topic Design
Design an optimized communication system:
- Create custom message types for specific use cases
- Implement bandwidth optimization techniques
- Test with different QoS settings
- Measure performance improvements
- Document trade-offs
Exercise 5.3: Robust Service Chain
Implement a service chain with:
- Multiple services with dependencies
- Error handling and recovery
- Retry mechanisms
- Performance monitoring
- Load balancing strategies
Exercise 5.4: Advanced Action Workflow
Create a multi-stage action system:
- Implement custom action interface
- Design multi-stage workflow
- Add detailed feedback mechanism
- Implement cancellation and preemption
- Add timeout and retry logic
Exercise 5.5: Sensor Fusion System
Build a complete sensor fusion system:
- Integrate multiple sensor types
- Implement Kalman filter for fusion
- Handle sensor failures gracefully
- Provide performance metrics
- Validate with ground truth data
Glossary Terms
- Node Composition: Combining multiple logical nodes into a single process
- Multi-threaded Node: Node using multiple threads for concurrent processing
- Lifecycle Management: Controlled startup, activation, deactivation, and shutdown of nodes
- Bandwidth Optimization: Techniques to reduce data transmission requirements
- Service Chain: Sequence of services where output of one becomes input of next
- Action Workflow: Multi-stage process with feedback and intermediate results
- Sensor Fusion: Combining data from multiple sensors for improved perception
- Kalman Filter: Recursive algorithm for estimating system state from noisy measurements
- QoS (Quality of Service): Policies controlling data exchange behavior
- Message Serialization: Process of converting message objects to bytes for transmission