2026信息素养
2026智能博物赛项
RobotMasterB类机器人教程
智能博物扩展版使用说明
智能博物图形化AilyBlockly编程
Arduino代码电机编程
B类机器人比赛核心流程控制
汝城县职业中等专业学校知识库-信息中心朱老师编辑
-
+
首页
B类机器人比赛核心流程控制
#### car_ws (工作空间) 这个文件夹里面存放了所有比赛的文件 逐个文件夹详解 1. arm_control 你们一直在调试的机械臂专属控制包 包含:机械臂电机动作、AprilTag视觉识别、像素-电机标定、开机初始化、限位安全测试、高低堆码抓取全套逻辑 2. car_bringup 整车机器人总开机启动包 负责机器人全局命名空间、整机一键launch上电、全车参数统一配置、机器人开机自检总流程 3. car_interfaces 小车+机械臂自定义通讯接口包 代码里用的 `MoveMotor` 电机动作、自定义ROS2话题/服务/动作格式,全项目通用通讯协议都在这里编译定义 4. competition_flow 竞赛全自动任务流程包 比赛完整时序逻辑:小车循迹 → 视觉识别标签 → 机械臂抓取物料 → 分层堆码摆放,整套自动竞赛流程 5. emm_motor_driver 底盘+机械臂电机底层硬件驱动包 和电机硬件串口通讯、下发运动指令、实时回读电机位置/状态,底层电机总线驱动 6. my_camera_launch 相机一键启动配置包 摄像头驱动启动、图像话题配置、相机参数校准,一键打开视觉相机 7. odom_calib 小车里程计标定工具包 小车车轮行驶里程、IMU定位标定,校准小车行走距离、自身定位坐标精度 8. planner 小车路径规划算法包 小车自主导航、避障轨迹计算、行驶路线规划、赛场路径决策 9. web_viz 网页可视化监控包 手机/电脑网页实时查看:相机画面、AprilTag检测结果、机械臂电机位置、机器人全部运行状态 主工作空间,存放了所有的核心功能包。所有的比赛逻辑代码都在这里修改。 #### 如何修改参数? 需要调整点位参数时,编辑以下配置文件: `car_ws/competition_flow/config` ### arm_config.yaml文件解释 ``` /**/arm_action_node: ros__parameters: # ====================== 舵机角度配置(夹爪/盖子开关)====================== # 盖子舵机 (ID=1) cover_open_angle: 110.0 # 盖子打开角度 cover_close_angle: 0.0 # 盖子关闭角度 # 左夹爪舵机 (ID=2) left_fork_open_angle: 31.0 # 左夹爪正常打开角度 left_fork_close_angle: 51.0 # 左夹爪正常关闭角度 # 右夹爪舵机 (ID=3) right_fork_open_angle: 120.0 # 右夹爪正常打开角度 right_fork_close_angle: 100.0 # 右夹爪正常关闭角度 # 高堆放模式专用夹爪角度(堆码>=3层时使用) high_stack_left_fork_open: 175.0 # 高堆放模式左叉打开角度 high_stack_left_fork_close: 155.0 # 高堆放模式左叉关闭角度 high_stack_right_fork_open: 5.0 # 高堆放模式右叉打开角度 high_stack_right_fork_close: 25.0 # 高堆放模式右叉关闭角度 # 视觉扫描中心偏移量 # 正数=向右偏移,负数=向左偏移 scan_center_offset: 0 # ======== 电机ID配置(必须与硬件对应)========= motor_id_x: 1 # X轴(左右水平)电机ID motor_id_y: 2 # Y轴(前后伸缩)电机ID motor_id_z_left: 3 # Z轴左电机ID motor_id_z_right: 4 # Z轴右电机ID # ====================== 视觉图像处理参数 ====================== image_topic: 'image_car' # 订阅的视觉图像话题名 brightness_factor: 1.0 # 图像亮度调节系数(1.0为默认不调节) # ====================== 视觉扫描运动参数 ====================== scan_start_pos: 3400 # 扫描起始X坐标位置 scan_end_pos: -3400 # 扫描结束X坐标位置 scan_speed: 40 # 扫描运动速度 # 像素转电机步数比例(通过 calibration_node 校准得到) scan_pixel_to_motor_scale: 13.6533 # 多帧识别稳定参数(减少误识别/漏识别) scan_multi_frame_count: 5 # 累计识别帧数(越大越稳定) scan_multi_frame_delay: 0.06 # 帧间延迟(秒) # ====================== 电机通用控制参数 ====================== position_tolerance: 50 # 位置到达允许误差(步数) motor_timeout_sec: 8.0 # 电机运动超时时间(秒) # ====================== 放置高度层级(Z轴伸出长度)====================== # 等级1最低(伸出最长),等级4最高(伸出最短) place_height_level_1: -100 # 放置高度1层 place_height_level_2: -3600 # 放置高度2层 place_height_level_3: -7400 # 放置高度3层 place_height_level_4: -3700 # 放置高度4层 # ====================== 抓取动作电机位置 ====================== pick_z_lower_left: -8100 # 抓取时Z左轴下降位置 pick_z_lower_right: 8100 # 抓取时Z右轴下降位置 pick_y_extend: 5000 # 抓取时Y轴伸出位置 pick_z_final_left: -8900 # 抓取最终Z左位置 pick_z_final_right: 8900 # 抓取最终Z右位置 pick_z_lift_left: -8500 # 抓取后抬起Z左位置 pick_z_lift_right: 8500 # 抓取后抬起Z右位置 pick_x_reset: 0 # 抓取完成后X轴回中位置 # ====================== 高堆放模式抓取参数 ====================== high_stack_pick_z_lower_left: -970 # 高堆放模式 Z 下降位置 (左) high_stack_pick_z_lower_right: 970 # 高堆放模式 Z 下降位置 (右) high_stack_pick_z_final_left: -2000 # 高堆放模式 Z 抓取位置 (左) high_stack_pick_z_final_right: 2000 # 高堆放模式 Z 抓取位置 (右) high_stack_pick_z_lift_left: -1400 # 高堆放模式 Z 抬起位置 (左) high_stack_pick_z_lift_right: 1400 # 高堆放模式 Z 抬起位置 (右) high_stack_z_prepare_left: -6200 # 高堆放模式准备位置 Z (左) - 先升起到此位置再打开夹爪 high_stack_z_prepare_right: 6200 # 高堆放模式准备位置 Z (右) - 先升起到此位置再打开夹爪 high_stack_place_z_raise_left: -6000 # 高堆放 Place 时先升起 Z 轴到此位置 (左) high_stack_place_z_raise_right: 6000 # 高堆放 Place 时先升起 Z 轴到此位置 (右) high_stack_place_z_home_left: -1300 # 高堆放 Place 时 Z 轴回位位置 (左) high_stack_place_z_home_right: 1300 # 高堆放 Place 时 Z 轴回位位置 (右) # ====================== 放置动作电机位置 ====================== place_y_extend: 6200 # 放置时Y轴伸出位置 # ====================== 机械臂原点(HOME)位置 ====================== home_x: 0 # X轴原点 home_y: -900 # Y轴原点 home_z_left: -4300 # Z左轴原点 home_z_right: 4300 # Z右轴原点 # ====================== 扫描动作专用位置/速度 ====================== scan_z_left: -3700 # 扫描时Z左轴位置 scan_z_right: 3700 # 扫描时Z右轴位置 scan_move_speed: 2000 # 扫描移动速度 scan_move_acc: 200 # 扫描移动加速度 scan_backup_speed: -0.05 # 扫描后退速度 scan_backup_duration: 1.5 # 扫描后退持续时间 scan_fail_backup_speed: 0.3 # 识别失败时后退速度 # AprilTag 配对距离验证(防止误匹配) scan_pair_distance_min: 1900 # 最小配对距离 scan_pair_distance_max: 2800 # 最大配对距离 # 标签聚类阈值(区分物理上独立的标签) scan_cluster_threshold: 500 # ====================== 通用动作速度 ====================== pick_speed: 1500 # 抓取动作速度 place_speed: 2000 # 放置动作速度 home_speed: 1500 # 回原点速度 # ====================== 放置动作后退参数 ====================== place_backup_speed: 0.2 # 放置后退速度 place_backup_duration: 1.0 # 放置后退时间 # ====================== 开机初始化序列参数 ====================== start_speed: 1500 # 初始化速度 start_acc: 200 # 初始化加速度 start_y_home: -900 # 初始化Y轴原点 start_z_left_home: -4000 # 初始化Z左原点 start_z_right_home: 4000 # 初始化Z右原点 start_z_left_intermediate: -5000 # 初始化Z左中间位置 start_z_right_intermediate: 6000 # 初始化Z右中间位置 # ====================== 关节软限位(防止超程撞机)====================== # 所有指令会被限制在这个范围内 # X轴(左右水平)软限位 joint_x_min: -3500 joint_x_max: 3500 # Y轴(前后伸缩)软限位 joint_y_min: -900 joint_y_max: 7000 # Z左轴(垂直升降)软限位(负值向下) joint_z_left_min: -8900 joint_z_left_max: 0 # Z右轴(垂直升降)软限位(正值向上) joint_z_right_min: 0 joint_z_right_max: 8900 # ====================== 关节限位测试节点 ====================== # 功能:测试各轴是否能安全到达最大/最小限位,不执行抓取/放置 /**/test_joint_limits_node: ros__parameters: # 电机ID(必须与 arm_action_node 一致) motor_id_x: 1 motor_id_y: 2 motor_id_z_left: 3 motor_id_z_right: 4 # 测试运动速度与加速度 speed: 1000 acc: 80 # 各轴测试目标位置 test_x_min: -3500 # X轴最小测试位置 test_x_max: 3500 # X轴最大测试位置 test_x_home: 0 # X轴测试原点 test_y_max: 6400 # Y轴最大测试位置 test_y_home: -900 # Y轴测试原点 test_z_left_max: -8900 # Z左轴最大行程 test_z_right_max: 8900 # Z右轴最大行程 test_z_left_home: -3700 # Z左轴测试原点 test_z_right_home: 3700 # Z右轴测试原点 test_z_left_intermediate: -5000 # Z左轴中间测试点 test_z_right_intermediate: 6000 # Z右轴中间测试点 # ====================== 机械臂开机启动节点 ====================== # 功能:开机自动执行回原点、夹爪初始化、安全自检 /**/start_arm_node: ros__parameters: # 电机ID(必须一致) motor_id_x: 1 motor_id_y: 2 motor_id_z_left: 3 motor_id_z_right: 4 # 启动运动参数 speed: 1500 acc: 200 # 启动序列目标位置 test_y_max: 7300 # Y轴最大伸出位置 test_y_home: -900 # Y轴原点 test_z_left_max: -8900 # Z左轴最大行程 test_z_right_max: 8900 # Z右轴最大行程 test_z_left_home: -4000 # Z左轴开机原点 test_z_right_home: 4000 # Z右轴开机原点 test_z_left_intermediate: -5000 # Z左轴中间过渡位 test_z_right_intermediate: 6000 # Z右轴中间过渡位 # 开机时舵机角度(夹爪/盖子初始状态) cover_open_angle: 90.0 left_fork_open_angle: 10.0 right_fork_open_angle: 170.0 # ====================== 标签配对距离监控节点 ====================== # 功能:实时监控视觉标签距离,异常时报警/停止 /**/pair_distance_monitor: ros__parameters: image_topic: '/image_car' # 图像话题 motor_id_x: 1 # X轴电机ID scan_pixel_to_motor_scale: 13.6533 # 像素转步数比例 publish_rate: 2.0 # 数据发布频率(Hz) ``` #### 主逻辑 比赛的核心流程控制(状态机、任务调度)位于以下 Python 节点: `car_ws/competition_flow/competition_flow/competition_node.py` ``` #!/usr/bin/env python3 """ Competition Flow Node - Final Competition State Machine 控制小车完成5组货物的 nav → scan → pick → nav_place 流程 基于 ROS2 Action 的统一流程控制 """ import rclpy # 导入ROS2核心库 from rclpy.node import Node # 导入ROS2节点基类,用来创建节点 from rclpy.action import ActionClient # 导入动作客户端,用来发送动作请求 from rclpy.callback_groups import ReentrantCallbackGroup # 导入回调组,允许多个回调同时运行 from geometry_msgs.msg import PoseStamped, Twist # 导入位姿消息、速度消息 from nav_msgs.msg import Odometry # 导入里程计消息 from std_msgs.msg import String, Int32MultiArray, Float32MultiArray # 导入通用消息类型 from car_interfaces.action import Scan, Pick, Place, Home, StartArm # 导入自定义动作接口 from car_interfaces.srv import GetWaypoints, UpdatePlaceCount, GetPlaceCount # 导入自定义服务接口 from planner.action import NavigateTo # 导入导航动作接口 import math # 导入数学库,用来算角度 import yaml # 导入配置文件解析库 import json # Added json import # 导入json处理库,解析数据 import os # 导入系统库 import time # 导入时间库,做延时 import threading # 导入线程库,后台运行任务 from enum import Enum, auto # 导入枚举,用来定义状态 from ament_index_python.packages import get_package_share_directory # 导入功能包路径查找工具 class State(Enum): """竞赛状态枚举""" IDLE = auto() # 空闲状态,啥也不干 NAV_TO_VIA = auto() # 导航去中途点,绕个路 NAV_TO_PICK = auto() # 导航去取货点 SCAN = auto() # 扫描二维码/标签 PICK = auto() # 抓取货物 NAV_TO_PLACE = auto() # 导航去左边放置点 PLACE = auto() # 左边放货物 NAV_TO_PLACE_RIGHT = auto() # 导航去右边放置点 PLACE_RIGHT = auto() # 右边放货物 WAITING_FOR_PARTNER = auto() # 等另一台车干完活 HOME = auto() # 机械臂回到初始位置 NAV_TO_START = auto() # 小车回到起点 DONE = auto() # 全部任务完成 class CompetitionNode(Node): """最终比赛流程控制节点 - 根据配置文件处理多组货物""" def __init__(self): super().__init__('competition_node') # 创建ROS2节点,名字叫competition_node # ==================== Namespace Detection ==================== # Must run first to determine role (Master/Server or Slave/Client) self._detect_namespace() # 先检测自己是哪台小车(A车/B车) # ==================== Load Configuration ==================== # Now init depends on role self._init_config_loading() # 加载比赛用的点位配置 # ==================== State Machine ==================== self.state = State.IDLE # 初始状态:空闲 self.cargo_index = 0 # 当前正在搬第几件货物 self.target_tag_id = None # 扫描到的货物标签ID self.left_tag_id = None # 扫描到的左边ID self.right_tag_id = None # 扫描到的右边ID # 存储状态: {side: cargo_id} (1: Left, 2: Right) self.arm_storage = {1: None, 2: None} # 记录左右机械臂分别抓了啥货 # 途径点导航状态 self.via_queue = [] # 要走的中途点列表 self.final_state = None # 走完中途点要进的状态 self.final_goal = None # 最终要去的坐标 self._retry_scan_after_via = False # 扫描失败后,是否要去中途点重试 # 导航重试状态 self._current_nav_goal = None # 当前正在导航去的位置 self._nav_retry_count = 0 # 导航失败重试次数 self._nav_retry_timer = None # 导航重试计时器 # ==================== Multi-Robot Coordination ==================== self._partner_state = None # 另一台车的状态 self._partner_place_tag_id = None # 另一台车正在放的货ID self._pending_place_tag_id = None # 自己马上要放的货ID self._pending_place_side = None # 自己用哪边放货(左/右) self._pending_place_point = None # 自己要去的放置点坐标 self._pending_via_names = None # 要走的中途点名字 self._wait_check_timer = None # 等待伙伴的检查计时器 self._proceed_timer = None # 延迟继续任务的计时器 self._needs_coordination_check = False # 走完中途点要不要等伙伴 # ==================== Partner Detection ==================== self._partner_detected = False # 是否检测到另一台车 self._single_robot_mode = False # 是否只有一台车干活 # ==================== High Stack Post-Place ==================== self._left_place_level = None # 左边放货层数 self._right_place_level = None # 右边放货层数 # ==================== Competition Start Control ==================== self._competition_started = False # 比赛是否开始 self._partner_start_timer = None # 给伙伴发启动信号的计时器 self._pending_start_after_config = False # 加载完配置是否马上开始 self._pending_start_after_arm_init = False # 机械臂初始化完是否开始 self._arm_initialized = False # 机械臂是否初始化完成 # ==================== Parallel Start Control ==================== self._parallel_start_mode = False # 是否同时启动机械臂+导航 self._arm_init_done_for_parallel = False # 并行模式下机械臂是否就绪 self._first_nav_done_for_parallel = False # 并行模式下第一次导航是否完成 # 高堆放参数 (从 arm_config.yaml 获取默认值) self.declare_parameter('motor_id_z_left', 3) # 声明参数:左Z轴电机ID self.declare_parameter('motor_id_z_right', 4) # 声明参数:右Z轴电机ID self.declare_parameter('high_stack_z_prepare_left', -6200) # 左高堆准备高度 self.declare_parameter('high_stack_z_prepare_right', 6200) # 右高堆准备高度 self.declare_parameter('high_stack_home_z_left', -4300) # 左高堆初始高度 self.declare_parameter('high_stack_home_z_right', 4300) # 右高堆初始高度 self.declare_parameter('high_stack_left_fork_open', 30.0) # 左夹爪打开角度 self.declare_parameter('high_stack_right_fork_open', 145.0) # 右夹爪打开角度 self.declare_parameter('pick_speed', 500) # 抓取速度 self._id_z_left = self.get_parameter('motor_id_z_left').value # 读取左Z轴电机ID self._id_z_right = self.get_parameter('motor_id_z_right').value # 读取右Z轴电机ID self._high_stack_z_prepare_left = self.get_parameter('high_stack_z_prepare_left').value # 读取左准备高度 self._high_stack_z_prepare_right = self.get_parameter('high_stack_z_prepare_right').value # 读取右准备高度 self.z_home_left = self.get_parameter('high_stack_home_z_left').value # 读取左初始高度 self.z_home_right = self.get_parameter('high_stack_home_z_right').value # 读取右初始高度 self._high_stack_left_fork_open = self.get_parameter('high_stack_left_fork_open').value # 读取左夹爪角度 self._high_stack_right_fork_open = self.get_parameter('high_stack_right_fork_open').value # 读取右夹爪角度 self._pick_speed = self.get_parameter('pick_speed').value # 读取抓取速度 # Cover 参数 self.declare_parameter('cover_open_angle', 110.0) # 声明盖子打开角度 self._cover_open_angle = self.get_parameter('cover_open_angle').value # 读取盖子角度 # ==================== Callback Group ==================== self._cb_group = ReentrantCallbackGroup() # 创建可重入回调组,允许多任务同时跑 # ==================== Action Clients ==================== # Navigation self._nav_client = ActionClient( self, NavigateTo, 'navigate_to', callback_group=self._cb_group ) # 创建导航动作客户端 # Arm Actions self._scan_client = ActionClient( self, Scan, 'scan', callback_group=self._cb_group ) # 创建扫描动作客户端 self._pick_client = ActionClient( self, Pick, 'pick', callback_group=self._cb_group ) # 创建抓取动作客户端 self._place_client = ActionClient( self, Place, 'place', callback_group=self._cb_group ) # 创建放置动作客户端 self._home_client = ActionClient( self, Home, 'home', callback_group=self._cb_group ) # 创建归位动作客户端 self._start_arm_client = ActionClient( self, StartArm, 'start_arm', callback_group=self._cb_group ) # 创建机械臂启动客户端 # ==================== Publishers ==================== self.state_pub = self.create_publisher(String, 'competition_state', 10) # 创建状态发布者 self.light_pub = self.create_publisher(Int32MultiArray, 'light_board_cmd', 10) # 创建灯板命令发布者 self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10) # 创建小车速度发布者 self.motor_pub = self.create_publisher(String, 'motor_cmd', 10) # 创建电机命令发布者 self.servo_pub = self.create_publisher(Float32MultiArray, 'servo_cmd', 10) # 创建舵机命令发布者 # 发布到伙伴的启动话题 if self._partner_ns is not None: if self._partner_ns == '': partner_start_topic = '/competition_start' # 无命名空间的话题 else: partner_start_topic = f'/{self._partner_ns}/competition_start' self._partner_start_pub = self.create_publisher(String, partner_start_topic, 10) # 给伙伴发启动信号 self.get_logger().info(f"Will publish to partner start topic: {partner_start_topic}") # 打印日志 else: self._partner_start_pub = None # 没有伙伴就不创建 # ==================== Subscribers ==================== # 订阅竞赛启动话题 (使用相对话题名,ROS2会自动加上命名空间) self._competition_start_sub = self.create_subscription( String, 'competition_start', self._on_competition_start, 10, callback_group=self._cb_group ) # 订阅比赛开始信号 self.get_logger().info("Subscribing to competition_start topic (will auto-start on message)") # 打印日志 # 订阅另一台车的状态 (用于协调放置) if self._partner_ns is not None: # 注意:空字符串也是有效的伙伴 if self._partner_ns == '': partner_topic = '/competition_state' # 无命名空间的话题 else: partner_topic = f'/{self._partner_ns}/competition_state' self._partner_state_sub = self.create_subscription( String, partner_topic, self._on_partner_state, 10, callback_group=self._cb_group ) # 订阅伙伴状态 self.get_logger().info(f"Subscribing to partner state: {partner_topic}") # 打印日志 # 订阅伙伴的 odom 话题 (用于检测伙伴是否存在) if self._partner_ns == '': partner_odom_topic = '/odom' # 无命名空间的话题 else: partner_odom_topic = f'/{self._partner_ns}/odom' self._partner_odom_sub = self.create_subscription( Odometry, partner_odom_topic, self._on_partner_odom, 10, callback_group=self._cb_group ) # 订阅伙伴里程计,判断车在不在 self.get_logger().info(f"Subscribing to partner odom for detection: {partner_odom_topic}") # 打印日志 else: self._partner_state_sub = None # 没伙伴就不订阅 self._partner_odom_sub = None self.get_logger().warn("No partner namespace detected, multi-robot coordination disabled") # 打印警告 # 状态发布定时器 self.create_timer(0.5, self._publish_state) # 每0.5秒发一次当前状态 self.get_logger().info("=" * 50) # 打印分割线 self.get_logger().info("Competition Node Started (Waiting for Config Service...)") # 节点启动完成 self.get_logger().info("=" * 50) # 打印分割线 # Arm initialization will be triggered after config is loaded # self._start_arm_initialization() # ==================== Namespace Detection ==================== def _detect_namespace(self): """检测当前节点的命名空间并确定伙伴机器人命名空间""" # 获取当前节点的完全限定名 full_name = self.get_fully_qualified_name() # 拿到自己节点全名 self.get_logger().info(f"Node fully qualified name: {full_name}") # 打印日志 # 解析命名空间 # full_name 格式: /namespace/node_name 或 /node_name parts = full_name.split('/') # 按/拆分名字 if len(parts) >= 3: # 有命名空间: ['', 'namespace', 'node_name'] self._my_ns = parts[1] # 记录自己的命名空间 else: # 没有命名空间 self._my_ns = None # 无命名空间 # TF 话题已隔离,frame 名称可以直接使用简单名称 self._map_frame = 'map' # 地图坐标系名字 # 确定伙伴命名空间 # robotA 的伙伴是无命名空间的机器人,无命名空间的机器人的伙伴是 robotA if self._my_ns == 'robotA': self._partner_ns = '' # 空命名空间 elif self._my_ns is None: self._partner_ns = 'robotA' # 伙伴是A车 else: self._partner_ns = None # 没伙伴 self.get_logger().info(f"My namespace: {self._my_ns}, Partner namespace: {self._partner_ns}") # 打印自己和伙伴名字 def _on_partner_state(self, msg: String): """处理另一台车的状态消息""" self._partner_state = msg.data # 保存伙伴状态 # 标记检测到伙伴 if not self._partner_detected: self._partner_detected = True # 确认伙伴在线 self.get_logger().info("Partner detected via state message!") # 打印日志 # 如果正在等待,检查是否可以继续 if self.state == State.WAITING_FOR_PARTNER: self._check_can_proceed_to_place() # 检查能不能继续放货 def _on_partner_odom(self, msg: Odometry): """处理伙伴的 odom 消息,用于检测伙伴是否存在""" # 只在第一次收到消息时标记并销毁订阅 if not self._partner_detected: self._partner_detected = True # 收到里程计,说明伙伴在线 self.get_logger().info("Partner detected via odom message!") # 打印日志 # 检测到伙伴后,销毁 odom 订阅(不再需要) if self._partner_odom_sub is not None: self.destroy_subscription(self._partner_odom_sub) # 关掉订阅 self._partner_odom_sub = None self.get_logger().info("Partner odom subscription removed after detection.") # 打印日志 def _on_competition_start(self, msg: String): """处理竞赛启动话题消息""" if self._competition_started: self.get_logger().warn("Competition already started, ignoring start message") # 已经开始,忽略信号 return self.get_logger().info("=" * 50) # 分割线 self.get_logger().info("=== COMPETITION START SIGNAL RECEIVED ===") # 收到开始信号 self.get_logger().info("=" * 50) # 分割线 # 比赛开始时发布常亮灯光 self._publish_steady_light() # 开灯 # 标记已启动 self._competition_started = True # 比赛开始标志设为真 # 如果配置已加载,直接启动流程 # 否则等待配置加载完成后再启动(在_on_config_ready中检查) if hasattr(self, 'config_data') and self.config_data: self.get_logger().info("Config already loaded, starting competition flow...") # 配置就绪,开始跑流程 self._trigger_competition_start() # 启动比赛 else: self.get_logger().info("Config not ready yet, will start after config is loaded") # 配置没好,等一等 # 设置标志,在配置加载完成后自动启动 self._pending_start_after_config = True # 配置好后自动开始 def _trigger_competition_start(self): """触发竞赛启动(先执行初始移动,然后初始化机械臂)""" # 在收到启动信号时,检查是否检测到伙伴 if self._partner_ns is not None: if self._partner_detected: # 检测到伙伴,双车模式 self._single_robot_mode = False # 双车干活 self.get_logger().info("=" * 50) self.get_logger().info("Partner detected! DUAL ROBOT MODE enabled.") self.get_logger().info("=" * 50) else: # 没有检测到伙伴,单车模式 self._single_robot_mode = True # 只有自己一台车 self.get_logger().info("=" * 50) self.get_logger().info("WARNING: No partner detected!") self.get_logger().info("Enabling SINGLE ROBOT MODE - will run all cargo") self.get_logger().info("=" * 50) self.get_logger().info("Starting initial movement...") # 开始初始移动 # 先执行初始移动,初始移动完成后再启动机械臂初始化 self._perform_initial_movement() # 小车先挪一下 # 10秒后发送启动信号给伙伴(使用线程定时器实现一次性延迟) if self._partner_start_pub is not None: self.get_logger().info("Scheduling partner start signal in 10 seconds...") # 10秒后叫伙伴开始 timer = threading.Timer(10.0, self._send_partner_start_signal) timer.daemon = True timer.start() self._partner_start_timer = timer def _send_partner_start_signal(self): """发送启动信号给伙伴(10秒后调用)""" if self._partner_start_timer is not None: self._partner_start_timer = None if self._partner_start_pub is not None: msg = String() msg.data = "start" self._partner_start_pub.publish(msg) # 给伙伴发开始指令 self.get_logger().info("=" * 50) self.get_logger().info("=== SENT COMPETITION START SIGNAL TO PARTNER ===") self.get_logger().info("=" * 50) else: self.get_logger().warn("No partner start publisher available") # 没伙伴,发不了 # ==================== Configuration ==================== def _init_config_loading(self): """Initialize configuration loading""" # All nodes act as clients and request config from the central service self.get_logger().info("Requesting remote configuration...") # 请求配置文件 self._request_remote_config() # 向服务端要点位 def _request_remote_config(self): """Request config from central Service""" # Service name is global '/get_waypoints' provided by the C++ node # Ensure we look for the global service, not namespaced # Assuming the C++ node is launched globally or we know where it is. # If user runs: ros2 run car_bringup waypoints_server_node -> /get_waypoints self.get_logger().info(f"Waiting for service: {service_name}") # 等待配置服务 self._config_client = self.create_client(GetWaypoints, service_name) # 创建配置服务客户端 # Also create UpdatePlaceCount client (same node usually) place_service_name = '/update_place_count' self.get_logger().info(f"Waiting for service: {place_service_name}") self._place_count_client = self.create_client(UpdatePlaceCount, place_service_name) # 创建放货计数服务 # Create GetPlaceCount client for querying stack counts (without incrementing) get_count_service_name = '/get_place_count' self.get_logger().info(f"Waiting for service: {get_count_service_name}") self._get_place_count_client = self.create_client(GetPlaceCount, get_count_service_name) # 创建查询层数服务 self._service_wait_timer = self.create_timer(1.0, self._check_config_service) # 每秒检查服务是否就绪 # REMOVED _load_local_and_serve and _handle_get_waypoints def _check_config_service(self): """Check if service is available and call it""" config_ready = self._config_client.service_is_ready() # 配置服务好了吗 place_ready = self._place_count_client.service_is_ready() # 计数服务好了吗 get_count_ready = self._get_place_count_client.service_is_ready() # 查询服务好了吗 if config_ready and place_ready and get_count_ready: self._service_wait_timer.cancel() # 关掉检查计时器 self._service_wait_timer = None req = GetWaypoints.Request() self.get_logger().info("Requesting configuration...") # 请求配置 future = self._config_client.call_async(req) future.add_done_callback(self._on_remote_config_received) else: self.get_logger().info("Waiting for config service...") # 还没好,继续等 def _on_remote_config_received(self, future): """Handle remote config response""" try: resp = future.result() if resp.success: self.get_logger().info("Received remote configuration!") # 拿到配置 try: # Try parsing as JSON first (if C++ sends JSON) self.config_data = json.loads(resp.config_json) # 先按json解析 except json.JSONDecodeError: # Fallback to YAML parsing (if C++ sends raw YAML content) self.get_logger().info("JSON decode failed, attempting YAML load...") self.config_data = yaml.safe_load(resp.config_json) # 解析失败就用yaml self._parse_config_data(self.config_data) # 解析配置内容 self._on_config_ready() # 配置就绪 else: self.get_logger().error(f"Config request failed: {resp.message}") # 请求失败 # Retry? or Fallback? self.get_logger().warn("Retrying in 2 seconds...") self._service_wait_timer = self.create_timer(2.0, self._check_config_service) # 2秒后重试 except Exception as e: self.get_logger().error(f"Service call failed: {e}") # 服务调用出错 def _parse_config_data(self, config): """Parse configuration data (common logic)""" try: # 加载取货点 self.pick_points = [] for point in config.get('pick_points', []): self.pick_points.append({ 'x': point['x'], 'y': point['y'], 'yaw': point['yaw'], 'via_pick': point.get('via_pick', 'default'), 'via_place': point.get('via_place', 'default') }) # 保存所有取货点坐标 # 加载放置点 (ID -> 坐标) self.place_points_map = {} self.place_stack_counts = {} for point in config.get('place_points', []): pid = point.get('id') if pid is not None: self.place_points_map[pid] = (point['x'], point['y'], point['yaw']) # 按ID存放置点 self.place_stack_counts[pid] = 0 # 每个位置初始层数0 # 加载导航参数 nav_config = config.get('navigation', {}) self.partner_place_finish_delay = nav_config.get('partner_place_finish_delay', 2.0) # 伙伴放完等2秒 # 加载途径点 self.via_points_map = {} via_config = config.get('via_points', {}) if via_config: for name, point in via_config.items(): self.via_points_map[name] = ( point.get('x', 0.0), point.get('y', 0.0), point.get('yaw', 0.0) ) # 保存中途点 self.get_logger().info(f"Loaded {len(self.via_points_map)} via points") # 打印加载了几个中途点 # 兼容旧配置 old_via = config.get('via_point') if old_via and 'default' not in self.via_points_map: self.via_points_map['default'] = ( old_via.get('x', 0.0), old_via.get('y', 0.0), old_via.get('yaw', 0.0) ) # 兼容老版本配置 # 加载终点坐标 end_points_config = config.get('end_points') if end_points_config: # 新格式:根据命名空间选择终点 ns_key = self._my_ns if self._my_ns else 'default' end_config = end_points_config.get(ns_key) if not end_config: # 如果没有找到对应的命名空间配置,使用 default self.get_logger().warn(f"No end_point config for namespace '{ns_key}', using 'default'") end_config = end_points_config.get('default', {}) self.end_point = ( end_config.get('x', 0.0), end_config.get('y', 0.0), end_config.get('yaw', 1.57) ) # 保存自己的终点 self.get_logger().info(f"Loaded end_point for '{ns_key}': {self.end_point}") # 打印终点 else: end_config = config.get('end_point', {}) self.end_point = ( end_config.get('x', 0.0), end_config.get('y', 0.0), end_config.get('yaw', 1.57) ) # 老格式终点 self.get_logger().info(f"Loaded single end_point: {self.end_point}") # 加载任务分配配置 partition_config = config.get('task_partition', {}) self.split_index = partition_config.get('split_index', 3) # 任务分割点,前3个给B车,后面给A车 self.partner_detect_timeout = partition_config.get('partner_detect_timeout', 5.0) # 检测伙伴超时时间 self.get_logger().info(f"Task split index: {self.split_index}") self.get_logger().info(f"Partner detect timeout: {self.partner_detect_timeout}s") # 根据取货点数量确定货物组数 self.total_cargo = len(self.pick_points) # 总共有几件货 self.get_logger().info(f"Verified Config: {len(self.pick_points)} picks, {len(self.place_points_map)} places.") # 打印配置统计 except Exception as e: self.get_logger().error(f"Error parsing config: {e}") # 解析出错 self._use_fallback_config() # 使用备用配置 def _use_fallback_config(self): """Use hardcoded fallback config""" self.get_logger().warn("USING FALLBACK CONFIGURATION") # 使用备用配置 self.pick_points = [(1.94, 0.48, -1.57)] # 备用取货点 self.place_points_map = {30: (0.5, 1.91, 1.57)} # 备用放置点 self.end_point = (0.0, 0.0, 1.57) # 备用终点 self.partner_place_finish_delay = 2.0 # 备用延时 self.total_cargo = len(self.pick_points) self.split_index = 3 # Still proceed self._on_config_ready() def _on_config_ready(self): """Called when config is fully loaded and parsed""" self.get_logger().info("Configuration ready.") # 配置加载完成 # 如果没有伙伴配置,直接进入单车模式 if self._partner_ns is None: self._single_robot_mode = True # 单车干活 self.get_logger().info("No partner configured, single robot mode enabled.") else: # 有伙伴配置,持续检测伙伴(不设置超时,直到收到启动信号) self.get_logger().info("Partner namespace configured, will check partner detection on start signal...") # 检查是否有待启动的请求(收到启动信号但配置未加载) if self._pending_start_after_config: self.get_logger().info("Config loaded, triggering competition start...") self._pending_start_after_config = False self._trigger_competition_start() # 配置好了,开始比赛 else: # 等待 competition_start 信号,收到信号后才执行初始移动和机械臂初始化 self.get_logger().info("Configuration ready, waiting for competition_start signal...") def _perform_initial_movement(self): """根据命名空间执行初始移动 - 空命名空间: 以0.4m/s向x轴正方向移动1秒 - robotA: 以0.4m/s向x轴负方向移动1秒 """ if self._my_ns is None: # 空命名空间 - 向x轴正方向移动 self.get_logger().info("=== Initial Movement: Moving forward (X+) at 0.4m/s for 1s ===") self._publish_velocity(0.4, duration=1.0, on_complete=self._on_initial_movement_done) # 向前挪1秒 elif self._my_ns == 'robotA': # robotA - 向x轴负方向移动 self.get_logger().info("=== Initial Movement: Moving backward (X-) at 0.4m/s for 1s ===") self._publish_velocity(-0.4, duration=1.0, on_complete=self._on_initial_movement_done) # 向后挪1秒 else: # 其他命名空间 - 不移动,直接启动机械臂初始化 self.get_logger().info("=== No initial movement for this namespace ===") self._start_arm_initialization() # 直接初始化机械臂 # 设置标志,在机械臂初始化完成后启动流程 self._pending_start_after_arm_init = True def _publish_velocity(self, linear_x: float, duration: float, on_complete=None): """发布速度指令,持续指定时间 Args: linear_x: x轴线速度 (m/s) duration: 持续时间 (秒) on_complete: 完成后的回调函数 (可选) """ # 使用线程来发布速度,避免阻塞 def velocity_thread(): twist = Twist() twist.linear.x = linear_x # 设置前后速度 twist.linear.y = 0.0 # 左右速度0 twist.linear.z = 0.0 # 上下速度0 twist.angular.x = 0.0 # 翻滚角速度0 twist.angular.y = 0.0 # 俯仰角速度0 twist.angular.z = 0.0 # 转向角速度0 # 使用 monotonic 时钟确保精确计时,不受系统时间影响 start_time = time.monotonic() end_time = start_time + duration # 以50Hz频率发布 frequency = 50.0 sleep_interval = 1.0 / frequency while time.monotonic() < end_time: self.cmd_vel_pub.publish(twist) # 发速度指令 time.sleep(sleep_interval) # 发送停止命令 twist.linear.x = 0.0 self.cmd_vel_pub.publish(twist) # 停车 actual_duration = time.monotonic() - start_time self.get_logger().info(f"Initial movement complete (actual: {actual_duration:.3f}s)") # 移动完成 # 线程完成后直接调用回调,确保同步 if on_complete is not None: # 使用一次性定时器将回调调度到 ROS2 执行器线程 # 这避免了从非 ROS 线程直接调用 ROS 函数的问题 self.create_timer(0.01, lambda: self._execute_once(on_complete), callback_group=self._cb_group) thread = threading.Thread(target=velocity_thread, daemon=True) thread.start() # 启动线程发速度 def _execute_once(self, callback): """执行一次性回调的包装器""" # 使用标志确保只执行一次 callback_id = id(callback) flag_name = f'_executed_{callback_id}' if hasattr(self, flag_name) and getattr(self, flag_name): return setattr(self, flag_name, True) callback() # 执行回调 def _on_initial_movement_done(self): """初始移动完成后的回调 - 并行启动机械臂初始化和首次导航""" # 防止重复执行 if hasattr(self, '_initial_movement_done') and self._initial_movement_done: return self._initial_movement_done = True self.get_logger().info("Initial movement complete, starting PARALLEL mode...") # 初始移动完成 # 进入并行启动模式 self._parallel_start_mode = True self._arm_init_done_for_parallel = False self._first_nav_done_for_parallel = False # 1. 计算任务分配(从 _start_flow 提取) self._calculate_task_assignment() # 算自己要搬哪几件货 # 2. 启动机械臂初始化(异步) self._start_arm_initialization() # 初始化机械臂 # 3. 同时导航到第一个取货点 self._start_first_navigation() # 同时导航去取货 def _calculate_task_assignment(self): """计算任务分配(从 _start_flow 提取)""" if self._single_robot_mode: self.start_index = 0 self.end_index = self.total_cargo # 单车搬所有货 self.get_logger().info("=" * 40) self.get_logger().info("SINGLE ROBOT MODE: Running all cargo!") self.get_logger().info(f"Assigned cargo index 0 to {self.end_index - 1}") self.get_logger().info("=" * 40) elif self._my_ns == 'robotA': self.start_index = self.split_index self.end_index = self.total_cargo # A车搬后面的货 self.get_logger().info(f"RobotA assigned cargo index {self.start_index} to {self.end_index - 1}") elif self._my_ns is None: self.start_index = 0 self.end_index = min(self.split_index, self.total_cargo) # B车搬前面的货 self.get_logger().info(f"Default robot assigned cargo index {self.start_index} to {self.end_index - 1}") else: self.start_index = 0 self.end_index = self.total_cargo # 未知车搬全部 self.get_logger().info(f"Unknown namespace mode: assigned all cargo (0 to {self.end_index - 1}") self.cargo_index = self.start_index # 从第一件分配的货开始 def _start_first_navigation(self): """并行模式下启动首次导航到第一个取货点""" if self.cargo_index >= self.end_index: self.get_logger().info("No cargo assigned, skipping first navigation") # 没货要搬 self._first_nav_done_for_parallel = True self._check_parallel_start_complete() return self.get_logger().info(f"") self.get_logger().info(f">>> [PARALLEL] Starting navigation to first pick point (cargo {self.cargo_index + 1}/{self.total_cargo}) <<<") self.get_logger().info(f"") # 重置放置层级信息 self._left_place_level = None self._right_place_level = None # 获取第一个取货点 cargo_info = self.pick_points[self.cargo_index] pick_point = (cargo_info['x'], cargo_info['y'], cargo_info['yaw']) via_pick_name = cargo_info['via_pick'] # 导航到取货点 self._navigate_through_via(State.NAV_TO_PICK, *pick_point, via_names=via_pick_name) # 走中途点去取货 def _check_parallel_start_complete(self): """检查并行启动是否都完成""" if not self._parallel_start_mode: return if self._arm_init_done_for_parallel and self._first_nav_done_for_parallel: self.get_logger().info("=" * 50) self.get_logger().info("[PARALLEL] Both arm init and navigation complete!") # 机械臂+导航都好了 self.get_logger().info("=" * 50) # 退出并行模式 self._parallel_start_mode = False # 检查是否有货物需要处理 if self.cargo_index >= self.end_index: self.get_logger().info("No cargo assigned, going home.") # 没货,回家 self._transition_to(State.HOME) self._send_home_goal() else: # 开始 Scan self._transition_to(State.SCAN) # 切换到扫描状态 self._send_scan_goal() # 开始扫描 # Old _load_waypoints_config is replaced by above methods # ==================== Flow Control ==================== def _start_arm_initialization(self): """首先初始化机械臂""" self.get_logger().info("=== Initializing Arm ===") if not self._start_arm_client.wait_for_server(timeout_sec=10.0): self.get_logger().error("StartArm action server not available!") # 机械臂服务没启动 return goal = StartArm.Goal() self.get_logger().info("Sending StartArm goal...") # 发送机械臂初始化指令 future = self._start_arm_client.send_goal_async( goal, feedback_callback=self._on_start_arm_feedback ) future.add_done_callback(self._on_start_arm_goal_response) def _on_start_arm_goal_response(self, future): """StartArm 目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().error("StartArm goal rejected") # 初始化请求被拒 return self.get_logger().info("StartArm goal accepted") # 请求接受 result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_start_arm_result) def _on_start_arm_feedback(self, feedback_msg): """StartArm 反馈""" fb = feedback_msg.feedback self.get_logger().debug(f"StartArm: {fb.current_step} ({fb.progress:.1%})") # 打印初始化进度 def _on_start_arm_result(self, future): """StartArm 完成""" result = future.result().result if not result.success: self.get_logger().error(f"StartArm failed: {result.message}") # 初始化失败 return self.get_logger().info(f"StartArm succeeded: {result.message}") # 初始化成功 self._arm_initialized = True # 并行模式检查 if self._parallel_start_mode: self.get_logger().info("[PARALLEL] Arm initialization complete") self._arm_init_done_for_parallel = True self._check_parallel_start_complete() elif self._pending_start_after_arm_init: # 原有逻辑(非并行模式) self.get_logger().info("Arm initialization complete, starting competition flow...") self._pending_start_after_arm_init = False self._start_flow() # 开始比赛流程 else: # 如果没有收到启动信号,等待competition_start话题 self.get_logger().info("Arm initialization complete, waiting for competition_start signal...") def _start_flow(self): """启动竞赛流程(非并行模式使用)""" # 计算任务分配 self._calculate_task_assignment() # 分配货物 self.get_logger().info("=== Starting Competition Flow ===") if self.cargo_index < self.end_index: self._start_cargo_cycle() # 开始搬货 else: self.get_logger().info("No cargo assigned, going home.") # 没货,回家 self._transition_to(State.HOME) self._send_home_goal() def _start_cargo_cycle(self): """开始当前货物的搬运循环""" self.get_logger().info(f"") self.get_logger().info(f">>> Starting cargo {self.cargo_index + 1}/{self.total_cargo} <<<") self.get_logger().info(f"") # 重置放置层级信息 self._left_place_level = None self._right_place_level = None # 获取当前货物的取货点 cargo_info = self.pick_points[self.cargo_index] pick_point = (cargo_info['x'], cargo_info['y'], cargo_info['yaw']) via_pick_name = cargo_info['via_pick'] # 导航到取货点 (经过途径点) self._navigate_through_via(State.NAV_TO_PICK, *pick_point, via_names=via_pick_name) # 去取货 def _navigate_through_via(self, target_state, x, y, yaw, via_names='default'): """导航到目标点,支持多个途径点队列 Args: target_state: 最终状态 x, y, yaw: 最终目标点 via_names: 途径点名称 (str) 或 名称列表 (list) """ # 1. 记录最终目标 self.final_state = target_state self.final_goal = (x, y, yaw) self.via_queue = [] # 2. 解析所有途径点并填入队列 names_list = [] if isinstance(via_names, list): names_list = via_names elif via_names: names_list = [via_names] for name in names_list: if not name: continue pt = self.via_points_map.get(name) # 兼容旧逻辑:如果单个名称未找到且不是 'default',尝试 fallback 到 default if not pt and name != 'default' and len(names_list) == 1: pt = self.via_points_map.get('default') if pt: self.via_queue.append(pt) # 把中途点加入队列 elif name != 'default': # default 允许不配置 self.get_logger().warn(f"Via point '{name}' not found") # 3. 开始导航流程 if self.via_queue: next_pt = self.via_queue.pop(0) self.get_logger().info(f"Navigating via sequence. Next via point: {next_pt}") self._transition_to(State.NAV_TO_VIA) # 去中途点 self._send_nav_goal(*next_pt) else: self._transition_to(target_state) # 没中途点,直接去终点 self._send_nav_goal(x, y, yaw) def _transition_to(self, new_state: State): """状态转换""" old_state = self.state self.state = new_state self.get_logger().info( f"[Cargo {self.cargo_index + 1}] State: {old_state.name} -> {new_state.name}" ) # 打印状态变化 def _publish_state(self): """发布当前状态""" msg = String() msg.data = self.state.name self.state_pub.publish(msg) # 发自己当前状态 # ==================== Navigation ==================== def _send_nav_goal(self, x, y, yaw): """发送导航目标""" # 保存当前导航目标用于重试 self._current_nav_goal = (x, y, yaw) if not self._nav_client.wait_for_server(timeout_sec=5.0): self.get_logger().error("NavigateTo action server not available, retrying in 0.5s...") # 导航服务没开 self._schedule_nav_retry() # 0.5秒后重试 return goal = NavigateTo.Goal() goal.pose = PoseStamped() goal.pose.header.stamp = self.get_clock().now().to_msg() goal.pose.header.frame_id = self._map_frame # 地图坐标系 goal.pose.pose.position.x = x # 目标X goal.pose.pose.position.y = y # 目标Y goal.pose.pose.orientation.z = math.sin(yaw / 2.0) # 目标角度 goal.pose.pose.orientation.w = math.cos(yaw / 2.0) self.get_logger().info( f"[{self.state.name}] Navigating to ({x:.2f}, {y:.2f}, yaw={yaw:.2f})" ) # 打印导航目标 future = self._nav_client.send_goal_async(goal) future.add_done_callback(self._on_nav_goal_response) def _schedule_nav_retry(self): """安排 0.5 秒后重试导航""" # 取消之前的重试定时器(如果存在) if self._nav_retry_timer is not None: self._nav_retry_timer.cancel() # 创建一次性定时器,0.5 秒后重试 self._nav_retry_timer = self.create_timer(0.5, self._retry_nav_goal) def _retry_nav_goal(self): """重试当前导航目标""" # 取消定时器(一次性) if self._nav_retry_timer is not None: self._nav_retry_timer.cancel() self._nav_retry_timer = None if self._current_nav_goal is not None: x, y, yaw = self._current_nav_goal self.get_logger().info(f"Retrying navigation to ({x:.2f}, {y:.2f}, yaw={yaw:.2f})...") self._send_nav_goal(x, y, yaw) # 重试导航 else: self.get_logger().error("No navigation goal to retry!") # 没有导航目标 def _on_nav_goal_response(self, future): """导航目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().warn("Navigation goal rejected") # 导航请求被拒 return self.get_logger().info("Navigation goal accepted") # 请求接受 result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_nav_result) def _on_nav_result(self, future): """导航完成 - 触发状态转换""" result = future.result().result if not result.success: self._nav_retry_count += 1 self.get_logger().error(f"Navigation failed (attempt {self._nav_retry_count}), retrying in 0.5s...") # 导航失败 self._schedule_nav_retry() # 重试 return # 导航成功,重置重试计数 self._nav_retry_count = 0 self.get_logger().info("Navigation succeeded") # 导航到达 # 状态转换 if self.state == State.NAV_TO_VIA: # 途径点到达 # 先检查是否是扫描失败后的重试导航 if self._retry_scan_after_via: self.get_logger().info("Reached wait point after scan failure, retrying current cargo") self._retry_scan_after_via = False # 重新开始当前货物的取货流程 self._start_cargo_cycle() # 重新搬这件货 return # 正常流程:继续前往目标 if self.via_queue: # 队列中还有途径点,继续下一个 next_pt = self.via_queue.pop(0) self.get_logger().info(f"Via point reached, moving to next: {next_pt}") self._send_nav_goal(*next_pt) # 去下一个中途点 elif self.final_state and self.final_goal: # 途径点全部走完 # 检查是否需要协调检查(走完途径点后再决定去放置点还是等待点) if self._needs_coordination_check and self.final_state in (State.NAV_TO_PLACE, State.NAV_TO_PLACE_RIGHT): self._needs_coordination_check = False if self._is_partner_placing(): # 伙伴正在放置,去等待点 self.get_logger().warn("Partner is placing (checked after via points), redirecting to wait point") self.final_state = None self.final_goal = None self._go_to_wait_point_directly() # 去等伙伴 return else: self.get_logger().info("Partner not placing (checked after via points), proceeding to place") # 前往最终目标 self.get_logger().info(f"All via points reached, continuing to {self.final_state.name}") self._transition_to(self.final_state) self._send_nav_goal(*self.final_goal) # 去终点 self.final_state = None self.final_goal = None else: self.get_logger().error("Reached via point but no pending state!") # 出错 elif self.state == State.NAV_TO_PICK: # 并行模式检查 if self._parallel_start_mode: self.get_logger().info("[PARALLEL] First navigation to pick point complete") self._first_nav_done_for_parallel = True self._check_parallel_start_complete() else: # 正常模式:直接进入 SCAN self._transition_to(State.SCAN) # 到取货点,开始扫描 self._send_scan_goal() elif self.state == State.NAV_TO_PLACE: # 到达放置点,放下货物 (默认左叉 side=1) self._transition_to(State.PLACE) # 到左边放置点,开始放货 tag_id = self.arm_storage.get(1) # Use service to get level self._request_place_level_and_act(tag_id=tag_id, side=1) # 查放第几层 elif self.state == State.NAV_TO_PLACE_RIGHT: # 到达右侧放置点,放下货物 (右叉 side=2) self._transition_to(State.PLACE_RIGHT) # 到右边放置点,放货 tag_id = self.arm_storage.get(2) # Use service to get level self._request_place_level_and_act(tag_id=tag_id, side=2) elif self.state == State.WAITING_FOR_PARTNER: # 到达等待点,开始等待伙伴完成 self.get_logger().info(f"Arrived at wait point, waiting for partner to finish at tag {self._pending_place_tag_id}") # 启动定时器定期检查伙伴状态 self._start_wait_check_timer() # 定时看伙伴好了没 elif self.state == State.NAV_TO_START: # 到达起点,比赛完成 self._transition_to(State.DONE) # 全部完成 self.get_logger().info("") self.get_logger().info("=" * 50) self.get_logger().info("=== ALL CARGO COMPLETED - RETURNED TO START ===") self.get_logger().info("=== COMPETITION FINISHED ===") self.get_logger().info("=" * 50) # ==================== Scan Action ==================== def _send_scan_goal(self): """发送扫描目标""" if not self._scan_client.wait_for_server(timeout_sec=5.0): self.get_logger().error("Scan action server not available!") # 扫描服务没开 return goal = Scan.Goal() self.get_logger().info(f"[{self.state.name}] Starting scan") # 开始扫描 future = self._scan_client.send_goal_async( goal, feedback_callback=self._on_scan_feedback ) future.add_done_callback(self._on_scan_goal_response) def _on_scan_goal_response(self, future): """扫描目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().warn("Scan goal rejected") # 扫描请求被拒 return result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_scan_result) def _on_scan_feedback(self, feedback_msg): """扫描反馈""" fb = feedback_msg.feedback self.get_logger().debug( f"Scan progress: {fb.scan_progress:.1%}, tags found: {fb.tags_found_count}" ) # 打印扫描进度 def _on_scan_result(self, future): """扫描完成 - 获取最左侧 Tag ID,开始对齐""" result = future.result().result if not result.success or result.leftmost_tag_id < 0: self.get_logger().error("Scan failed or no tags found, navigating to wait point to retry") # 扫描失败 # 根据命名空间选择等待点 # 单车模式下使用 default 点 if self._single_robot_mode: wait_point_name = 'default' elif self._my_ns == 'robotA': wait_point_name = 'wait_point_robotA' elif self._my_ns is None: # 空命名空间 = 原来的 robotB 角色 wait_point_name = 'wait_point_robotB' else: wait_point_name = 'default' via_point = self.via_points_map.get(wait_point_name) if not via_point and wait_point_name != 'default': # 如果没找到特定点,回退到 default via_point = self.via_points_map.get('default') wait_point_name = 'default' if via_point: self.get_logger().info(f"Navigating to {wait_point_name} via point: {via_point}") self._transition_to(State.NAV_TO_VIA) # 设置 pending 状态,在途径点到达后重试当前货物 self.final_state = None # 清空 self.final_goal = None self.via_queue = [] self._send_nav_goal(*via_point) # 去重试点 # 到达等待点后重新扫描当前货物 self._retry_scan_after_via = True else: self.get_logger().warn(f"No {wait_point_name} via point configured, retrying scan directly") # 没有配置等待点,直接重新开始当前货物流程 self._start_cargo_cycle() # 直接重试 return # 识别左右 ID self.left_tag_id = result.leftmost_tag_id # 使用 rightmost_tag_id 作为右侧 ID(即使与左侧相同) self.right_tag_id = result.rightmost_tag_id if hasattr(result, 'rightmost_tag_id') and result.rightmost_tag_id >= 0 else None # 如果左右 ID 相同,仍然记录右臂有货物 # 记录存储状态: 左侧ID->左臂(1), 右侧ID->右臂(2) self.arm_storage[1] = self.left_tag_id self.arm_storage[2] = self.right_tag_id if self.right_tag_id is not None else self.left_tag_id self.target_tag_id = self.left_tag_id self.get_logger().info( f"Scan found: Left={self.left_tag_id}, Right={self.right_tag_id}" ) self.get_logger().info(f"Arm storage updated: {self.arm_storage}") self.get_logger().info( f"Targeting leftmost tag: {self.target_tag_id} at pos {result.leftmost_tag_position:.0f}" ) # 扫描完成,记录货号 # 查询左右货物 ID 的堆放数量,设置高堆放模式标志,然后更新 place count self._query_stack_counts_then_pick() # 查放几层,然后抓取 def _query_stack_counts_then_pick(self): """查询左右货物 ID 的堆放数量,然后发送 Pick 指令""" # 存储待查询的结果 self._pending_stack_counts = {} self._pending_queries = 2 # 需要查询左右两个 ID # 查询左侧货物 ID 堆放数量 left_id = self.arm_storage.get(1) right_id = self.arm_storage.get(2) self.get_logger().info(f"Querying stack counts for left_id={left_id}, right_id={right_id}") if left_id is not None: self._query_single_stack_count(left_id, 'left') # 查左边放几层 else: self._pending_stack_counts['left'] = 0 self._pending_queries -= 1 if right_id is not None: self._query_single_stack_count(right_id, 'right') # 查右边放几层 else: self._pending_stack_counts['right'] = 0 self._pending_queries -= 1 # 如果两个都不需要查询,直接发送 Pick if self._pending_queries <= 0: self._finalize_stack_query_and_pick() def _query_single_stack_count(self, place_id: int, side: str): """查询单个放置位的堆放数量""" req = GetPlaceCount.Request() req.place_id = place_id future = self._get_place_count_client.call_async(req) future.add_done_callback( lambda f: self._on_stack_count_received(f, side) ) def _on_stack_count_received(self, future, side: str): """处理堆放数量查询结果""" try: resp = future.result() if resp.success: count = resp.current_count self.get_logger().info(f"Stack count for {side}: {count}") self._pending_stack_counts[side] = count else: self.get_logger().warn(f"GetPlaceCount failed for {side}, assuming 0") self._pending_stack_counts[side] = 0 except Exception as e: self.get_logger().error(f"GetPlaceCount service call failed: {e}") self._pending_stack_counts[side] = 0 self._pending_queries -= 1 if self._pending_queries <= 0: self._finalize_stack_query_and_pick() def _finalize_stack_query_and_pick(self): """完成堆放数量查询,设置 high_stack 标志,然后更新 place count""" left_count = self._pending_stack_counts.get('left', 0) right_count = self._pending_stack_counts.get('right', 0) left_id = self.arm_storage.get(1) right_id = self.arm_storage.get(2) # 如果堆放数量 >= 3,使用高堆放模式 self._high_stack_left = left_count >= 3 # 如果左右 ID 相同,右臂需要考虑左臂放置后的 count # 因为右臂放置时,左臂已经放完,count 会比查询时多 1 if left_id == right_id and left_id is not None: self._high_stack_right = (right_count + 1) >= 3 self.get_logger().info( f"Same ID detected (ID={left_id}), adjusting right count: {right_count} -> {right_count + 1}" ) else: self._high_stack_right = right_count >= 3 self.get_logger().info( f"Stack counts: left={left_count}, right={right_count}. " f"High stack mode: left={self._high_stack_left}, right={self._high_stack_right}" ) # 打印是否高堆 # 不再直接发送 Pick,而是先更新 place count self._update_place_counts_after_scan() # 更新放货计数 def _update_place_counts_after_scan(self): """在 scan 完成后更新 place count 并获取放置层级""" left_id = self.arm_storage.get(1) right_id = self.arm_storage.get(2) # 存储待更新的结果 self._pending_place_levels = {} self._pending_level_updates = 0 # 更新左 ID 的 place count if left_id is not None: self._update_single_place_count(left_id, 'left') # 左边计数+1 self._pending_level_updates += 1 # 更新右 ID 的 place count # 无论左右 ID 是否相同,都需要更新两次(左一次,右一次) # - 相同 ID:两次更新同一个 ID,计数连续增加(0→1→2) # - 不同 ID:更新不同 ID,各自独立计数 if right_id is not None: self._update_single_place_count(right_id, 'right') # 右边计数+1 self._pending_level_updates += 1 # 如果没有需要更新的,直接继续 if self._pending_level_updates == 0: self._finalize_place_count_update_and_pick() def _update_single_place_count(self, place_id: int, side: str): """更新单个放置位的计数并获取层级""" req = UpdatePlaceCount.Request() req.place_id = place_id self.get_logger().info(f"Updating place count for {side} ID {place_id}...") future = self._place_count_client.call_async(req) future.add_done_callback( lambda f: self._on_place_count_updated(f, side) ) def _on_place_count_updated(self, future, side: str): """处理 place count 更新结果""" try: resp = future.result() if resp.success: level = resp.target_level self.get_logger().info(f"Place count updated for {side}: Level {level}") self._pending_place_levels[side] = level # 记录放置层级 if side == 'left': self._left_place_level = level elif side == 'right': self._right_place_level = level else: self.get_logger().warn(f"UpdatePlaceCount failed for {side}, assuming level 1") self._pending_place_levels[side] = 1 if side == 'left': self._left_place_level = 1 elif side == 'right': self._right_place_level = 1 except Exception as e: self.get_logger().error(f"UpdatePlaceCount service call failed: {e}") self._pending_place_levels[side] = 1 if side == 'left': self._left_place_level = 1 elif side == 'right': self._right_place_level = 1 self._pending_level_updates -= 1 if self._pending_level_updates <= 0: self._finalize_place_count_update_and_pick() def _finalize_place_count_update_and_pick(self): """完成 place count 更新,开始 Pick""" left_level = self._pending_place_levels.get('left', 1) right_level = self._pending_place_levels.get('right', 1) self.get_logger().info( f"Place levels determined: left={left_level}, right={right_level}" ) # 层数确定 # 状态转换: SCAN -> PICK self._transition_to(State.PICK) # 切换到抓取状态 self._send_pick_goal() # 开始抓货 # ==================== Pick Action ==================== def _send_pick_goal(self): """发送抓取目标""" if not self._pick_client.wait_for_server(timeout_sec=5.0): self.get_logger().error("Pick action server not available!") # 抓取服务没开 return goal = Pick.Goal() goal.tag_id = self.target_tag_id if self.target_tag_id else 0 goal.high_stack_left = getattr(self, '_high_stack_left', False) goal.high_stack_right = getattr(self, '_high_stack_right', False) self.get_logger().info( f"[{self.state.name}] Starting pick (high_stack_left={goal.high_stack_left}, " f"high_stack_right={goal.high_stack_right})" ) # 开始抓取 future = self._pick_client.send_goal_async( goal, feedback_callback=self._on_pick_feedback ) future.add_done_callback(self._on_pick_goal_response) def _on_pick_goal_response(self, future): """抓取目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().warn("Pick goal rejected") # 抓取请求被拒 return result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_pick_result) def _on_pick_feedback(self, feedback_msg): """抓取反馈""" fb = feedback_msg.feedback self.get_logger().debug(f"Pick: {fb.current_step} ({fb.progress:.1%})") # 抓取进度 def _on_pick_result(self, future): """抓取完成 - 开始导航到放置点""" result = future.result().result if not result.success: self.get_logger().error(f"Pick failed: {result.message}") # 抓取失败 return self.get_logger().info(f"Pick succeeded: {result.message}") # 抓取完成 # 根据 ID 查找放置点 if self.target_tag_id in self.place_points_map: place_point = self.place_points_map[self.target_tag_id] # 左臂放置,Y方向+6.5cm偏移 adjusted_place_point = (place_point[0], place_point[1] + 0.065, place_point[2]) self.get_logger().info(f"Found place point for ID {self.target_tag_id}, adjusted Y +0.065m for left arm") # 获取当前货物的放置途径点 cargo_info = self.pick_points[self.cargo_index] via_place_name = cargo_info['via_place'] # 使用协调导航到放置点 self._navigate_to_place_with_coordination( tag_id=self.target_tag_id, side=1, place_point=adjusted_place_point, via_names=via_place_name ) # 去放货 else: self.get_logger().error(f"No place point configured for tag ID {self.target_tag_id}!") # 没找到放置点 # 可以在这里添加错误处理逻辑,例如跳过或停止 # ==================== Multi-Robot Coordination ==================== def _navigate_to_place_with_coordination(self, tag_id: int, side: int, place_point: tuple, via_names=None): """协调导航到放置点,避免与另一台车冲突 流程: - 如果同伴在放置点:途经点 -> 等待点 -> 等待完成 -> 放置点 - 如果同伴不在放置点:途经点 -> 放置点 Args: tag_id: 要放置的货物 Tag ID side: 1=左臂, 2=右臂 place_point: 放置点坐标 (x, y, yaw) via_names: 途径点名称 """ # 保存待放置信息 self._pending_place_tag_id = tag_id self._pending_place_side = side self._pending_place_point = place_point self._pending_via_names = via_names self.get_logger().info(f"Checking coordination for placing tag {tag_id} at side {side}") # 检查是否有伙伴机器人订阅 if self._partner_ns is None: # 没有多机器人协调,经过途经点后直接导航到放置点 self.get_logger().info("No multi-robot coordination, navigating via waypoints to place") self._needs_coordination_check = False self._navigate_via_then_place() return # 标记需要在途径点走完后进行协调检查 # 不再立即检查伙伴状态,而是先走途径点,走完后再检查 self._needs_coordination_check = True self.get_logger().info("Will check partner status after via points are traversed") self._navigate_via_then_place() def _navigate_via_then_place(self): """经过途经点后导航到放置点""" place_point = self._pending_place_point side = self._pending_place_side via_names = self._pending_via_names # 确定目标状态 target_state = State.NAV_TO_PLACE if side == 1 else State.NAV_TO_PLACE_RIGHT self.get_logger().info(f"Navigating to place via {via_names}") # 使用途经点导航到放置点 self._navigate_through_via(target_state, *place_point, via_names=via_names) def _navigate_via_then_wait(self): """先经过途经点,再去等待点等待""" via_names = self._pending_via_names # 获取等待点 # 单车模式下使用 default 点 if self._single_robot_mode: wait_point_name = 'default' elif self._my_ns == 'robotA': wait_point_name = 'wait_point_robotA' elif self._my_ns is None: # 空命名空间 = 原来的 robotB 角色 wait_point_name = 'wait_point_robotB' else: wait_point_name = 'default' wait_point = self.via_points_map.get(wait_point_name) if not wait_point and wait_point_name != 'default': wait_point = self.via_points_map.get('default') if not wait_point: # 没有配置等待点,直接在原地等待 self._transition_to(State.WAITING_FOR_PARTNER) self.get_logger().warn("No wait point configured, waiting in place") self._start_wait_check_timer() return # 构建途经点队列:先走 via_names 途经点,再走等待点 # 设置最终目标为等待点,使用 WAITING_FOR_PARTNER 状态 self.final_state = State.WAITING_FOR_PARTNER self.final_goal = wait_point self.via_queue = [] # 解析 via_names 途经点 names_list = [] if isinstance(via_names, list): names_list = via_names elif via_names: names_list = [via_names] for name in names_list: if not name: continue pt = self.via_points_map.get(name) if not pt and name != 'default' and len(names_list) == 1: pt = self.via_points_map.get('default') if pt: self.via_queue.append(pt) elif name != 'default': self.get_logger().warn(f"Via point '{name}' not found") # 开始导航 if self.via_queue: next_pt = self.via_queue.pop(0) self.get_logger().info(f"Navigating via sequence to wait point. First via: {next_pt}") self._transition_to(State.NAV_TO_VIA) self._send_nav_goal(*next_pt) else: # 没有途经点,直接去等待点 self.get_logger().info(f"No via points, navigating directly to wait point: {wait_point}") self._transition_to(State.WAITING_FOR_PARTNER) self._send_nav_goal(*wait_point) def _is_partner_placing(self) -> bool: """检查伙伴是否正在放置或正在前往放置点 Returns: True 如果伙伴正在放置或导航去放置点 """ if not self._partner_state: return False # 检查伙伴是否在放置相关状态(包括导航去放置点) place_states = ['NAV_TO_PLACE', 'NAV_TO_PLACE_RIGHT', 'PLACE', 'PLACE_RIGHT'] return self._partner_state in place_states def _go_to_wait_point_directly(self): """途径点已走完,直接导航到等待点(不再经过途径点)""" # 根据命名空间选择等待点 if self._single_robot_mode: wait_point_name = 'default' elif self._my_ns == 'robotA': wait_point_name = 'wait_point_robotA' elif self._my_ns is None: # 空命名空间 = 原来的 robotB 角色 wait_point_name = 'wait_point_robotB' else: wait_point_name = 'default' wait_point = self.via_points_map.get(wait_point_name) if not wait_point and wait_point_name != 'default': wait_point = self.via_points_map.get('default') if not wait_point: # 没有配置等待点,直接在原地等待 self._transition_to(State.WAITING_FOR_PARTNER) self.get_logger().warn("No wait point configured, waiting in place") self._start_wait_check_timer() return # 直接导航到等待点 self._transition_to(State.WAITING_FOR_PARTNER) self.get_logger().info(f"Navigating directly to wait point: {wait_point}") self._send_nav_goal(*wait_point) def _check_can_proceed_to_place(self): """检查是否可以继续前往放置点(在等待状态时调用)""" if self.state != State.WAITING_FOR_PARTNER: return # 如果已经在等待延迟,直接返回 if self._proceed_timer is not None: return # 检查伙伴是否还在放置 if not self._is_partner_placing(): self.get_logger().info(f"Partner finished placing. Waiting {self.partner_place_finish_delay}s before proceeding...") # 停止等待检查定时器 if self._wait_check_timer is not None: self._wait_check_timer.cancel() self._wait_check_timer = None # 启动延迟定时器 self._proceed_timer = self.create_timer( self.partner_place_finish_delay, self._delayed_proceed_callback ) def _delayed_proceed_callback(self): """延迟结束后继续""" self.get_logger().info("Delay finished, proceeding to place") # 清除定时器 if self._proceed_timer is not None: self._proceed_timer.cancel() self._proceed_timer = None # 继续前往放置点 self._proceed_to_place() def _start_wait_check_timer(self): """启动等待检查定时器""" if self._wait_check_timer is not None: self._wait_check_timer.cancel() # 每 0.5 秒检查一次 self._wait_check_timer = self.create_timer(0.5, self._check_can_proceed_to_place) def _proceed_to_place(self): """等待结束后继续前往放置点(此时已经经过途经点,直接去放置点)""" if self._pending_place_point is None: self.get_logger().error("No pending place point!") return tag_id = self._pending_place_tag_id side = self._pending_place_side place_point = self._pending_place_point self.get_logger().info(f"Proceeding to place tag {tag_id} at {place_point}") # 确定目标状态 target_state = State.NAV_TO_PLACE if side == 1 else State.NAV_TO_PLACE_RIGHT # 等待结束后直接导航到放置点(途经点已经在等待前走过了) self._transition_to(target_state) self._send_nav_goal(*place_point) def _request_place_level_and_act(self, tag_id: int, side: int): """使用已保存的放置层级,直接发送 Place 指令""" if tag_id is None: self.get_logger().error(f"Cannot place None tag_id at side {side}") return # 从已保存的层级中获取 if side == 1: level = self._left_place_level if self._left_place_level is not None else 1 elif side == 2: level = self._right_place_level if self._right_place_level is not None else 1 else: level = 1 self.get_logger().info(f"Using saved place level for side {side}: Level {level}") # 直接发送 Place 指令 self._send_place_goal(side=side, level=level) # ==================== Place Action ==================== def _send_place_goal(self, side: int = 0, level: int = 1): """发送放置目标 Args: side: 0=两边, 1=仅左叉, 2=仅右叉 level: 放置高度等级 (1-4) """ if not self._place_client.wait_for_server(timeout_sec=5.0): self.get_logger().error("Place action server not available!") # 放置服务没开 return goal = Place.Goal() goal.tag_id = 0 goal.side = side goal.level = level side_name = {0: "both", 1: "left", 2: "right"}.get(side, "both") self.get_logger().info(f"[{self.state.name}] Starting place (side={side_name}, level={level})") # 开始放货 future = self._place_client.send_goal_async( goal, feedback_callback=self._on_place_feedback ) future.add_done_callback(self._on_place_goal_response) def _on_place_goal_response(self, future): """放置目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().warn("Place goal rejected") # 放置请求被拒 return result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_place_result) def _on_place_feedback(self, feedback_msg): """放置反馈""" fb = feedback_msg.feedback self.get_logger().debug(f"Place: {fb.current_step} ({fb.progress:.1%})") # 放置进度 def _on_place_result(self, future): """放置完成 - 根据当前状态决定下一步""" result = future.result().result if not result.success: self.get_logger().error(f"Place failed: {result.message}") # 放置失败 return self.get_logger().info(f"Place succeeded: {result.message}") # 放置完成 if self.state == State.PLACE: # 放置完成,清除存储状态 (左臂 side=1) self.arm_storage[1] = None self._pending_place_tag_id = None # 清除待放置 ID self.get_logger().info("Released cargo from LEFT arm") # 检查右臂是否有货物 right_cargo_id = self.arm_storage.get(2) if right_cargo_id is not None: self.get_logger().info(f"Processing RIGHT arm cargo: {right_cargo_id}") if right_cargo_id in self.place_points_map: place_point = self.place_points_map[right_cargo_id] # 右臂放置,Y方向-6.5cm偏移 adjusted_place_point = (place_point[0], place_point[1] - 0.065, place_point[2]) self.get_logger().info(f"Adjusted Y -0.065m for right arm") # 如果左右 ID 相同,先导航到 default via point via_names = 'default' if right_cargo_id == self.left_tag_id else None # 使用协调导航到放置点 self._navigate_to_place_with_coordination( tag_id=right_cargo_id, side=2, place_point=adjusted_place_point, via_names=via_names ) # 放右边的货 return else: self.get_logger().error(f"No place point for right cargo ID {right_cargo_id}") # 如果没有右臂货物或找不到放置点,执行高堆放后动作并继续流程 self._execute_high_stack_post_place() self._check_next_cycle() # 看下一件货 elif self.state == State.PLACE_RIGHT: # 右臂放置完成 self.arm_storage[2] = None self._pending_place_tag_id = None # 清除待放置 ID self.get_logger().info("Released cargo from RIGHT arm") # 执行高堆放后动作(如果需要) self._execute_high_stack_post_place() self._check_next_cycle() # 看下一件货 def _execute_high_stack_post_place(self): """在所有 place 动作完成后,如果有第四层放置,执行高堆放后动作 - 左臂第四层: 升起左侧 Z 轴到准备位置,打开左侧夹爪 - 右臂第四层: 升起右侧 Z 轴到准备位置,打开右侧夹爪 """ high_stack_left = self._left_place_level >= 4 high_stack_right = self._right_place_level >= 4 if not high_stack_left and not high_stack_right: self.get_logger().info("No level 4 placement, opening cover only") # 没有第四层放置,但仍然需要打开 cover thread = threading.Thread( target=self._open_cover_only, daemon=True ) thread.start() # 重置层级记录 self._left_place_level = None self._right_place_level = None return self.get_logger().info( f"Executing high stack post-place action: left={high_stack_left}, right={high_stack_right}" ) # 使用后台线程执行避免阻塞 thread = threading.Thread( target=self._execute_high_stack_post_place_impl, args=(high_stack_left, high_stack_right), daemon=True ) thread.start() # 重置层级记录 self._left_place_level = None self._right_place_level = None def _execute_high_stack_post_place_impl(self, high_stack_left: bool, high_stack_right: bool): """执行高堆放后动作的实际实现(在后台线程中运行)""" # 左侧高堆放后动作 if high_stack_left: z_prep_left = self._high_stack_z_prepare_left z_home_left = self.z_home_left self.get_logger().info(f"High stack post-place: raising left Z to {z_prep_left}") # 发送左侧 Z 轴电机命令 self._send_motor_cmd(self._id_z_left, z_prep_left, self._pick_speed) time.sleep(2.0) # 等待电机移动 self.get_logger().info(f"High stack post-place: raising left Z to {z_prep_left}") # 打开左侧夹爪 self._send_servo_cmd('left_fork', self._high_stack_left_fork_open) time.sleep(1.5) self.get_logger().info(f"opening fork") self._send_motor_cmd(self._id_z_left, z_home_left, self._pick_speed) self.get_logger().info(f"High stack post-place") # 右侧高堆放后动作 if high_stack_right: z_prep_right = self._high_stack_z_prepare_right z_home_right = self.z_home_right self.get_logger().info(f"High stack post-place: raising right Z to {z_prep_right}") # 发送右侧 Z 轴电机命令 self._send_motor_cmd(self._id_z_right, z_prep_right, self._pick_speed) time.sleep(2.0) # 等待电机移动 # 打开右侧夹爪 self._send_servo_cmd('right_fork', self._high_stack_right_fork_open) time.sleep(1.5) self.get_logger().info(f"opening fork") self._send_motor_cmd(self._id_z_right, z_home_right, self._pick_speed) self.get_logger().info(f"High stack post-place") # 打开 cover self.get_logger().info("Opening cover after place") self._send_servo_cmd('cover', self._cover_open_angle) self.get_logger().info("High stack post-place action completed") def _open_cover_only(self): """仅打开 cover(没有第四层放置时使用)""" self.get_logger().info("Opening cover after place") self._send_servo_cmd('cover', self._cover_open_angle) def _send_motor_cmd(self, motor_id: int, target: int, speed: int): """发送电机位置命令""" cmd_dict = { "id": motor_id, "cmd": "pos", "val": target, "speed": speed, "rel": False # 绝对位置 } msg = String() msg.data = json.dumps(cmd_dict) self.motor_pub.publish(msg) # 发电机指令 self.get_logger().info(f"Motor cmd: id={motor_id}, target={target}, speed={speed}") def _send_servo_cmd(self, servo_name: str, angle: float): """发送舵机控制命令(持续发送 1.5 秒)""" servo_id_map = { 'cover': 1.0, 'left_fork': 2.0, 'right_fork': 3.0 } if servo_name not in servo_id_map: self.get_logger().error(f"Unknown servo name: {servo_name}") return servo_id = servo_id_map[servo_name] msg = Float32MultiArray() msg.data = [servo_id, float(angle)] # 持续 1.5 秒,以 60Hz 频率发送 duration = 1.5 frequency = 60.0 num_publishes = int(duration * frequency) sleep_interval = 1.0 / frequency self.get_logger().info(f"Servo cmd: {servo_name} -> {angle}°") for _ in range(num_publishes): self.servo_pub.publish(msg) # 发舵机指令 time.sleep(sleep_interval) def _check_next_cycle(self): """检查是否进入下一轮""" self.get_logger().info( f">>> Cargo {self.cargo_index + 1}/{self.total_cargo} completed <<<" ) # 这件货搬完 # 检查是否还有货物需要处理 self.cargo_index += 1 if self.cargo_index < self.end_index: # 还有货物,开始下一组 self._start_cargo_cycle() # 搬下一件 else: # 所有货物已完成,导航到终点 self._transition_to(State.NAV_TO_START) self.get_logger().info( f"所有货物完成,返回终点 ({self.end_point[0]}, {self.end_point[1]}, yaw={self.end_point[2]})" ) self._send_nav_goal(*self.end_point) # 回起点 # ==================== Home Action ==================== def _send_home_goal(self): """发送归位目标""" if not self._home_client.wait_for_server(timeout_sec=5.0): self.get_logger().error("Home action server not available!") # 归位服务没开 return goal = Home.Goal() self.get_logger().info(f"[{self.state.name}] Starting home") # 机械臂回家 future = self._home_client.send_goal_async( goal, feedback_callback=self._on_home_feedback ) future.add_done_callback(self._on_home_goal_response) def _on_home_goal_response(self, future): """归位目标响应""" goal_handle = future.result() if not goal_handle.accepted: self.get_logger().warn("Home goal rejected") # 归位请求被拒 return result_future = goal_handle.get_result_async() result_future.add_done_callback(self._on_home_result) def _on_home_feedback(self, feedback_msg): """归位反馈""" fb = feedback_msg.feedback self.get_logger().debug(f"Home: {fb.current_step}") # 归位进度 def _on_home_result(self, future): """归位完成 - 流程结束""" result = future.result().result if not result.success: self.get_logger().error(f"Home failed: {result.message}") # 归位失败 return self.get_logger().info(f"Home succeeded: {result.message}") # 归位完成 # 状态转换: HOME -> NAV_TO_START self._transition_to(State.NAV_TO_START) self.get_logger().info( f"返回终点 ({self.end_point[0]}, {self.end_point[1]}, yaw={self.end_point[2]})" ) # 返回起点也经过途径点 self._send_nav_goal(*self.end_point) # 回起点 # ==================== Light Control ==================== def _publish_steady_light(self): """发布常亮灯命令(非阻塞)""" thread = threading.Thread( target=self._publish_steady_light_impl, daemon=True ) thread.start() def _publish_steady_light_impl(self): """发布常亮灯命令的实现""" msg = Int32MultiArray() # 根据命名空间决定颜色:robotA用蓝色,空命名空间用红色 if self._my_ns == 'robotA': # 蓝色常亮: [1, 0, 0, 1] msg.data = [1, 0, 0, 1] self.get_logger().info("[Light] Publishing steady blue light") else: # 红色常亮: [1, 1, 0, 0] msg.data = [1, 1, 0, 0] self.get_logger().info("[Light] Publishing steady red light") # 持续5秒,以40Hz频率发送 duration = 5.0 # 持续时间(秒) frequency = 40.0 # 频率(Hz) num_publishes = int(duration * frequency) sleep_interval = 1.0 / frequency # 每次发布后的休眠时间 for i in range(num_publishes): self.light_pub.publish(msg) # 发灯命令 time.sleep(sleep_interval) self.get_logger().info("[Light] Steady light complete") def main(args=None): rclpy.init(args=args) # 初始化ROS2 node = CompetitionNode() # 创建比赛节点 rclpy.spin(node) # 让节点一直运行 node.destroy_node() # 销毁节点 rclpy.shutdown() # 关闭ROS2 if __name__ == '__main__': main() # 运行主函数 ``` ### /root/car_ws/src/arm_control/arm_control/start_arm_node.py文件解释: ``` """ 机械臂启动初始化节点 功能:开机时自动将机械臂移动到安全的初始位置,**不移动X轴** 同时控制舵机(保护盖、左右夹爪)打开,完成开机自检 """ import rclpy from rclpy.node import Node from rclpy.action import ActionClient from std_msgs.msg import Float32MultiArray from car_interfaces.action import MoveMotor import time import threading class StartArmNode(Node): """ 机械臂启动初始化节点类 核心作用:开机自动复位机械臂(Y/Z轴)、初始化舵机状态,不操作X轴 """ def __init__(self): super().__init__('start_arm_node') # 节点名称:start_arm_node # ===================== 1. 声明并读取电机ID参数(与主控制节点保持一致)===================== self.declare_parameter('motor_id_x', 1) self.declare_parameter('motor_id_y', 2) self.declare_parameter('motor_id_z_left', 3) self.declare_parameter('motor_id_z_right', 4) self.id_x = self.get_parameter('motor_id_x').value # X轴电机ID(本节点不使用) self.id_y = self.get_parameter('motor_id_y').value # Y轴电机ID self.id_z_left = self.get_parameter('motor_id_z_left').value # 左Z轴电机ID self.id_z_right = self.get_parameter('motor_id_z_right').value # 右Z轴电机ID # ===================== 2. 声明并读取电机运动参数 ===================== self.declare_parameter('speed', 900) # 运动速度 self.declare_parameter('acc', 80) # 运动加速度 self.speed = self.get_parameter('speed').value self.acc = self.get_parameter('acc').value # 电机目标位置参数(与限位测试节点参数名统一,保证配置一致性) self.declare_parameter('test_y_max', 7300) self.declare_parameter('test_y_home', 0) self.declare_parameter('test_z_left_max', 8800) self.declare_parameter('test_z_right_max', -8800) self.declare_parameter('test_z_left_home', 0) self.declare_parameter('test_z_right_home', 0) self.declare_parameter('test_z_left_intermediate', -4450) self.declare_parameter('test_z_right_intermediate', 4450) self.test_y_max = self.get_parameter('test_y_max').value self.test_y_home = self.get_parameter('test_y_home').value self.test_z_left_max = self.get_parameter('test_z_left_max').value self.test_z_right_max = self.get_parameter('test_z_right_max').value self.test_z_left_home = self.get_parameter('test_z_left_home').value self.test_z_right_home = self.get_parameter('test_z_right_home').value self.test_z_left_intermediate = self.get_parameter('test_z_left_intermediate').value self.test_z_right_intermediate = self.get_parameter('test_z_right_intermediate').value # ===================== 3. 声明并读取舵机参数 ===================== self.declare_parameter('cover_open_angle', 90.0) # 保护盖打开角度 self.declare_parameter('left_fork_open_angle', 0.0) # 左夹爪打开角度 self.declare_parameter('right_fork_open_angle', 170.0)# 右夹爪打开角度 self.cover_open = self.get_parameter('cover_open_angle').value self.left_fork_open = self.get_parameter('left_fork_open_angle').value self.right_fork_open = self.get_parameter('right_fork_open_angle').value # ===================== 4. ROS2通信接口初始化 ===================== # 动作客户端:用于发送电机绝对位置运动指令 self._move_motor_client = ActionClient(self, MoveMotor, 'move_motor') # 发布者:用于发送舵机角度控制指令 self._servo_pub = self.create_publisher(Float32MultiArray, '/servo_cmd', 10) self.get_logger().info("Start Arm Node started") self.get_logger().info("Waiting for move_motor action server...") # 等待电机动作服务器启动(超时10秒) if not self._move_motor_client.wait_for_server(timeout_sec=10.0): self.get_logger().error("move_motor action server not available!") return self.get_logger().info("Action server connected. Starting arm initialization sequence...") # 启动机械臂初始化流程(核心函数) self.run_start_sequence() def send_move_goal(self, motor_ids, targets, speed: int, acc: int = None) -> bool: """ 发送电机运动目标,并阻塞等待运动完成 :param motor_ids: 单个电机ID 或 电机ID列表 :param targets: 单个目标位置 或 位置列表 :param speed: 运动速度 :param acc: 运动加速度 :return: 运动成功返回True,失败返回False """ if acc is None: acc = self.acc # 统一转换为列表格式,支持单电机/多电机同时控制 if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(targets, list): targets = [targets] self.get_logger().info(f"Moving motors {motor_ids} to {targets} at speed {speed}, acc {acc}...") # 1. 异步发送所有电机的运动目标 goal_futures = [] for motor_id, target in zip(motor_ids, targets): goal = MoveMotor.Goal() goal.motor_id = motor_id goal.target_position = target goal.speed = speed goal.acc = acc goal.relative = False # 绝对位置运动 goal_futures.append(self._move_motor_client.send_goal_async(goal)) # 2. 等待所有目标被服务器接收 goal_handles = [] for i, future in enumerate(goal_futures): rclpy.spin_until_future_complete(self, future, timeout_sec=5.0) goal_handle = future.result() if not goal_handle or not goal_handle.accepted: self.get_logger().error(f"Goal rejected for motor {motor_ids[i]}") return False goal_handles.append(goal_handle) # 3. 等待所有电机运动完成,并获取结果 result_futures = [handle.get_result_async() for handle in goal_handles] all_success = True for i, result_future in enumerate(result_futures): rclpy.spin_until_future_complete(self, result_future, timeout_sec=60.0) result = result_future.result() motor_id = motor_ids[i] if result and result.result.success: self.get_logger().info( f"Motor {motor_id} reached position {result.result.final_position}" ) else: message = result.result.message if result else "Unknown error" self.get_logger().error(f"Motor {motor_id} move failed: {message}") all_success = False return all_success def _control_servo(self, servo_name: str, angle: float): """ 非阻塞式舵机控制(开启独立线程) 防止舵机控制阻塞主线程的电机运动流程 :param servo_name: 舵机名称(cover/left_fork/right_fork) :param angle: 目标角度 """ thread = threading.Thread( target=self._control_servo_impl, args=(servo_name, angle), daemon=True # 守护线程,节点关闭时自动退出 ) thread.start() def _control_servo_impl(self, servo_name: str, angle: float): """ 舵机控制内部实现:持续发送指令1.5秒,保证舵机稳定到达角度 舵机需要持续信号才能保持位置 """ self.get_logger().info(f"[Servo] Starting: {servo_name} -> {angle}°") servo_start = time.time() # 舵机名称与ID映射表 msg = Float32MultiArray() servo_id_map = { 'cover': 1.0, # 保护盖舵机 ID=1 'left_fork': 2.0, # 左夹爪舵机 ID=2 'right_fork': 3.0 # 右夹爪舵机 ID=3 } if servo_name not in servo_id_map: self.get_logger().error(f"Unknown servo name: {servo_name}") return servo_id = servo_id_map[servo_name] msg.data = [servo_id, float(angle)] # 持续1.5秒,60Hz频率发送指令 duration = 1.5 frequency = 60.0 num_publishes = int(duration * frequency) sleep_interval = 1.0 / frequency for i in range(num_publishes): self._servo_pub.publish(msg) time.sleep(sleep_interval) elapsed = time.time() - servo_start self.get_logger().info(f"[Servo] Complete: {servo_name} -> {angle}° (took {elapsed:.2f}s)") def run_start_sequence(self): """ 【核心流程】机械臂初始化序列 步骤:打开保护盖 → 分步移动Z/Y轴 → 打开左右夹爪 → 回到安全初始位置 全程不移动X轴 """ self.get_logger().info("=" * 50) self.get_logger().info("Starting Arm Initialization Sequence") self.get_logger().info("=" * 50) # 第一步:初始化打开保护盖(安全操作) self.get_logger().info("Opening cover at the start of sequence...") self._control_servo('cover', self.cover_open) time.sleep(1.5) # 等待舵机动作完成 # 定义电机运动步骤(不包含X轴) movement_steps = [ ("Z axis left to intermediate", self.id_z_left, self.test_z_left_intermediate), ("Z axis right to intermediate", self.id_z_right, self.test_z_right_intermediate), ("Z axis right to home", self.id_z_right, self.test_z_right_home), ("Y axis to home", self.id_y, self.test_y_home), ] results = [] for step_name, motor_id, target in movement_steps: self.get_logger().info("-" * 40) self.get_logger().info(f"Step: {step_name}") # 执行电机运动 success = self.send_move_goal(motor_id, target, self.speed) results.append((step_name, success)) # 打印步骤执行结果 if success: self.get_logger().info(f"✓ {step_name} - SUCCESS") else: self.get_logger().warn(f"✗ {step_name} - FAILED") # ===================== 特殊逻辑:Z轴到达中间位置后,打开对应夹爪 ===================== # 左Z轴到达中间位置 → 打开左夹爪 → 左Z轴回原点 if step_name == "Z axis left to intermediate" and success: self.get_logger().info("Left Z axis reached intermediate position, opening left fork...") self._control_servo('left_fork', self.left_fork_open) time.sleep(1.5) # 移动左Z轴到初始位置 self.get_logger().info("Moving left Z axis to home position...") left_home_success = self.send_move_goal(self.id_z_left, self.test_z_left_home, self.speed) if left_home_success: self.get_logger().info("✓ Z axis left to home - SUCCESS") results.append(("Z axis left to home", True)) else: self.get_logger().warn("✗ Z axis left to home - FAILED") results.append(("Z axis left to home", False)) # 右Z轴到达中间位置 → 打开右夹爪 elif step_name == "Z axis right to intermediate" and success: self.get_logger().info("Right Z axis reached intermediate position, opening right fork...") self._control_servo('right_fork', self.right_fork_open) time.sleep(1.5) # ===================== 初始化流程总结 ===================== self.get_logger().info("=" * 50) self.get_logger().info("Initialization Summary") self.get_logger().info("=" * 50) passed = sum(1 for _, s in results if s) total = len(results) # 打印所有步骤结果 for step_name, success in results: status = "SUCCESS" if success else "FAILED" self.get_logger().info(f" {step_name}: {status}") self.get_logger().info(f"\nTotal: {passed}/{total} steps completed successfully") if passed == total: self.get_logger().info("Arm initialization completed successfully!") else: self.get_logger().warn(f"{total - passed} step(s) failed") def main(args=None): """ROS2节点入口函数""" rclpy.init(args=args) node = StartArmNode() # 短暂保持节点运行,保证日志输出完成 try: rclpy.spin_once(node, timeout_sec=2.0) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() ``` ### /root/car_ws/src/arm_control/arm_control/test_joint_limits.py 文件解释: ``` """ Test Joint Limits Node 测试关节位置限制的节点,按顺序测试各轴能否达到最大限位 """ # 导入ROS2 Python核心库 import rclpy # 导入ROS2节点基础类,所有节点都要继承它 from rclpy.node import Node # 导入动作客户端,用来控制长时间运动的电机 from rclpy.action import ActionClient # 浮点数组消息类型,用来发送舵机角度指令 from std_msgs.msg import Float32MultiArray # 你自己定义的电机运动动作接口 from car_interfaces.action import MoveMotor # 时间库,用来做延时等待 import time # 多线程库,让舵机运动不卡住电机主线程 import threading # 定义限位测试节点类,继承ROS2节点 class TestJointLimitsNode(Node): """测试关节限位的节点""" # 节点初始化函数,节点一运行就执行这里 def __init__(self): # 初始化ROS2节点,命名:test_joint_limits_node super().__init__('test_joint_limits_node') # ==================== 声明电机ID参数,和主动作节点保持一致 ==================== # X水平左右轴电机ID self.declare_parameter('motor_id_x', 1) # Y前后伸缩轴电机ID self.declare_parameter('motor_id_y', 2) # 左侧Z升降轴电机ID self.declare_parameter('motor_id_z_left', 3) # 右侧Z升降轴电机ID self.declare_parameter('motor_id_z_right', 4) # 读取参数,赋值给变量方便使用 self.id_x = self.get_parameter('motor_id_x').value self.id_y = self.get_parameter('motor_id_y').value self.id_z_left = self.get_parameter('motor_id_z_left').value self.id_z_right = self.get_parameter('motor_id_z_right').value # ==================== 声明测试运动速度、加速度 ==================== # 测试速度,用低速更安全,防止撞机 self.declare_parameter('speed', 500) # 电机加减速快慢 self.declare_parameter('acc', 80) self.speed = self.get_parameter('speed').value self.acc = self.get_parameter('acc').value # ==================== 各轴限位测试位置 ==================== self.declare_parameter('test_x_min', -3500) # X轴向左最大极限位置 self.declare_parameter('test_x_max', 3500) # X轴向右最大极限位置 self.declare_parameter('test_x_home', 0) # X轴原点中间位置 self.declare_parameter('test_y_max', 7300) # Y轴向前最大伸出位置 self.declare_parameter('test_y_home', 0) # Y轴缩回原点 self.declare_parameter('test_z_left_max', 8800) # 左Z轴最大下降行程 self.declare_parameter('test_z_right_max', -8800)# 右Z轴最大下降行程 self.declare_parameter('test_z_left_home', 0) # 左Z轴原点 self.declare_parameter('test_z_right_home', 0)# 右Z轴原点 self.declare_parameter('test_z_left_intermediate', -4450) # 左Z安全中间位置 self.declare_parameter('test_z_right_intermediate', 4450) # 右Z安全中间位置 # 读取所有限位位置参数 self.test_x_min = self.get_parameter('test_x_min').value self.test_x_max = self.get_parameter('test_x_max').value self.test_x_home = self.get_parameter('test_x_home').value self.test_y_max = self.get_parameter('test_y_max').value self.test_y_home = self.get_parameter('test_y_home').value self.test_z_left_max = self.get_parameter('test_z_left_max').value self.test_z_right_max = self.get_parameter('test_z_right_max').value self.test_z_left_home = self.get_parameter('test_z_left_home').value self.test_z_right_home = self.get_parameter('test_z_right_home').value self.test_z_left_intermediate = self.get_parameter('test_z_left_intermediate').value self.test_z_right_intermediate = self.get_parameter('test_z_right_intermediate').value # ==================== 舵机(保护盖、夹爪)角度参数 ==================== self.declare_parameter('cover_open_angle', 90.0) # 保护盖打开角度 self.declare_parameter('left_fork_open_angle', 0.0) # 左夹爪打开角度 self.declare_parameter('right_fork_open_angle', 170.0)# 右夹爪打开角度 self.cover_open = self.get_parameter('cover_open_angle').value self.left_fork_open = self.get_parameter('left_fork_open_angle').value self.right_fork_open = self.get_parameter('right_fork_open_angle').value # ==================== ROS2通信接口 ==================== # 创建电机动作客户端,用来下发移动指令 self._move_motor_client = ActionClient(self, MoveMotor, 'move_motor') # 创建舵机话题发布者,发送夹爪、盖子角度 self._servo_pub = self.create_publisher(Float32MultiArray, '/servo_cmd', 10) # 打印日志:节点已启动 self.get_logger().info("Test Joint Limits Node started") self.get_logger().info("Waiting for move_motor action server...") # 等待电机控制服务上线,最多等10秒 if not self._move_motor_client.wait_for_server(timeout_sec=10.0): # 没等到服务,报错退出 self.get_logger().error("move_motor action server not available!") return self.get_logger().info("Action server connected. Starting test sequence...") # 执行全套限位测试流程 self.run_test_sequence() # ==================== 通用函数:发送电机移动指令,等待走完 ==================== def send_move_goal(self, motor_ids, targets, speed: int, acc: int = None) -> bool: """Send move motor goals and wait for results""" # 如果没传加速度,就用默认加速度 if acc is None: acc = self.acc # 统一转列表,支持单个电机 / 多个电机一起动 if not isinstance(motor_ids, list): motor_ids = [motor_ids] if not isinstance(targets, list): targets = [targets] # 打印当前要移动哪个电机、去哪里 self.get_logger().info(f"Moving motors {motor_ids} to {targets} at speed {speed}, acc {acc}...") goal_futures = [] # 循环给每个电机下发移动任务 for motor_id, target in zip(motor_ids, targets): goal = MoveMotor.Goal() goal.motor_id = motor_id # 电机编号 goal.target_position = target # 目标位置 goal.speed = speed # 速度 goal.acc = acc # 加速度 goal.relative = False # 绝对位置移动,不是相对偏移 # 异步发送任务 goal_futures.append(self._move_motor_client.send_goal_async(goal)) # 等待所有任务被电机服务器接收 goal_handles = [] for i, future in enumerate(goal_futures): rclpy.spin_until_future_complete(self, future, timeout_sec=5.0) goal_handle = future.result() # 如果任务被拒绝,报错 if not goal_handle or not goal_handle.accepted: self.get_logger().error(f"Goal rejected for motor {motor_ids[i]}") return False goal_handles.append(goal_handle) # 等待所有电机运动完成 result_futures = [handle.get_result_async() for handle in goal_handles] all_success = True for i, result_future in enumerate(result_futures): # 单电机最长等待60秒 rclpy.spin_until_future_complete(self, result_future, timeout_sec=60.0) result = result_future.result() motor_id = motor_ids[i] # 运动成功 if result and result.result.success: self.get_logger().info( f"Motor {motor_id} reached position {result.result.final_position}" ) # 运动失败 else: message = result.result.message if result else "Unknown error" self.get_logger().error(f"Motor {motor_id} move failed: {message}") all_success = False # 返回本次移动全部是否成功 return all_success # ==================== 舵机控制函数(新开线程,不卡住电机) ==================== def _control_servo(self, servo_name: str, angle: float): """Control servo motor (non-blocking) Sends servo control commands continuously for 1 second at 30Hz in a background thread. Args: servo_name: 'cover', 'left_fork', or 'right_fork' angle: Target angle in degrees """ # 开启后台线程控制舵机 thread = threading.Thread( target=self._control_servo_impl, args=(servo_name, angle), daemon=True ) thread.start() # 舵机底层实际执行逻辑 def _control_servo_impl(self, servo_name: str, angle: float): """Internal implementation of servo control""" self.get_logger().info(f"[Servo] Starting: {servo_name} -> {angle}°") servo_start = time.time() msg = Float32MultiArray() # 舵机名字对应硬件ID servo_id_map = { 'cover': 1.0, # 保护盖ID1 'left_fork': 2.0, # 左夹爪ID2 'right_fork': 3.0 # 右夹爪ID3 } # 找不到舵机名字直接报错 if servo_name not in servo_id_map: self.get_logger().error(f"Unknown servo name: {servo_name}") return servo_id = servo_id_map[servo_name] msg.data = [servo_id, float(angle)] # 持续1.5秒,每秒60次发送指令,保证舵机稳稳到位 duration = 1.5 frequency = 60.0 num_publishes = int(duration * frequency) sleep_interval = 1.0 / frequency for i in range(num_publishes): self._servo_pub.publish(msg) time.sleep(sleep_interval) elapsed = time.time() - servo_start self.get_logger().info(f"[Servo] Complete: {servo_name} -> {angle}° (took {elapsed:.2f}s)") # ==================== 核心:全套限位测试步骤顺序 ==================== def run_test_sequence(self): """Run the full test sequence""" self.get_logger().info("=" * 50) self.get_logger().info("Starting Joint Limits Test Sequence") self.get_logger().info("=" * 50) # 第一步:先打开保护盖,安全操作 self.get_logger().info("Opening cover at the start of sequence...") self._control_servo('cover', self.cover_open) time.sleep(1.5) # 等待盖子打开完成 # 按顺序定义所有测试步骤 test_steps = [ ("X轴走到最小限位", self.id_x, self.test_x_min), ("X轴走到最大限位", self.id_x, self.test_x_max), ("X轴回到原点", self.id_x, self.test_x_home), # 左右Z轴同时走到最大行程 ("双Z轴走到最大行程", [self.id_z_left, self.id_z_right], [self.test_z_left_max, self.test_z_right_max]), ("Y轴走到最大伸出", self.id_y, self.test_y_max), ("Y轴回到原点", self.id_y, self.test_y_home), ("左Z轴走到中间安全位", self.id_z_left, self.test_z_left_intermediate), ("右Z轴走到中间安全位", self.id_z_right, self.test_z_right_intermediate), ("右Z轴回到原点", self.id_z_right, self.test_z_right_home), ] results = [] # 循环依次执行每一步测试 for step_name, motor_id, target in test_steps: self.get_logger().info("-" * 40) self.get_logger().info(f"Test: {step_name}") # 执行电机移动 success = self.send_move_goal(motor_id, target, self.speed) results.append((step_name, success)) # 打印通过/失败 if success: self.get_logger().info(f"✓ {step_name} - PASSED") else: self.get_logger().warn(f"✗ {step_name} - FAILED") # 左Z到中间位置 → 打开左夹爪 → 再回原点 if step_name == "左Z轴走到中间安全位" and success: self.get_logger().info("左Z到位,打开左夹爪...") self._control_servo('left_fork', self.left_fork_open) time.sleep(1.5) # 左Z回原点 self.get_logger().info("左Z轴回到原点") left_home_success = self.send_move_goal(self.id_z_left, self.test_z_left_home, self.speed) if left_home_success: self.get_logger().info("✓ 左Z回原点 - PASSED") results.append(("Z axis left to home", True)) else: self.get_logger().warn("✗ 左Z回原点 - FAILED") results.append(("Z axis left to home", False)) # 右Z到中间位置 → 打开右夹爪 elif step_name == "右Z轴走到中间安全位" and success: self.get_logger().info("右Z到位,打开右夹爪...") self._control_servo('right_fork', self.right_fork_open) time.sleep(1.5) # ==================== 最后汇总所有测试结果 ==================== self.get_logger().info("=" * 50) self.get_logger().info("测试结果汇总") self.get_logger().info("=" * 50) # 统计通过步数 passed = sum(1 for _, s in results if s) total = len(results) # 逐条打印每一项是否合格 for step_name, success in results: status = "PASSED" if success else "FAILED" self.get_logger().info(f" {step_name}: {status}") self.get_logger().info(f"\n总计: {passed}/{total} 项测试通过") if passed == total: self.get_logger().info("所有关节限位测试全部合格!") else: self.get_logger().warn(f"有 {total - passed} 项测试失败,请检查机械与限位参数") # ROS2程序入口主函数 def main(args=None): # 初始化ROS2环境 rclpy.init(args=args) # 创建节点对象 node = TestJointLimitsNode() try: # 短暂运行节点,保证日志打印完整 rclpy.spin_once(node, timeout_sec=2.0) # Ctrl+C正常退出 except KeyboardInterrupt: pass # 收尾清理 finally: node.destroy_node() rclpy.shutdown() # 直接运行此文件时启动main if __name__ == '__main__': main() ``` ### /root/car\_ws/src/arm\_control/launch/start\_arm.launch.py文件解释 ``` """ Start Arm Launch File 启动机械臂初始化节点(不移动X轴) """ # 导入ROS2启动文件的核心类,用来编写一键启动脚本 from launch import LaunchDescription # 导入延时动作、声明启动参数的工具 from launch.actions import TimerAction, DeclareLaunchArgument # 导入读取启动参数的工具 from launch.substitutions import LaunchConfiguration # 导入启动ROS2节点的工具 from launch_ros.actions import Node # 导入获取ROS2功能包路径的工具 from ament_index_python.packages import get_package_share_directory # 导入系统文件路径工具 import os # 导入读取yaml配置文件的工具 import yaml # 定义一个函数:从配置文件里加载命名空间 def load_namespace_from_config(): """从 robot_config.yaml 加载 namespace 配置""" try: # 找到 car_bringup 功能包的共享文件夹路径 car_bringup_dir = get_package_share_directory('car_bringup') # 拼接出配置文件的完整路径:config/robot_config.yaml config_file = os.path.join(car_bringup_dir, 'config', 'robot_config.yaml') # 打开并读取这个yaml配置文件 with open(config_file, 'r') as f: config = yaml.safe_load(f) # 从配置里读取 namespace(命名空间),没有就为空 return config.get('namespace', '') or '' # 如果读取失败(文件不存在/报错),就返回空 except Exception: return '' # 这是launch文件的核心函数:生成启动指令列表 def generate_launch_description(): # 获取 arm_control 功能包的共享文件夹路径 pkg_share = get_package_share_directory('arm_control') # 拼接出机械臂配置文件的路径:arm_control/config/arm_config.yaml config_file = os.path.join(pkg_share, 'config', 'arm_config.yaml') # 调用上面的函数,从配置里读取默认的命名空间 default_namespace = load_namespace_from_config() # 定义一个变量:读取启动时传入的 namespace 参数 robot_namespace = LaunchConfiguration('namespace') # ========== 定义第一个要启动的节点:电机动作服务节点 ========== # 这个节点是核心!负责接收指令、控制电机运动,必须先启动 arm_action_node = Node( package='arm_control', # 节点所在的功能包 executable='arm_action_node', # 要运行的可执行文件 name='arm_action_node', # 节点名字 namespace=robot_namespace, # 挂载命名空间 output='screen', # 日志打印到终端屏幕 parameters=[config_file], # 加载配置文件里的参数 ) # ========== 定义第二个要启动的节点:机械臂初始化节点 ========== # 这个节点负责开机复位机械臂、打开夹爪/盖子 start_arm_node = Node( package='arm_control', executable='start_arm', name='start_arm_node', namespace=robot_namespace, output='screen', # 加载配置文件 + 额外设置速度参数500 parameters=[config_file, { 'speed': 500, }], ) # 返回最终的启动脚本(所有要执行的动作放在这里) return LaunchDescription([ # 声明一个启动参数:namespace DeclareLaunchArgument( 'namespace', # 参数名 default_value=default_namespace, # 默认值(从配置文件读) description='Namespace for all nodes (default from robot_config.yaml)' # 参数说明 ), # 第一步:立即启动电机控制服务节点 arm_action_node, # 第二步:延时3秒再启动机械臂初始化节点 # 延时原因:等电机服务节点完全启动,否则初始化节点会连不上电机 TimerAction( period=3.0, # 延时3秒 actions=[start_arm_node], # 延时后要做的事:启动初始化节点 ), ]) ```
admin
2026年4月19日 17:21
转发
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
Word文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码
有效期
AI