Chapter 5: Nodes, Topics, Services, and Actions

Introduction

While Chapter 4 introduced the fundamental concepts of ROS 2, this chapter delves deeper into the advanced communication patterns that enable complex robotic systems. We'll explore sophisticated node architectures, efficient topic design patterns, robust service implementations, and complex action workflows. Understanding these patterns is essential for building scalable, maintainable, and performant robotic applications.

The choice between topics, services, and actions fundamentally impacts your system's performance, reliability, and maintainability. Mastering these communication patterns is key to becoming a proficient ROS 2 developer.

5.1 Advanced Node Architectures

5.1.1 Node Composition and Lifecycle

Modern ROS 2 applications often use node composition to reduce computational overhead and improve communication efficiency:

Diagram: Node Composition vs Multiple Nodes

Traditional Multi-Node Architecture:
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Sensor    │    │  Processor  │    │ Controller  │
│    Node     │    │    Node     │    │    Node     │
│             │    │             │    │             │
│  publisher ─┼───→│ subscriber ─┼───→│ subscriber  │
└─────────────┘    └─────────────┘    └─────────────┘
       ↕                  ↕                  ↕
     Network transport with serialization overhead

Composed Node Architecture:
┌─────────────────────────────────────────────────────────┐
│                      Composed Node                      │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐      │
│  │   Sensor    │  │  Processor  │  │ Controller  │      │
│  │  Component  │  │  Component  │  │  Component  │      │
│  └─────────────┘  └─────────────┘  └─────────────┘      │
│                                                         │
│  ┌─────────────────────────────────────────────────┐    │
│  │              Shared Memory Buffer               │    │
│  │             Zero-Copy Communication             │    │
│  └─────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────┘

Example: Composed Node Implementation

#include <memory>
#include <mutex>

#include <rclcpp/rclcpp.hpp>
#include <sensor_msgs/msg/image.hpp>
#include <geometry_msgs/msg/twist.hpp>

class ComposedRobotNode : public rclcpp::Node {
public:
    ComposedRobotNode() : Node("composed_robot_node") {

        // Create components
        setup_sensor_component();
        setup_processor_component();
        setup_controller_component();

        // Create shared data structures
        shared_data_ = std::make_shared<SharedData>();

        RCLCPP_INFO(this->get_logger(), "Composed robot node initialized");
    }

private:
    struct SharedData {
        sensor_msgs::msg::Image::SharedPtr latest_image;
        geometry_msgs::msg::Twist::SharedPtr control_command;
        bool processing_active = false;
        std::mutex mutex;
    };

    std::shared_ptr<SharedData> shared_data_;

    void setup_sensor_component() {
        // Image subscriber
        image_sub_ = this->create_subscription<sensor_msgs::msg::Image>(
            "/camera/image_raw", 10,
            [this](const sensor_msgs::msg::Image::SharedPtr msg) {
                std::lock_guard<std::mutex> lock(shared_data_->mutex);
                shared_data_->latest_image = msg;
            });

        // Timer for simulated sensor processing
        sensor_timer_ = this->create_wall_timer(
            std::chrono::milliseconds(30),
            [this]() { process_sensor_data(); });
    }

    void setup_processor_component() {
        // Timer for image processing
        processor_timer_ = this->create_wall_timer(
            std::chrono::milliseconds(100),
            [this]() { process_image_data(); });
    }

    void setup_controller_component() {
        // Publisher for control commands
        cmd_vel_pub_ = this->create_publisher<geometry_msgs::msg::Twist>(
            "/cmd_vel", 10);

        // Timer for control loop
        control_timer_ = this->create_wall_timer(
            std::chrono::milliseconds(50),
            [this]() { execute_control_loop(); });
    }

    void process_sensor_data() {
        // Simulate sensor processing
        // In real implementation, this would process raw sensor data
    }

    void process_image_data() {
        std::lock_guard<std::mutex> lock(shared_data_->mutex);

        if (shared_data_->latest_image && !shared_data_->processing_active) {
            shared_data_->processing_active = true;

            // Process image (simulation)
            RCLCPP_INFO(this->get_logger(),
                "Processing image %ux%u",
                shared_data_->latest_image->width,
                shared_data_->latest_image->height);

            // Generate control command based on image processing
            auto cmd = std::make_shared<geometry_msgs::msg::Twist>();
            cmd->linear.x = 0.5;  // Example: move forward
            cmd->angular.z = 0.1;  // Example: slight turn

            shared_data_->control_command = cmd;
            shared_data_->processing_active = false;
        }
    }

    void execute_control_loop() {
        std::lock_guard<std::mutex> lock(shared_data_->mutex);

        if (shared_data_->control_command) {
            cmd_vel_pub_->publish(*shared_data_->control_command);
            shared_data_->control_command.reset();
        }
    }

    // Component members
    rclcpp::Subscription<sensor_msgs::msg::Image>::SharedPtr image_sub_;
    rclcpp::Publisher<geometry_msgs::msg::Twist>::SharedPtr cmd_vel_pub_;
    rclcpp::TimerBase::SharedPtr sensor_timer_;
    rclcpp::TimerBase::SharedPtr processor_timer_;
    rclcpp::TimerBase::SharedPtr control_timer_;
};

int main(int argc, char** argv) {
    rclcpp::init(argc, argv);
    auto node = std::make_shared<ComposedRobotNode>();
    rclcpp::spin(node);
    rclcpp::shutdown();
    return 0;
}

5.1.2 Multi-threaded Node Design

ROS 2 nodes can leverage multiple threads for concurrent processing:

Example: Multi-threaded Node Example

import rclpy
from rclpy.node import Node
from rclpy.executors import MultiThreadedExecutor
import threading
import time
from sensor_msgs.msg import Image
from geometry_msgs.msg import Twist

class MultiThreadedRobotNode(Node):
    def __init__(self):
        super().__init__('multithreaded_robot_node')

        # Shared state
        self.image_data = None
        self.processing_lock = threading.Lock()
        self.control_command = None

        # Setup components; subscription callbacks are serviced by the
        # executor spinning in the main thread, worker threads run alongside
        self.setup_sensor_thread()
        self.setup_processor_thread()
        self.setup_control_thread()

        self.get_logger().info('Multi-threaded robot node initialized')

    def setup_sensor_thread(self):
        """Create the image subscription; its callback runs in the spinning
        executor, so no dedicated sensor thread (or second executor,
        which would conflict with the main spin) is needed"""
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)

    def setup_processor_thread(self):
        """Setup image processing in separate thread"""
        def processor_worker():
            while rclpy.ok():
                time.sleep(0.1)  # 10 Hz processing

                with self.processing_lock:
                    if self.image_data is not None:
                        self.process_image()

        self.processor_thread = threading.Thread(target=processor_worker, daemon=True)
        self.processor_thread.start()

    def setup_control_thread(self):
        """Setup control loop in separate thread"""
        def control_worker():
            # Create publisher
            cmd_vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)

            while rclpy.ok():
                time.sleep(0.05)  # 20 Hz control loop

                with self.processing_lock:
                    if self.control_command is not None:
                        cmd_vel_pub.publish(self.control_command)
                        self.control_command = None

        self.control_thread = threading.Thread(target=control_worker, daemon=True)
        self.control_thread.start()

    def image_callback(self, msg):
        """Handle incoming image messages"""
        with self.processing_lock:
            self.image_data = msg

    def process_image(self):
        """Process latest image data"""
        if self.image_data is None:
            return

        # Simulate image processing
        self.get_logger().info(
            f'Processing image: {self.image_data.width}x{self.image_data.height}')

        # Generate control command
        cmd = Twist()
        cmd.linear.x = 0.5
        cmd.angular.z = 0.1

        self.control_command = cmd
        self.image_data = None

def main():
    rclpy.init()
    node = MultiThreadedRobotNode()

    # Spin with a multi-threaded executor so callbacks cannot starve
    executor = MultiThreadedExecutor()
    executor.add_node(node)

    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        rclpy.shutdown()

if __name__ == '__main__':
    main()
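Stripped of the ROS machinery, the coordination pattern above is simply lock-protected shared state handed between a producer and consumer thread. A minimal stdlib-only sketch of the same pattern (class and variable names are illustrative, not from the example):

```python
import threading
import time

class SharedState:
    """Lock-protected handoff slot, analogous to the node's image_data."""
    def __init__(self):
        self.lock = threading.Lock()
        self.latest = None       # most recent item from the producer
        self.results = []        # items the consumer has processed

def producer(state, items):
    for item in items:
        with state.lock:
            state.latest = item  # overwrite: only the newest value matters
        time.sleep(0.001)

def consumer(state, stop_event):
    while not stop_event.is_set():
        with state.lock:
            if state.latest is not None:
                state.results.append(state.latest * 2)  # "process" it
                state.latest = None                     # mark consumed
        time.sleep(0.001)

state = SharedState()
stop = threading.Event()
t_cons = threading.Thread(target=consumer, args=(state, stop))
t_prod = threading.Thread(target=producer, args=(state, range(5)))
t_cons.start()
t_prod.start()
t_prod.join()
time.sleep(0.05)   # let the consumer drain the last item
stop.set()
t_cons.join()
print(sorted(state.results))  # subset of [0, 2, 4, 6, 8]; stale frames may be skipped
```

Note the overwrite semantics: like the node's `image_data`, the slot keeps only the latest value, so a slow consumer drops stale frames instead of queueing them.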

5.1.3 Node Lifecycle Management

ROS 2 provides a lifecycle management system for controlled startup and shutdown:

Example: Lifecycle Node Implementation

#include <rclcpp/rclcpp.hpp>
#include <rclcpp_lifecycle/lifecycle_node.hpp>
#include <rclcpp_lifecycle/lifecycle_publisher.hpp>
#include <geometry_msgs/msg/twist.hpp>
#include <lifecycle_msgs/msg/state.hpp>
#include <lifecycle_msgs/msg/transition.hpp>

class LifecycleRobotNode : public rclcpp_lifecycle::LifecycleNode {
public:
    explicit LifecycleRobotNode(const std::string& node_name)
        : LifecycleNode(node_name) {}

    using CallbackReturn =
        rclcpp_lifecycle::node_interfaces::LifecycleNodeInterface::CallbackReturn;

    // Lifecycle state transitions
    CallbackReturn on_configure(const rclcpp_lifecycle::State&) {
        // Configure hardware and resources
        configure_hardware();
        setup_subscribers();
        setup_publishers();

        RCLCPP_INFO(get_logger(), "Node configured successfully");
        return CallbackReturn::SUCCESS;
    }

    CallbackReturn on_activate(const rclcpp_lifecycle::State&) {
        // Activate publishers and start processing
        activate_publishers();
        start_processing();

        RCLCPP_INFO(get_logger(), "Node activated successfully");
        return CallbackReturn::SUCCESS;
    }

    CallbackReturn on_deactivate(const rclcpp_lifecycle::State&) {
        // Deactivate publishers and stop processing
        stop_processing();
        deactivate_publishers();

        RCLCPP_INFO(get_logger(), "Node deactivated successfully");
        return CallbackReturn::SUCCESS;
    }

    CallbackReturn on_cleanup(const rclcpp_lifecycle::State&) {
        // Cleanup resources
        cleanup_hardware();

        RCLCPP_INFO(get_logger(), "Node cleaned up successfully");
        return CallbackReturn::SUCCESS;
    }

    CallbackReturn on_shutdown(const rclcpp_lifecycle::State&) {
        // Final shutdown operations
        RCLCPP_INFO(get_logger(), "Node shutting down successfully");
        return CallbackReturn::SUCCESS;
    }

private:
    void configure_hardware() {
        // Initialize hardware components
        RCLCPP_INFO(get_logger(), "Configuring hardware...");
    }

    void setup_subscribers() {
        // Setup subscribers
        RCLCPP_INFO(get_logger(), "Setting up subscribers...");
    }

    void setup_publishers() {
        // Setup publishers (but don't activate yet)
        cmd_vel_pub_ = create_publisher<geometry_msgs::msg::Twist>(
            "/cmd_vel", rclcpp::SystemDefaultsQoS());
        RCLCPP_INFO(get_logger(), "Setting up publishers...");
    }

    void activate_publishers() {
        // Activate publishers
        if (cmd_vel_pub_) {
            cmd_vel_pub_->on_activate();
        }
        RCLCPP_INFO(get_logger(), "Activating publishers...");
    }

    void deactivate_publishers() {
        // Deactivate publishers
        if (cmd_vel_pub_) {
            cmd_vel_pub_->on_deactivate();
        }
        RCLCPP_INFO(get_logger(), "Deactivating publishers...");
    }

    void start_processing() {
        // Start main processing loop
        processing_active_ = true;
        RCLCPP_INFO(get_logger(), "Starting processing...");
    }

    void stop_processing() {
        // Stop main processing loop
        processing_active_ = false;
        RCLCPP_INFO(get_logger(), "Stopping processing...");
    }

    void cleanup_hardware() {
        // Cleanup hardware resources
        RCLCPP_INFO(get_logger(), "Cleaning up hardware...");
    }

    rclcpp_lifecycle::LifecyclePublisher<geometry_msgs::msg::Twist>::SharedPtr cmd_vel_pub_;
    bool processing_active_ = false;
};
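The managed-node state machine behind these callbacks can be modeled without ROS. The following plain-Python sketch covers only the primary states and transitions of the ROS 2 lifecycle design (it omits the transient "-ing" states and error handling), and shows why, for example, activate is only legal from the inactive state:

```python
# Primary lifecycle states and transitions of a ROS 2 managed node
TRANSITIONS = {
    ("unconfigured", "configure"): "inactive",
    ("inactive", "activate"): "active",
    ("active", "deactivate"): "inactive",
    ("inactive", "cleanup"): "unconfigured",
    ("unconfigured", "shutdown"): "finalized",
    ("inactive", "shutdown"): "finalized",
    ("active", "shutdown"): "finalized",
}

class LifecycleModel:
    def __init__(self):
        self.state = "unconfigured"

    def trigger(self, transition):
        """Apply a transition; invalid requests leave the state unchanged."""
        key = (self.state, transition)
        if key in TRANSITIONS:
            self.state = TRANSITIONS[key]
            return True
        return False

node = LifecycleModel()
print(node.trigger("activate"))  # False: illegal from unconfigured
node.trigger("configure")        # unconfigured -> inactive
node.trigger("activate")         # inactive -> active
print(node.state)                # -> active
```

Driving the real node through `configure`/`activate`/`deactivate`/`cleanup` follows exactly this table: each `on_*` callback in the C++ example fires as the corresponding transition is taken.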

5.2 Advanced Topic Patterns

5.2.1 Bandwidth Optimization

For high-frequency data streams, optimization is crucial:

Example: Optimized Topic Publisher

from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
from std_msgs.msg import Header

class OptimizedImagePublisher(Node):
    def __init__(self):
        super().__init__('optimized_image_publisher')

        # Publisher with optimized QoS
        self.image_pub = self.create_publisher(
            Image, '/camera/image_compressed', 10)

        # Timer for high-frequency publishing
        self.timer = self.create_timer(1.0/30.0, self.publish_image)  # 30 Hz

        # Image data
        self.image_width = 640
        self.image_height = 480
        self.frame_id = 0

        # Compression settings
        self.compression_quality = 80
        self.publish_compressed = True

        self.get_logger().info('Optimized image publisher started')

    def generate_test_image(self):
        """Generate test image data"""
        # Create numpy array representing image
        image_data = np.random.randint(
            0, 256,
            (self.image_height, self.image_width, 3),
            dtype=np.uint8
        )
        return image_data

    def compress_image(self, image_data):
        """Compress image to reduce bandwidth"""
        # In real implementation, use image compression library
        # like OpenCV: cv2.imencode('.jpg', image_data, params)
        return image_data.tobytes()  # Simplified

    def publish_image(self):
        """Publish optimized image"""
        # Generate test image
        image_data = self.generate_test_image()

        # Create message
        msg = Image()
        msg.header = Header()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = "camera_frame"  # TF frame name, not a frame counter
        msg.height = self.image_height
        msg.width = self.image_width
        msg.encoding = "rgb8"
        msg.is_bigendian = False
        msg.step = self.image_width * 3  # bytes per row

        # Optimize based on settings
        if self.publish_compressed:
            # Simplified for illustration: a real system would publish a
            # sensor_msgs/CompressedImage on a separate topic rather than
            # putting compressed bytes into a raw Image message
            compressed_data = self.compress_image(image_data)
            msg.data = compressed_data
        else:
            # Raw image
            msg.data = image_data.tobytes()

        # Publish
        self.image_pub.publish(msg)
        self.frame_id += 1

        # Log bandwidth usage
        data_size = len(msg.data)
        self.get_logger().debug(
            f'Published frame {self.frame_id}, size: {data_size} bytes',
            throttle_duration_sec=1.0
        )

# Advanced optimization with selective publishing
class SmartImagePublisher(OptimizedImagePublisher):
    def __init__(self):
        super().__init__()

        # Previous image for difference calculation
        self.previous_image = None
        self.threshold = 0.1  # 10% change threshold
        self.frame_skip = 2  # Publish every 2nd frame
        self.frame_counter = 0

    def calculate_difference(self, image1, image2):
        """Calculate percentage difference between images"""
        if image1 is None or image2 is None:
            return 1.0

        # Calculate absolute difference
        diff = np.abs(image1.astype(float) - image2.astype(float))
        return np.mean(diff) / 255.0

    def publish_image(self):
        """Publish image only if significant change"""
        self.frame_counter += 1

        # Skip frames based on frame_skip setting
        if self.frame_counter % self.frame_skip != 0:
            return

        # Generate current image
        current_image = self.generate_test_image()

        # Calculate difference with previous image
        diff = self.calculate_difference(self.previous_image, current_image)

        # Only publish if significant change
        if diff > self.threshold or self.previous_image is None:
            super().publish_image()
            self.previous_image = current_image.copy()
            self.get_logger().debug(
                f'Published frame due to {diff:.1%} change')
        else:
            self.get_logger().debug(
                f'Skipped frame, only {diff:.1%} change')
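The publish-on-change decision reduces to a mean absolute difference compared against a threshold. A stdlib-only sketch of the same logic on raw byte buffers (no NumPy; function names are illustrative):

```python
def mean_abs_diff(buf_a, buf_b):
    """Mean absolute per-byte difference, normalized to [0.0, 1.0]."""
    if buf_a is None or buf_b is None or len(buf_a) != len(buf_b):
        return 1.0  # treat missing/mismatched frames as fully changed
    total = sum(abs(a - b) for a, b in zip(buf_a, buf_b))
    return total / (255.0 * len(buf_a))

def should_publish(prev, curr, threshold=0.1):
    """Publish the first frame, then only frames that changed enough."""
    return prev is None or mean_abs_diff(prev, curr) > threshold

frame1 = bytes([100] * 8)
frame2 = bytes([105] * 8)   # ~2% change vs frame1: skipped
frame3 = bytes([200] * 8)   # ~39% change vs frame1: published
print(should_publish(None, frame1))    # True  (first frame)
print(should_publish(frame1, frame2))  # False (below threshold)
print(should_publish(frame1, frame3))  # True  (above threshold)
```

The 10% threshold trades bandwidth against responsiveness: raise it for static scenes, lower it when small changes matter.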

5.2.2 Message Type Design

Effective message type design is crucial for performance and maintainability:

Diagram: Message Type Hierarchy

Base Message
├── Header (timestamp, frame_id)
├── Core Data Fields
├── Metadata Fields
└── Optional Extensions

Good Design:
robot_control_msg/Command
├── header (std_msgs/Header)
├── command_id (uint32)
├── command_type (uint8)
├── target_pose (geometry_msgs/PoseStamped)
├── velocity (geometry_msgs/Twist)
├── gripper_command (GripperCommand)
└── parameters (CustomParameters)

Avoid: Large monolithic messages with many optional fields

Example: Custom Message Definition

# robot_control_msg/Command.msg
std_msgs/Header header
uint32 command_id

# Command type (one of the constants below)
uint8 STOP=0
uint8 MOVE=1
uint8 GRASP=2
uint8 RELEASE=3
uint8 command_type

# Fields interpreted according to command_type
geometry_msgs/PoseStamped target_pose
geometry_msgs/Twist velocity
GripperCommand gripper_command

# Additional parameters
CustomParameters[] parameters

# robot_control_msg/GripperCommand.msg
float64 position  # 0.0 (open) to 1.0 (closed)
float64 force     # Maximum closing force
bool relative     # True for relative positioning

# robot_control_msg/CustomParameters.msg
string key
string value

5.2.3 Topic Naming Conventions

Follow ROS 2 topic naming conventions: use descriptive names, include hierarchy, and follow lower_case_with_underscores naming pattern.

Example: Topic Naming Examples

/camera/image_raw
/camera/image_rect_color
/camera/camera_info
/robot_base/joint_states
/robot_base/odom
/arm_controller/command
/arm_controller/state
/gripper_controller/command
/mobile_base/cmd_vel
/mobile_base/odom
/mobile_base/joint_states

# Multi-robot systems
/robot_1/camera/image_raw
/robot_1/mobile_base/cmd_vel
/robot_2/camera/image_raw
/robot_2/mobile_base/cmd_vel

# Sensor specific naming
/front_lidar/scan
/rear_lidar/scan
/left_camera/image_raw
/right_camera/image_raw
/imu/data
/gps/fix
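These conventions can be enforced mechanically in a build or review step. A small checker sketch (it encodes only the conventions listed above, not the full ROS 2 name grammar):

```python
import re

# Each namespace segment: lower_case_with_underscores, starting with a letter
SEGMENT = re.compile(r'^[a-z][a-z0-9_]*$')

def check_topic_name(name):
    """Return a list of convention violations (empty list means OK)."""
    problems = []
    if not name.startswith('/'):
        problems.append("should be absolute (start with '/')")
    segments = [s for s in name.split('/') if s]
    if not segments:
        problems.append("empty name")
    for seg in segments:
        if not SEGMENT.match(seg):
            problems.append(f"segment '{seg}' is not lower_case_with_underscores")
    return problems

print(check_topic_name('/front_lidar/scan'))   # -> []
print(check_topic_name('/Camera/ImageRaw'))    # flags both segments
```

Running such a check over a launch configuration catches naming drift early, before mismatched topic names show up as silent "no messages received" bugs.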

5.3 Service Design Patterns

5.3.1 Service Chain Patterns

Services can be chained together for complex operations:

Example: Service Chain Implementation

from rclpy.node import Node
from example_interfaces.srv import AddTwoInts, SetBool
from std_srvs.srv import Trigger

class ServiceChainNode(Node):
    def __init__(self):
        super().__init__('service_chain_node')

        # Create service chain
        self.setup_service_chain()

        # Create master service; std_srvs/Trigger (unlike Empty) carries
        # the success and message fields the callback fills in
        self.master_service = self.create_service(
            Trigger, 'master_operation', self.master_operation_callback)

        self.get_logger().info('Service chain node initialized')

    def setup_service_chain(self):
        """Setup chain of services"""
        # Service clients
        self.add_client = self.create_client(AddTwoInts, 'add_two_ints')
        self.multiply_client = self.create_client(AddTwoInts, 'multiply')
        self.validate_client = self.create_client(SetBool, 'validate_result')

        # Wait for services
        while not all([
            self.add_client.wait_for_service(timeout_sec=1.0),
            self.multiply_client.wait_for_service(timeout_sec=1.0),
            self.validate_client.wait_for_service(timeout_sec=1.0)
        ]):
            self.get_logger().info('Waiting for services...')

    async def master_operation_callback(self, request, response):
        """Master service that chains multiple services"""
        try:
            # Step 1: Add two numbers. Awaiting the future lets the
            # executor keep spinning; calling spin_until_future_complete()
            # inside a callback would deadlock a single-threaded executor.
            add_request = AddTwoInts.Request()
            add_request.a = 5
            add_request.b = 3

            add_result = await self.add_client.call_async(add_request)

            if add_result is None:
                response.success = False
                response.message = "Add service failed"
                return response

            # Step 2: Multiply result by 2
            multiply_request = AddTwoInts.Request()
            multiply_request.a = add_result.sum
            multiply_request.b = 2

            multiply_result = await self.multiply_client.call_async(multiply_request)

            if multiply_result is None:
                response.success = False
                response.message = "Multiply service failed"
                return response

            # Step 3: Validate result
            validate_request = SetBool.Request()
            validate_request.data = multiply_result.sum < 20

            validate_result = await self.validate_client.call_async(validate_request)

            if validate_result.success:
                response.success = True
                response.message = f"Operation successful: final result = {multiply_result.sum}"
            else:
                response.success = False
                response.message = "Validation failed"

        except Exception as e:
            response.success = False
            response.message = f"Service chain error: {str(e)}"

        return response

# Enhanced master service response
from std_srvs.srv import Trigger

class EnhancedServiceChainNode(ServiceChainNode):
    def __init__(self):
        super().__init__()

        # Create enhanced master service (a message type such as
        # example_interfaces/String cannot be used as a service type)
        self.enhanced_service = self.create_service(
            Trigger, 'enhanced_operation', self.enhanced_operation_callback)

    async def enhanced_operation_callback(self, request, response):
        """Enhanced service with detailed response"""
        try:
            # Execute the same chain with a fresh response object
            base_response = await self.master_operation_callback(
                Trigger.Request(), Trigger.Response())

            if base_response.success:
                response.success = True
                response.message = (
                    "Service chain completed successfully:\n"
                    "- Initial addition: 5 + 3 = 8\n"
                    "- Multiplication: 8 * 2 = 16\n"
                    "- Validation: 16 < 20 = True\n"
                    f"- Final result: {base_response.message}")
            else:
                response.success = False
                response.message = f"Service chain failed: {base_response.message}"

        except Exception as e:
            response.success = False
            response.message = f"Enhanced service error: {str(e)}"

        return response

# Synchronous service chain for better performance
class SyncServiceChainNode(Node):
    def __init__(self):
        super().__init__('sync_service_chain_node')

        # Create synchronized services
        self.add_service = self.create_service(
            AddTwoInts, 'fast_add', self.add_callback)
        self.multiply_service = self.create_service(
            AddTwoInts, 'fast_multiply', self.multiply_callback)

        # Cache for performance
        self.result_cache = {}

        self.get_logger().info('Sync service chain node initialized')

    def add_callback(self, request, response):
        """Fast addition with caching"""
        cache_key = f"add_{request.a}_{request.b}"

        if cache_key in self.result_cache:
            response.sum = self.result_cache[cache_key]
            self.get_logger().debug(f"Cache hit for {cache_key}")
        else:
            response.sum = request.a + request.b
            self.result_cache[cache_key] = response.sum
            self.get_logger().debug(f"Cache miss for {cache_key}")

        return response

    def multiply_callback(self, request, response):
        """Fast multiplication"""
        cache_key = f"mult_{request.a}_{request.b}"

        if cache_key in self.result_cache:
            response.sum = self.result_cache[cache_key]
        else:
            response.sum = request.a * request.b
            self.result_cache[cache_key] = response.sum

        return response
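The await-based composition in the chain above mirrors ordinary asyncio code. Here is a ROS-free sketch of sequentially chained async calls with the same short-circuiting error shape (function names and values are illustrative):

```python
import asyncio

async def add(a, b):
    await asyncio.sleep(0)      # stand-in for a service round-trip
    return a + b

async def multiply(a, b):
    await asyncio.sleep(0)
    return a * b

async def validate(value, limit=20):
    await asyncio.sleep(0)
    return value < limit

async def master_operation():
    """Chain the three calls; a failed validation short-circuits."""
    total = await add(5, 3)
    product = await multiply(total, 2)
    if not await validate(product):
        return False, "Validation failed"
    return True, f"Operation successful: final result = {product}"

ok, message = asyncio.run(master_operation())
print(ok, message)  # -> True Operation successful: final result = 16
```

Each `await` suspends the coroutine until the downstream result arrives, exactly as awaiting `call_async` futures suspends the ROS 2 service callback while the executor continues spinning.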

5.3.2 Error Handling in Services

Example: Robust Service Implementation

#include <cstdlib>
#include <limits>
#include <sstream>
#include <string>

#include <rclcpp/rclcpp.hpp>
#include <example_interfaces/srv/add_two_ints.hpp>
#include <std_srvs/srv/trigger.hpp>

class RobustServiceNode : public rclcpp::Node {
public:
    RobustServiceNode() : Node("robust_service_node") {

        // Create robust calculator service
        calculator_service_ = create_service<example_interfaces::srv::AddTwoInts>(
            "robust_calculator",
            std::bind(&RobustServiceNode::calculator_callback, this,
                      std::placeholders::_1, std::placeholders::_2));

        // Create health check service
        health_service_ = create_service<std_srvs::srv::Trigger>(
            "health_check",
            std::bind(&RobustServiceNode::health_check_callback, this,
                      std::placeholders::_1, std::placeholders::_2));

        // Service statistics
        request_count_ = 0;
        error_count_ = 0;
        start_time_ = now();
        last_request_time_ = start_time_;

        RCLCPP_INFO(get_logger(), "Robust service node initialized");
    }

private:
    void calculator_callback(
        const std::shared_ptr<example_interfaces::srv::AddTwoInts::Request> request,
        std::shared_ptr<example_interfaces::srv::AddTwoInts::Response> response) {

        request_count_++;
        last_request_time_ = now();

        try {
            // Input validation. AddTwoInts::Response carries only `sum`
            // (no success/message fields), so errors are reported via the
            // log and a sentinel result of 0.
            if (request->a < 0 || request->a > 1000 ||
                request->b < 0 || request->b > 1000) {
                RCLCPP_WARN(get_logger(),
                            "Rejected request: inputs must be between 0 and 1000");
                response->sum = 0;
                error_count_++;
                return;
            }

            // Check for potential overflow (the fields are int64)
            if (request->a > std::numeric_limits<int64_t>::max() - request->b) {
                RCLCPP_WARN(get_logger(), "Integer overflow detected");
                response->sum = 0;
                error_count_++;
                return;
            }

            // Perform calculation
            response->sum = request->a + request->b;

            RCLCPP_INFO(get_logger(),
                       "Successfully calculated %ld + %ld = %ld",
                       static_cast<long>(request->a),
                       static_cast<long>(request->b),
                       static_cast<long>(response->sum));

        } catch (const std::exception& e) {
            RCLCPP_ERROR(get_logger(),
                        "Exception in calculator service: %s", e.what());
            response->sum = 0;
            error_count_++;
        }
    }

    void health_check_callback(
        const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
        std::shared_ptr<std_srvs::srv::Trigger::Response> response) {

        try {
            // Check service health
            auto time_since_last_request = (now() - last_request_time_).seconds();

            response->success = true;

            std::ostringstream status;
            status << "Service Status:\n";
            status << "  - Total requests: " << request_count_ << "\n";
            status << "  - Error count: " << error_count_ << "\n";
            status << "  - Error rate: "
                   << (request_count_ > 0 ?
                       (100.0 * error_count_ / request_count_) : 0.0) << "%\n";
            status << "  - Time since last request: "
                   << time_since_last_request << " seconds\n";
            status << "  - Service uptime: "
                   << (now().seconds() - start_time_.seconds()) << " seconds";

            response->message = status.str();

        } catch (const std::exception& e) {
            RCLCPP_ERROR(get_logger(),
                        "Exception in health check: %s", e.what());
            response->success = false;
            response->message = "Health check failed";
        }
    }

    // Services
    rclcpp::Service<example_interfaces::srv::AddTwoInts>::SharedPtr calculator_service_;
    rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr health_service_;

    // Statistics
    uint64_t request_count_;
    uint64_t error_count_;
    rclcpp::Time last_request_time_;
    rclcpp::Time start_time_;
};

// Service with timeout and retry mechanisms
class AdvancedServiceNode : public rclcpp::Node {
public:
    AdvancedServiceNode() : Node("advanced_service_node") {

        // Create service with timeout capability
        timeout_service_ = create_service<std_srvs::srv::Trigger>(
            "timeout_service",
            std::bind(&AdvancedServiceNode::timeout_callback, this,
                      std::placeholders::_1, std::placeholders::_2));

        // Create service with retry mechanism
        retry_service_ = create_service<std_srvs::srv::Trigger>(
            "retry_service",
            std::bind(&AdvancedServiceNode::retry_callback, this,
                      std::placeholders::_1, std::placeholders::_2));

        RCLCPP_INFO(get_logger(), "Advanced service node initialized");
    }

private:
    void timeout_callback(
        const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
        std::shared_ptr<std_srvs::srv::Trigger::Response> response) {

        // Service that demonstrates timeout handling
        auto start_time = now();

        try {
            // Simulate long operation (5 seconds)
            rclcpp::sleep_for(std::chrono::seconds(5));

            auto elapsed = (now() - start_time).seconds();

            response->success = true;
            response->message = "Timeout service completed in " +
                               std::to_string(elapsed) + " seconds";

        } catch (const std::exception& e) {
            response->success = false;
            response->message = "Timeout service failed: " + std::string(e.what());
        }
    }

    void retry_callback(
        const std::shared_ptr<std_srvs::srv::Trigger::Request> request,
        std::shared_ptr<std_srvs::srv::Trigger::Response> response) {

        // Service that demonstrates retry mechanism
        int max_attempts = 3;
        int attempt = 0;
        bool success = false;

        while (attempt < max_attempts && !success) {
            attempt++;

            try {
                // Simulate operation that might fail
                double random_value = (double)rand() / RAND_MAX;

                if (random_value < 0.7) {  // 70% success rate
                    success = true;
                    response->success = true;
                    response->message = "Retry service succeeded after " +
                                       std::to_string(attempt) + " attempts";
                    break;
                } else {
                    // Simulate failure
                    if (attempt < max_attempts) {
                        RCLCPP_INFO(get_logger(),
                                   "Attempt %d failed, retrying...", attempt);
                        rclcpp::sleep_for(std::chrono::milliseconds(100));
                    }
                }

            } catch (const std::exception& e) {
                RCLCPP_ERROR(get_logger(),
                            "Exception in attempt %d: %s", attempt, e.what());
            }
        }

        if (!success) {
            response->success = false;
            response->message = "Retry service failed after " +
                               std::to_string(max_attempts) + " attempts";
        }
    }

    rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr timeout_service_;
    rclcpp::Service<std_srvs::srv::Trigger>::SharedPtr retry_service_;
};
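The bounded-retry loop in the service above is independent of ROS. A minimal pure-Python sketch of the same pattern (the `call_with_retry` helper and `flaky` operation are illustrative, not part of any ROS API):

```python
def call_with_retry(operation, max_attempts=3):
    """Invoke `operation` until it succeeds or attempts run out.

    `operation` is any zero-argument callable that raises on failure.
    Returns (success, attempts_used).
    """
    for attempt in range(1, max_attempts + 1):
        try:
            operation()
            return True, attempt
        except RuntimeError:
            # The ROS service above logs the failure and sleeps briefly
            # here before the next attempt.
            pass
    return False, max_attempts

# Demo: an operation that fails twice, then succeeds on the third try.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated failure")

print(call_with_retry(flaky))  # -> (True, 3)
```

The same shape works for service calls, action goals, or any other fallible operation; only the exception type and the backoff policy change.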

5.4 Advanced Action Implementation

5.4.1 Complex Action Workflows

Example: Multi-Stage Action Server

import time

from rclpy.action import ActionServer, CancelResponse
from rclpy.node import Node

from example_interfaces.action import Fibonacci
from std_msgs.msg import String

# Note: the stock example_interfaces Fibonacci feedback carries only the
# partial sequence; the `stage` and `progress` fields used below assume
# the extended action definition shown at the end of this example.

class MultiStageActionServer(Node):
    def __init__(self):
        super().__init__('multi_stage_action_server')

        # Create action server
        self.action_server = ActionServer(
            self,
            Fibonacci,
            'fibonacci',
            self.execute_callback,
            cancel_callback=self.cancel_callback)

        # Event publisher for stage tracking
        self.stage_pub = self.create_publisher(String, '/action_stage', 10)

        # Stage definitions
        self.stages = [
            "initialization",
            "calculation",
            "validation",
            "finalization"
        ]

        self.current_stage = 0
        self.action_active = False

        self.get_logger().info('Multi-stage action server initialized')

    async def execute_callback(self, goal_handle):
        """Execute multi-stage action"""
        self.action_active = True
        self.current_stage = 0

        # Get goal
        order = goal_handle.request.order

        self.get_logger().info(f'Executing Fibonacci sequence of order {order}')

        # Stage 1: Initialization
        if not await self.stage_initialization(order, goal_handle):
            goal_handle.abort()
            return Fibonacci.Result()

        # Stage 2: Calculation
        sequence = await self.stage_calculation(order, goal_handle)
        if sequence is None:
            goal_handle.abort()
            return Fibonacci.Result()

        # Stage 3: Validation
        if not await self.stage_validation(sequence, goal_handle):
            goal_handle.abort()
            return Fibonacci.Result()

        # Stage 4: Finalization
        if not await self.stage_finalization(sequence, goal_handle):
            goal_handle.abort()
            return Fibonacci.Result()

        # Complete action
        goal_handle.succeed()

        result = Fibonacci.Result()
        result.sequence = sequence

        self.get_logger().info(f'Action completed: sequence = {sequence}')

        self.action_active = False
        return result

    async def stage_initialization(self, order, goal_handle):
        """Stage 1: Initialize resources"""
        self.publish_stage("initialization")

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.stage = "initialization"
        feedback_msg.progress = 0.0

        try:
            # Validate input
            if order < 0 or order > 100:
                self.get_logger().error(f'Invalid order: {order}')
                return False

            # Initialize resources
            # Simulate initialization time
            for i in range(10):
                if goal_handle.is_cancel_requested:
                    return False

                progress = (i + 1) / 10.0 * 25.0  # 0-25% for initialization
                feedback_msg.progress = progress
                goal_handle.publish_feedback(feedback_msg)

                self.get_logger().debug(f'Initialization progress: {progress:.1f}%')
                # Do not call rclpy.spin_once() here: the executor is already
                # spinning this callback. A short blocking sleep keeps the
                # feedback observable.
                time.sleep(0.1)

            self.get_logger().info('Initialization completed')
            return True

        except Exception as e:
            self.get_logger().error(f'Initialization failed: {e}')
            return False

    async def stage_calculation(self, order, goal_handle):
        """Stage 2: Calculate Fibonacci sequence"""
        self.publish_stage("calculation")

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.stage = "calculation"

        try:
            sequence = []
            a, b = 0, 1

            for i in range(order + 1):
                # Check for cancellation
                if goal_handle.is_cancel_requested:
                    return None

                # Calculate next number
                if i == 0:
                    sequence.append(a)
                else:
                    sequence.append(b)
                    a, b = b, a + b

                # Update feedback
                progress = 25.0 + (i + 1) / (order + 1) * 50.0  # 25-75% for calculation
                feedback_msg.partial_sequence = sequence
                feedback_msg.progress = progress
                goal_handle.publish_feedback(feedback_msg)

                self.get_logger().debug(f'Calculation progress: {progress:.1f}%, current value: {b}')
                # Add a short delay every 10 iterations to keep the stage visible
                if i % 10 == 0:
                    time.sleep(0.1)

            self.get_logger().info('Calculation completed')
            return sequence

        except Exception as e:
            self.get_logger().error(f'Calculation failed: {e}')
            return None

    async def stage_validation(self, sequence, goal_handle):
        """Stage 3: Validate results"""
        self.publish_stage("validation")

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.stage = "validation"

        try:
            # Validate Fibonacci sequence
            for i in range(2, len(sequence)):
                expected = sequence[i-1] + sequence[i-2]
                if sequence[i] != expected:
                    self.get_logger().error(f'Validation failed at index {i}: '
                                           f'{sequence[i]} != {expected}')
                    return False

            # Update progress
            for i in range(10):
                if goal_handle.is_cancel_requested:
                    return False

                progress = 75.0 + (i + 1) / 10.0 * 20.0  # 75-95% for validation
                feedback_msg.progress = progress
                feedback_msg.partial_sequence = sequence
                goal_handle.publish_feedback(feedback_msg)

                time.sleep(0.05)

            self.get_logger().info('Validation completed')
            return True

        except Exception as e:
            self.get_logger().error(f'Validation failed: {e}')
            return False

    async def stage_finalization(self, sequence, goal_handle):
        """Stage 4: Finalize and cleanup"""
        self.publish_stage("finalization")

        feedback_msg = Fibonacci.Feedback()
        feedback_msg.stage = "finalization"

        try:
            # Finalize results
            result_info = {
                'sequence_length': len(sequence),
                'final_value': sequence[-1] if sequence else 0,
                'sum': sum(sequence) if sequence else 0
            }

            # Update progress to 100%
            for i in range(5):
                if goal_handle.is_cancel_requested:
                    return False

                progress = 95.0 + (i + 1) / 5.0 * 5.0  # 95-100% for finalization
                feedback_msg.progress = progress
                feedback_msg.partial_sequence = sequence
                goal_handle.publish_feedback(feedback_msg)

                time.sleep(0.05)

            self.get_logger().info(f'Finalization completed: {result_info}')
            return True

        except Exception as e:
            self.get_logger().error(f'Finalization failed: {e}')
            return False

    def cancel_callback(self, goal_handle):
        """Handle action cancellation"""
        self.get_logger().info('Received cancel request')
        self.action_active = False
        return CancelResponse.ACCEPT

    def publish_stage(self, stage):
        """Publish current stage information"""
        msg = String()
        msg.data = f'Current stage: {stage}'
        self.stage_pub.publish(msg)
        self.current_stage = self.stages.index(stage) if stage in self.stages else 0

# Extended action definition (would be in a custom .action file);
# example_interfaces/action/Fibonacci has no stage/progress fields.
# Sections are ordered goal, then result, then feedback:
#
# fibonacci_extended/action/FibonacciExtended.action
# int32 order
# ---
# int32[] sequence
# string status
# float64 execution_time
# ---
# int32[] partial_sequence
# string stage
# float64 progress
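Each stage above owns a fixed slice of the overall 0-100% progress range (initialization 0-25, calculation 25-75, validation 75-95, finalization 95-100). That mapping can be factored out into a small ROS-free helper; the band boundaries below are taken from the arithmetic in the stage callbacks, while the helper itself is illustrative:

```python
# Progress bands per stage: (start_percent, width_percent),
# matching the multi-stage server above.
STAGE_BANDS = {
    "initialization": (0.0, 25.0),
    "calculation":    (25.0, 50.0),
    "validation":     (75.0, 20.0),
    "finalization":   (95.0, 5.0),
}

def overall_progress(stage, fraction):
    """Map a stage-local fraction (0..1) to an overall percentage (0..100)."""
    start, width = STAGE_BANDS[stage]
    return start + max(0.0, min(1.0, fraction)) * width

print(overall_progress("initialization", 1.0))  # -> 25.0
print(overall_progress("calculation", 0.5))     # -> 50.0
```

Centralizing the bands like this keeps the per-stage feedback code free of magic numbers and makes it easy to rebalance the stages later.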

5.4.2 Action Client with Retry and Timeout

Example: Advanced Action Client

#include <rclcpp/rclcpp.hpp>
#include <rclcpp_action/rclcpp_action.hpp>
#include <example_interfaces/action/fibonacci.hpp>
#include <chrono>
#include <memory>
#include <thread>
#include <vector>

using Fibonacci = example_interfaces::action::Fibonacci;

class AdvancedActionClient : public rclcpp::Node {
public:
    AdvancedActionClient() : Node("advanced_action_client") {

        // Create action client
        action_client_ = rclcpp_action::create_client<Fibonacci>(
            this, "fibonacci");

        // Wait for action server
        while (!action_client_->wait_for_action_server(std::chrono::seconds(1))) {
            if (!rclcpp::ok()) {
                RCLCPP_ERROR(get_logger(), "Interrupted while waiting for action server");
                return;
            }
            RCLCPP_INFO(get_logger(), "Waiting for action server...");
        }

        RCLCPP_INFO(get_logger(), "Advanced action client initialized");
    }

    void send_goal_with_retry(int order, int max_retries = 3) {
        int retry_count = 0;

        while (retry_count < max_retries) {
            if (send_goal(order)) {
                RCLCPP_INFO(get_logger(), "Goal completed successfully");
                return;
            } else {
                retry_count++;
                RCLCPP_WARN(get_logger(),
                           "Goal failed, retrying (%d/%d)...",
                           retry_count, max_retries);

                if (retry_count < max_retries) {
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                }
            }
        }

        RCLCPP_ERROR(get_logger(), "Goal failed after %d retries", max_retries);
    }

    void send_goal_with_timeout(int order, double timeout_seconds = 10.0) {
        auto goal_msg = Fibonacci::Goal();
        goal_msg.order = order;

        RCLCPP_INFO(get_logger(), "Sending goal with timeout %.1f seconds", timeout_seconds);

        auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
        send_goal_options.goal_response_callback =
            std::bind(&AdvancedActionClient::goal_response_callback, this, std::placeholders::_1);
        send_goal_options.feedback_callback =
            std::bind(&AdvancedActionClient::feedback_callback, this, std::placeholders::_1, std::placeholders::_2);
        send_goal_options.result_callback =
            std::bind(&AdvancedActionClient::result_callback, this, std::placeholders::_1);

        // Send goal; this future resolves when the server accepts or
        // rejects the goal, not when the goal finishes
        auto goal_handle_future = action_client_->async_send_goal(goal_msg, send_goal_options);

        if (rclcpp::spin_until_future_complete(
                this->get_node_base_interface(),
                goal_handle_future,
                std::chrono::duration<double>(timeout_seconds)) !=
            rclcpp::FutureReturnCode::SUCCESS) {
            RCLCPP_ERROR(get_logger(), "No goal response within %.1f seconds", timeout_seconds);
            return;
        }

        auto goal_handle = goal_handle_future.get();
        if (!goal_handle) {
            RCLCPP_ERROR(get_logger(), "Goal was rejected");
            return;
        }

        // Wait for the result itself, with the same timeout
        auto result_future = action_client_->async_get_result(goal_handle);
        if (rclcpp::spin_until_future_complete(
                this->get_node_base_interface(),
                result_future,
                std::chrono::duration<double>(timeout_seconds)) ==
            rclcpp::FutureReturnCode::SUCCESS) {
            RCLCPP_INFO(get_logger(), "Goal completed within timeout");
        } else {
            RCLCPP_ERROR(get_logger(), "Goal timed out after %.1f seconds", timeout_seconds);
            // Cancel the goal
            action_client_->async_cancel_goal(goal_handle);
        }
    }

    void send_concurrent_goals() {
        RCLCPP_INFO(get_logger(), "Sending multiple concurrent goals");

        // Send multiple goals concurrently
        std::vector<std::shared_future<rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr>> futures;

        for (int i = 0; i < 3; ++i) {
            auto goal_msg = Fibonacci::Goal();
            goal_msg.order = 10 + i * 2;  // 10, 12, 14

            auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
            send_goal_options.result_callback =
                [this, i](const rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult & result) {
                    switch (result.code) {
                        case rclcpp_action::ResultCode::SUCCEEDED:
                            RCLCPP_INFO(get_logger(), "Goal %d succeeded", i);
                            break;
                        case rclcpp_action::ResultCode::ABORTED:
                            RCLCPP_WARN(get_logger(), "Goal %d was aborted", i);
                            break;
                        case rclcpp_action::ResultCode::CANCELED:
                            RCLCPP_INFO(get_logger(), "Goal %d was canceled", i);
                            break;
                        default:
                            RCLCPP_ERROR(get_logger(), "Goal %d failed with unknown result code", i);
                            break;
                    }
                };

            auto future = action_client_->async_send_goal(goal_msg, send_goal_options);
            futures.push_back(future);
        }

        // Wait for all goals to complete
        for (auto& future : futures) {
            rclcpp::spin_until_future_complete(this->get_node_base_interface(), future);
        }

        RCLCPP_INFO(get_logger(), "All concurrent goals completed");
    }

private:
    bool send_goal(int order) {
        auto goal_msg = Fibonacci::Goal();
        goal_msg.order = order;

        RCLCPP_INFO(get_logger(), "Sending goal: order = %d", order);

        auto send_goal_options = rclcpp_action::Client<Fibonacci>::SendGoalOptions();
        send_goal_options.goal_response_callback =
            std::bind(&AdvancedActionClient::goal_response_callback, this, std::placeholders::_1);
        send_goal_options.feedback_callback =
            std::bind(&AdvancedActionClient::feedback_callback, this, std::placeholders::_1, std::placeholders::_2);
        send_goal_options.result_callback =
            std::bind(&AdvancedActionClient::result_callback, this, std::placeholders::_1);

        // Send goal and wait for result
        auto goal_handle_future = action_client_->async_send_goal(goal_msg, send_goal_options);

        if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), goal_handle_future) !=
            rclcpp::FutureReturnCode::SUCCESS) {
            RCLCPP_ERROR(get_logger(), "Failed to send goal");
            return false;
        }

        auto goal_handle = goal_handle_future.get();
        if (!goal_handle) {
            RCLCPP_ERROR(get_logger(), "Goal was rejected");
            return false;
        }

        // Wait for result (requested through the client, not the handle)
        auto result_future = action_client_->async_get_result(goal_handle);

        if (rclcpp::spin_until_future_complete(this->get_node_base_interface(), result_future) !=
            rclcpp::FutureReturnCode::SUCCESS) {
            RCLCPP_ERROR(get_logger(), "Failed to get result");
            return false;
        }

        auto result = result_future.get();
        return result.code == rclcpp_action::ResultCode::SUCCEEDED;
    }

    void goal_response_callback(const rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr & goal_handle) {
        if (!goal_handle) {
            RCLCPP_ERROR(get_logger(), "Goal was rejected");
        } else {
            RCLCPP_INFO(get_logger(), "Goal accepted by server");
        }
    }

    void feedback_callback(
        rclcpp_action::ClientGoalHandle<Fibonacci>::SharedPtr,
        const std::shared_ptr<const Fibonacci::Feedback> feedback) {

        RCLCPP_INFO(get_logger(),
                   "Received feedback: partial_sequence length = %zu",
                   feedback->partial_sequence.size());
    }

    void result_callback(const rclcpp_action::ClientGoalHandle<Fibonacci>::WrappedResult & result) {
        switch (result.code) {
            case rclcpp_action::ResultCode::SUCCEEDED:
                RCLCPP_INFO(get_logger(), "Goal succeeded");
                RCLCPP_INFO(get_logger(), "Final sequence:");
                for (auto number : result.result->sequence) {
                    RCLCPP_INFO(get_logger(), "  %d", number);
                }
                break;
            case rclcpp_action::ResultCode::ABORTED:
                RCLCPP_ERROR(get_logger(), "Goal was aborted");
                break;
            case rclcpp_action::ResultCode::CANCELED:
                RCLCPP_ERROR(get_logger(), "Goal was canceled");
                break;
            default:
                RCLCPP_ERROR(get_logger(), "Unknown result code");
                break;
        }
    }

    rclcpp_action::Client<Fibonacci>::SharedPtr action_client_;
};
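The concurrent-goal pattern above — fire several goals, collect their futures, then wait on all of them — has the same shape as ordinary future-based code. A ROS-free sketch using `concurrent.futures`, where `fibonacci` stands in for the work the action server would do:

```python
from concurrent.futures import ThreadPoolExecutor

def fibonacci(order):
    """Stand-in for the action server's work: order + 1 Fibonacci numbers."""
    seq, a, b = [], 0, 1
    for _ in range(order + 1):
        seq.append(a)
        a, b = b, a + b
    return seq

# Submit several "goals" concurrently, then wait on all futures --
# the same shape as async_send_goal + spin_until_future_complete above.
with ThreadPoolExecutor(max_workers=3) as pool:
    futures = [pool.submit(fibonacci, order) for order in (10, 12, 14)]
    results = [f.result() for f in futures]

print([len(r) for r in results])  # -> [11, 13, 15]
```

As in the C++ client, each future is independent: one goal failing or being canceled does not affect the others, and the caller decides whether to wait for all of them or only the first.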

5.5 Real-world Communication Patterns

5.5.1 Sensor Processing Pipeline

Diagram: Sensor Processing Architecture

Raw Sensor Data
       ↓
┌─────────────────┐
│   Sensor Driver │
└─────────────────┘
       ↓
┌─────────────────┐
│  Preprocessing  │
│ - Noise Filter  │
│ - Calibration   │
│ - Validation    │
└─────────────────┘
       ↓
┌─────────────────┐
│  Feature        │
│  Extraction     │
│ - Detection     │
│ - Recognition   │
│ - Tracking      │
└─────────────────┘
       ↓
┌─────────────────┐
│  Sensor Fusion  │
│ - Kalman Filter │
│ - Weighting     │
│ - Validation    │
└─────────────────┘
       ↓
┌─────────────────┐
│  Publishing     │
└─────────────────┘

Example: Sensor Processing Pipeline

from rclpy.node import Node
from sensor_msgs.msg import Image, LaserScan
from geometry_msgs.msg import PointStamped
from nav_msgs.msg import Odometry
import numpy as np

class SensorProcessingPipeline(Node):
    def __init__(self):
        super().__init__('sensor_processing_pipeline')

        # Raw sensor subscribers
        self.image_sub = self.create_subscription(
            Image, '/camera/image_raw', self.image_callback, 10)
        self.lidar_sub = self.create_subscription(
            LaserScan, '/lidar/scan', self.lidar_callback, 10)
        self.odom_sub = self.create_subscription(
            Odometry, '/odom', self.odom_callback, 10)

        # Processed data publishers
        self.obstacle_pub = self.create_publisher(
            PointStamped, '/obstacles', 10)
        self.trajectory_pub = self.create_publisher(
            PointStamped, '/trajectory', 10)

        # Processing state
        self.latest_image = None
        self.latest_scan = None
        self.latest_odom = None

        # Processing parameters
        self.obstacle_threshold = 2.0  # meters
        self.publish_rate = 10.0  # Hz

        # Processing timer
        self.processing_timer = self.create_timer(
            1.0 / self.publish_rate, self.process_sensor_data)

        self.get_logger().info('Sensor processing pipeline initialized')

    def image_callback(self, msg):
        """Handle raw image data"""
        # Preprocess image
        processed_image = self.preprocess_image(msg)

        # Extract features
        features = self.extract_image_features(processed_image)

        # Store for fusion
        self.latest_image = {
            'header': msg.header,
            'features': features,
            'timestamp': self.get_clock().now()
        }

    def lidar_callback(self, msg):
        """Handle raw LiDAR data"""
        # Preprocess scan
        processed_scan = self.preprocess_lidar(msg)

        # Extract obstacles
        obstacles = self.extract_obstacles(processed_scan)

        # Store for fusion
        self.latest_scan = {
            'header': msg.header,
            'obstacles': obstacles,
            'timestamp': self.get_clock().now()
        }

    def odom_callback(self, msg):
        """Handle odometry data"""
        self.latest_odom = {
            'header': msg.header,
            'pose': msg.pose.pose,
            'twist': msg.twist.twist,
            'timestamp': self.get_clock().now()
        }

    def preprocess_image(self, msg):
        """Preprocess raw image"""
        # In real implementation, use OpenCV or similar
        # Convert to grayscale, apply filters, etc.
        return msg

    def extract_image_features(self, image_msg):
        """Extract features from preprocessed image"""
        # Simulate feature extraction
        features = []

        # Create test features
        for i in range(5):
            feature = {
                'id': i,
                'position': (np.random.rand() * image_msg.width,
                            np.random.rand() * image_msg.height),
                'confidence': np.random.rand(),
                'type': 'corner'
            }
            features.append(feature)

        return features

    def preprocess_lidar(self, msg):
        """Preprocess raw LiDAR scan"""
        # Remove invalid readings
        valid_ranges = []
        for r in msg.ranges:
            if msg.range_min <= r <= msg.range_max:
                valid_ranges.append(r)
            else:
                valid_ranges.append(float('inf'))

        # Smooth the data
        smoothed_ranges = self.smooth_ranges(valid_ranges)

        return {
            'ranges': smoothed_ranges,
            'angle_min': msg.angle_min,
            'angle_max': msg.angle_max,
            'angle_increment': msg.angle_increment
        }

    def smooth_ranges(self, ranges):
        """Smooth LiDAR range data"""
        if len(ranges) < 3:
            return ranges

        smoothed = []
        for i in range(len(ranges)):
            if i == 0 or i == len(ranges) - 1:
                smoothed.append(ranges[i])
            else:
                # Simple moving average
                avg = (ranges[i-1] + ranges[i] + ranges[i+1]) / 3.0
                smoothed.append(avg)

        return smoothed

    def extract_obstacles(self, scan_data):
        """Extract obstacle positions from LiDAR scan"""
        obstacles = []

        for i, r in enumerate(scan_data['ranges']):
            if r < self.obstacle_threshold:
                # Convert polar to Cartesian
                angle = scan_data['angle_min'] + i * scan_data['angle_increment']
                x = r * np.cos(angle)
                y = r * np.sin(angle)

                obstacles.append({
                    'position': (x, y),
                    'distance': r,
                    'angle': angle,
                    'confidence': 1.0 / (1.0 + r)  # Closer obstacles more confident
                })

        return obstacles

    def process_sensor_data(self):
        """Main processing function"""
        if not self.latest_scan or not self.latest_odom:
            return

        # Publish detected obstacles
        self.publish_obstacles()

        # Publish trajectory (example)
        self.publish_trajectory()

    def publish_obstacles(self):
        """Publish detected obstacles"""
        for obstacle in self.latest_scan['obstacles']:
            msg = PointStamped()
            msg.header = self.latest_scan['header']
            msg.header.frame_id = 'base_link'
            msg.point.x = obstacle['position'][0]
            msg.point.y = obstacle['position'][1]
            msg.point.z = 0.0

            self.obstacle_pub.publish(msg)

    def publish_trajectory(self):
        """Publish trajectory point"""
        if self.latest_odom:
            msg = PointStamped()
            msg.header = self.latest_odom['header']
            msg.header.frame_id = 'base_link'
            msg.point.x = self.latest_odom['pose'].position.x
            msg.point.y = self.latest_odom['pose'].position.y
            msg.point.z = self.latest_odom['pose'].position.z

            self.trajectory_pub.publish(msg)

# Advanced sensor fusion with Kalman filtering
class AdvancedSensorFusion(SensorProcessingPipeline):
    def __init__(self):
        super().__init__()

        # Kalman filter for sensor fusion
        self.kf_state = np.zeros([6, 1])  # [x, y, z, vx, vy, vz]
        self.kf_covariance = np.eye(6) * 0.1
        self.last_fusion_time = self.get_clock().now()

        # Fused data publisher
        self.fused_odom_pub = self.create_publisher(
            Odometry, '/fused_odom', 10)

    def process_sensor_data(self):
        """Advanced processing with sensor fusion"""
        super().process_sensor_data()

        # Perform sensor fusion
        self.sensor_fusion()

    def sensor_fusion(self):
        """Perform Kalman filter-based sensor fusion"""
        current_time = self.get_clock().now()
        dt = (current_time - self.last_fusion_time).nanoseconds / 1e9

        if dt > 0:
            # Prediction step
            self.predict_step(dt)

            # Update step with measurements
            if self.latest_odom:
                self.update_step_with_odom()

            # Publish fused odometry
            self.publish_fused_odometry()

            self.last_fusion_time = current_time

    def predict_step(self, dt):
        """Kalman filter prediction step"""
        # State transition matrix
        F = np.array([
            [1, 0, 0, dt, 0, 0],  # x
            [0, 1, 0, 0, dt, 0],  # y
            [0, 0, 1, 0, 0, dt],  # z
            [0, 0, 0, 1, 0, 0],  # vx
            [0, 0, 0, 0, 1, 0],  # vy
            [0, 0, 0, 0, 0, 1]   # vz
        ])

        # Process noise
        Q = np.eye(6) * 0.01

        # Predict
        self.kf_state = F @ self.kf_state
        self.kf_covariance = F @ self.kf_covariance @ F.T + Q

    def update_step_with_odom(self):
        """Update step with odometry measurement"""
        # Measurement matrix (we only measure position)
        H = np.array([
            [1, 0, 0, 0, 0, 0],
            [0, 1, 0, 0, 0, 0],
            [0, 0, 1, 0, 0, 0]
        ])

        # Measurement noise
        R = np.eye(3) * 0.1

        # Measurement
        z = np.array([
            [self.latest_odom['pose'].position.x],
            [self.latest_odom['pose'].position.y],
            [self.latest_odom['pose'].position.z]
        ])

        # Kalman gain
        S = H @ self.kf_covariance @ H.T + R
        K = self.kf_covariance @ H.T @ np.linalg.inv(S)

        # Update
        y = z - H @ self.kf_state
        self.kf_state = self.kf_state + K @ y
        self.kf_covariance = (np.eye(6) - K @ H) @ self.kf_covariance

    def publish_fused_odometry(self):
        """Publish fused odometry"""
        msg = Odometry()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'odom'
        msg.child_frame_id = 'base_link'

        # Set position
        msg.pose.pose.position.x = self.kf_state[0, 0]
        msg.pose.pose.position.y = self.kf_state[1, 0]
        msg.pose.pose.position.z = self.kf_state[2, 0]

        # Set velocity
        msg.twist.twist.linear.x = self.kf_state[3, 0]
        msg.twist.twist.linear.y = self.kf_state[4, 0]
        msg.twist.twist.linear.z = self.kf_state[5, 0]

        self.fused_odom_pub.publish(msg)
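The 6-state filter above runs the standard predict/update cycle with matrices. The same cycle in one dimension, with scalar arithmetic, makes the mechanics easy to verify by hand (the noise values `q` and `r` are illustrative, like the ones in the node):

```python
def kalman_1d(measurements, q=0.01, r=0.1, x0=0.0, p0=1.0):
    """Scalar predict/update cycle: constant-state model, direct measurement."""
    x, p = x0, p0
    for z in measurements:
        # Predict: state unchanged, uncertainty grows by process noise q
        p += q
        # Update: the gain weighs the measurement against the prediction
        k = p / (p + r)
        x += k * (z - x)
        p *= (1.0 - k)
    return x, p

# Feeding a constant measurement drives the estimate toward that value
# while the variance shrinks toward a steady state.
x, p = kalman_1d([5.0] * 20)
print(round(x, 3))  # converges toward the repeated measurement 5.0
```

The matrix version in `predict_step`/`update_step_with_odom` is this same loop with `F`, `H`, `Q`, and `R` generalizing the scalars, which is why tuning intuition transfers directly: larger `q` trusts measurements more, larger `r` trusts the model more.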

Summary

This chapter explored advanced ROS 2 communication patterns and architectural approaches essential for building complex, robust robotic systems. We covered sophisticated node designs, optimized topic patterns, robust service implementations, and complex action workflows.

Key takeaways:

  • Node composition and multi-threading improve performance and reduce overhead
  • Lifecycle management provides controlled startup and shutdown procedures
  • Bandwidth optimization and proper message design are crucial for high-frequency data
  • Robust error handling and retry mechanisms ensure system reliability
  • Multi-stage actions with detailed feedback enable complex workflows
  • Sensor processing pipelines demonstrate real-world integration patterns
  • Advanced patterns like sensor fusion with Kalman filtering enhance system capabilities

Exercises

Exercise 5.1: Multi-threaded Node Implementation

Create a multi-threaded node that:

  • Processes sensor data in one thread
  • Performs planning in another thread
  • Executes control in a third thread
  • Demonstrates proper thread synchronization
  • Measures and reports performance

Exercise 5.2: Optimized Topic Design

Design an optimized communication system:

  • Create custom message types for specific use cases
  • Implement bandwidth optimization techniques
  • Test with different QoS settings
  • Measure performance improvements
  • Document trade-offs

Exercise 5.3: Robust Service Chain

Implement a service chain with:

  • Multiple services with dependencies
  • Error handling and recovery
  • Retry mechanisms
  • Performance monitoring
  • Load balancing strategies

Exercise 5.4: Advanced Action Workflow

Create a multi-stage action system:

  • Implement custom action interface
  • Design multi-stage workflow
  • Add detailed feedback mechanism
  • Implement cancellation and preemption
  • Add timeout and retry logic

Exercise 5.5: Sensor Fusion System

Build a complete sensor fusion system:

  • Integrate multiple sensor types
  • Implement Kalman filter for fusion
  • Handle sensor failures gracefully
  • Provide performance metrics
  • Validate with ground truth data

Glossary Terms

  • Node Composition: Combining multiple logical nodes into a single process
  • Multi-threaded Node: Node using multiple threads for concurrent processing
  • Lifecycle Management: Controlled startup, activation, deactivation, and shutdown of nodes
  • Bandwidth Optimization: Techniques to reduce data transmission requirements
  • Service Chain: Sequence of services where output of one becomes input of next
  • Action Workflow: Multi-stage process with feedback and intermediate results
  • Sensor Fusion: Combining data from multiple sensors for improved perception
  • Kalman Filter: Recursive algorithm for estimating system state from noisy measurements
  • QoS (Quality of Service): Policies controlling data exchange behavior
  • Message Serialization: Process of converting message objects to bytes for transmission