ROS2自动驾驶
yolo5自动驾驶
1、重要!更换U盘的操作指引
2、关闭开机自启动大程序
3、Linux基础
4、YoloV5训练集
5、自动驾驶基础调试(代码解析)
6、自动驾驶特调
7、自动驾驶原理
8、PID算法理论
9、阿克曼运动学分析理论
10、建立运动学模型
常用命令
!重要!首次使用
一、原理分析
麦克纳姆轮运动学分析
二、AI大模型
3、AI大模型类型和原理
4、RAG检索增强和模型训练样本
5、具身智能机器人系统架构
6、具身智能玩法核心源码解读
7、配置AI大模型
8、配置API-KEY
三、深度相机
2、颜色标定
10、深度相机的基础使用
11、深度相机伪彩色图像
12、深度相机测距
13、深度相机色块体积测算
14、深度相机颜色跟随
15、深度相机人脸跟随
16、深度相机KCF物体跟随
17、深度相机Mediapipe手势跟随
18、深度相机视觉循迹自动驾驶
19、深度相机边缘检测
四、多模态视觉理解
20、多模态语义理解、指令遵循
21、多模态视觉理解
22、多模态视觉理解+自动追踪
23、多模态视觉理解+视觉跟随
24、多模态视觉理解+视觉巡线
25、多模态视觉理解+深度相机距离问答
26、多模态视觉理解+SLAM导航
27、多模态视觉理解+SLAM导航+视觉巡线
28、意图揣测+多模态视觉理解+SLAM导航+视觉功能
五、雷达
8、雷达基础使用
思岚系列雷达
六、建立地图
9、Gmapping建图
cartographer快速重定位导航
RTAB-Map导航
RTAB-Map建图
slam-toolbox建图
cartographer建图
Navigation2多点导航避障
Navigation2单点导航避障
手机APP建图与导航
七、新机器人自动驾驶与调整
多模态视觉理解+SLAM导航
新机器人自动驾驶
场地摆放及注意事项
启动测试
识别调试
无人驾驶的车道保持
无人驾驶路标检测
无人驾驶红绿灯识别
无人驾驶之定点停车
无人驾驶转向决策
无人驾驶之喇叭鸣笛
无人驾驶减速慢行
无人驾驶限速行驶
无人驾驶自主泊车
无人驾驶综合应用
无人驾驶融合AI大模型应用
八、路网规划
路网规划导航简介
构建位姿地图
路网标注
路网规划结合沙盘地图案例
路径重规划
九、模型训练
1、数据采集
2、数据集标注
3、YOLOv11模型训练
4、模型格式转换
十、YOLOV11开发
多机通讯配置
汝城县职业中等专业学校知识库-信息中心朱老师编辑
-
+
首页
二、AI大模型
6、具身智能玩法核心源码解读
6、具身智能玩法核心源码解读
## 具身智能玩法核心源码解读 ### 1.课程内容 * 对multi_brains功能包架构的核心功能进行解析 * 在后续课程中,会对每节教程新出现的动作函数进行单独讲解。 ### 2.源码功能包结构 #### 2.1 功能包文件结构 ``` ├── config │ ├── map_mapping.yaml │ ├── multi_brains_setting.yaml │ └── README.MD ├── language │ ├── en.yaml │ └── zh.yaml ├── launch │ └── llm_agent_control.launch.py ├── multi_brains │ ├── action_service.py │ ├── asr_detect.py │ ├── __init__.py │ ├── model_service.py │ └── utils ├── package.xml ├── resource │ └── multi_brains ├── setup.cfg ├── setup.py ├── system_vioce │ ├── en │ ├── notify.mp3 │ ├── test_en.wav │ ├── test_zh.wav │ └── zh └── test ``` * config 配置文件夹,存放配置**文件模板** * multi_brains源码文件夹 * asr_detect.py 语音识别程序文件 * model_service.py 模型服务器程序文件,用于调用各种模型接口实现模型推理架构 * action_service.py 动作服务器程序文件,用于接收模型服务器请求的动作列表,控制机器人进行运动 * launch启动文件文件夹,用于存放ros2节点启动文件 * language语言包 存放不同语言日志文件 * system_vioce 系统音频文件 ## 3.语音识别功能 源码路径:`~/yahboomcar_ros2_ws/yahboomcar_ws/src/multi_brains/multi_brains/asr_detect.py` ### 3.1 检测语音活动动态录音 1. 持续读取音频帧并进行语音活动检测。 2. 若检测到语音开始,则将音频帧加入缓冲区;若检测到持续静音超过阈值(90帧,约1.5s),则结束录音。 3. 录音结束后,去除尾部静音部分,并将有效语音保存为 WAV 文件。 4. 若未检测到有效语音,则不保存文件。 ``` def listen_for_speech(self): '''VAD动态录音 Dynamic recording with VAD''' self.record_flag = True PRE_SPEECH_FRAMES = 5 # 150ms 语音起始补偿 PRINT_EVERY_N_FRAMES = 5 recording_active = False silence_counter = 0 print_counter = 0 audio_buffer = [] pre_speech_buffer = deque(maxlen=PRE_SPEECH_FRAMES) stream = self.pyaudio.open( format=pyaudio.paInt16, channels=1, rate=self.sample_rate, input=True, frames_per_buffer=self.frame_bytes, ) self.play_audio(self.notify_audio) try: while not self.stop_event.is_set(): frame = stream.read( self.frame_bytes, exception_on_overflow=False) is_speech = self.vad.is_speech(frame, self.sample_rate) print_counter += 1 if print_counter >= PRINT_EVERY_N_FRAMES: print("1-1-1" if is_speech else "-----") print_counter = 0 if not recording_active: # ---- IDLE ---- pre_speech_buffer.append(frame) if is_speech: # RECORDING recording_active = True audio_buffer.extend(pre_speech_buffer) pre_speech_buffer.clear() audio_buffer.append(frame) silence_counter = 0 else: # ---- RECORDING ---- audio_buffer.append(frame) if is_speech: silence_counter = max(0, silence_counter - 1) else: silence_counter += 1 if silence_counter >= self.MAX_SILENCE_FRAMES: break finally: stream.stop_stream() stream.close() self.record_flag = False if recording_active and audio_buffer: with wave.open(self.user_speech_dir, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(self.pyaudio.get_sample_size(pyaudio.paInt16) ) wf.setframerate(self.sample_rate) wf.writeframes(b"".join(audio_buffer)) return True return False ``` ### 3.3 ASR语音识别 * 调用语音引擎的recognize方法进行语音识别,语音引擎由`Tongyi_ASR,SenseVoiceSmall_ASR,XUNFEI_ASR`几个类的实例化对象提供 ``` def kws_handler(self) -> None: '''唤醒处理函数 / Wake-up handling function''' if self.stop_event.is_set(): self.logger.info("Wake-up processing thread interrupted no handle user speech.!!!!!") return if self.listen_for_speech(): asr_result=self.asr_engine.recognize(self.user_speech_dir) # 进行 ASR 转换 / Perform ASR conversion if not asr_result[0]: self.logger.error(Fore.RED+f"Speech recognition failed because the audio segment is empty or the speech model is unavailable.ASR_OUT:{asr_result[1]}"+Fore.RESET) else: if len(asr_result[1]) > self.asr_threashold: self.logger.info(Fore.GREEN+"ASR Result: "+asr_result[1]+Fore.RESET) self.asr_result_queue.put([asr_result[1],'text_request',False])# 将语音识别结果放入队列中 / Put the ASR result into the queue else: self.logger.info(Fore.YELLOW+"The voice recognition result is too short. Could it be that the user woke it up by mistake "+asr_result[1]+Fore.RESET) ``` ## 4.模型服务功能 源码路径: `~/yahboomcar_ros2_ws/yahboomcar_ws/src/multi_brains/multi_brains/model_service.py` 核心函数解读: * 在独立线程中检查`llm_handler_queue`队列中是否存在访问模型的请求 * 一旦有元素被压入队列,根据请求类型掉使用不同的参数调用`Dify`的访问接口`chat`函数 * 当获取到Dfiy端的AI智能体回复后,解析对应的内容进行语音播放和发送动作列表到动作服务器进行执行 ``` def handle_llm_thread(self)->None: ''' 处理模型请求/ Handle model request ''' while True: if not self.llm_handler_queue.empty():#队列不为空,处理模型请求 if not self.text_chat_mode and self.asr_detect.record_flag : continue request_query = self.llm_handler_queue.get() if self.debug_mode: self.get_logger().info(f"Processing LLM request: {request_query}") if request_query[1]=='text_request': '''text request''' result=self.dify_llmclient.chat(request_query[0],robot_feedback=request_query[2]) elif request_query[1]=='image_request': '''vision + text request''' result=self.dify_llmclient.chat(request_query[0],image_path=self.image_cache_path,robot_feedback=request_query[2]) if result[0]: if not self.text_chat_mode and self.asr_detect.record_flag : continue split_result=self.extract_actions(result[1]) if split_result is None: continue action_list,llm_response,decision_plan=self.extract_actions(result[1]) if decision_plan is not None: self.get_logger().info(Fore.YELLOW+self.syslog.get_text("system_log_3",decision_plan=decision_plan)+Fore.RESET) self.get_logger().info(Fore.YELLOW+f'"action": {action_list},"response": {llm_response}'+Fore.RESET) if not self.text_chat_mode:#语音回复 if self.tts_engine.synthesize(llm_response,self.tts_out_path) : self.play_audio(self.tts_out_path) else: self.get_logger().error(Fore.RED+"Speech synthesis failed. Check whether the TTS model is available"+Fore.RESET) else:#文本回复 if decision_plan is not None: self.text_pub.publish(String(data=self.syslog.get_text("system_log_3",decision_plan=decision_plan))) self.text_pub.publish(String(data=f'"action": {action_list}, "response": {llm_response}')) if action_list!=[]: self.send_action_service(action_list, llm_response) else: self.get_logger().error(Fore.RED+f"The model request failed. Check whether the dify or AI model is normal.\ Error Log:{result[1]}"+Fore.RESET) else: time.sleep(1.0)#无请求时休眠1秒 ``` ## 5. 动作服务器功能 * 包含了机器人所有能执行的基础动作函数的实现,通过接收动作列表请求,并执行对应动作。具备再次唤醒后打断的功能 核心回调函数`execute_callback`解析: * 接受动作列表的字符串 * ``` def execute_callback(self, goal_handle): """动作执行回调函数 action execution callback function""" actions = goal_handle.request.actions feedback_result = None if self.debug_mode: self.get_logger().info(self.actionlog.get_text("debug_log_1",actions=actions)) self.action_runing = True for action in actions: if self.interrupt_event.is_set(): break match = re.match(r"(\w+)\((.*)\)", action) action_name, args_str = match.groups() args = [arg.strip() for arg in args_str.split(",")] if args_str else [] if not hasattr(self, action_name): self.get_logger().error(Fore.RED+f"action_service: {action} is invalid action, skip execution"+Fore.RESET) else: method = getattr(self, action_name) feedback_result = method(*args) if not self.interrupt_event.is_set():#向dify-agent反馈动作执行结果 msg=LlmRequest() if feedback_result==False: #动作执行失败 msg.llm_request=self.actionlog.get_text("action_feedback_2",action_name=actions) msg.robot_feedback=True self.llm_request_pub.publish(msg) elif feedback_result==True: #动作执行成功 msg.llm_request=self.actionlog.get_text("action_feedback_1",action_name=actions) msg.robot_feedback=True self.llm_request_pub.publish(msg) elif feedback_result==None: #空操作不反馈 if self.debug_mode: self.get_logger().info(self.actionlog.get_text("system_log_1")) if self.debug_mode: self.get_logger().info(msg.llm_request) if self.debug_mode: self.get_logger().info(self.actionlog.get_text("system_log_2")) self.action_runing = False self.interrupt_event.clear() goal_handle.succeed() result = Rot.Result() result.success = True return result ``` ## 6.打断功能 机器人支持任意阶段的打断,具体可以分为录音阶段打断、对话阶段打断、动作阶段打断,这里对每个阶段打断的原理进行介绍 ### 6.1 录音阶段打断 如果录音过程发现说错话,或者对说话内容不满意需要重新录音,在**录音的过程中**可以直接再次唤醒后打断之前的录音直接重新开始说话录制 * 逻辑实现为asr.py文件中`asr_detect_run`方法: * 每次唤醒时,如果已有正在运行的唤醒录音的线程,则通过线程事件 `stop_event` 中断它,并等待其结束; * 清除停止事件标志后,启动新的录音线程 ``` def asr_detect_run(self): while True: # 只处理最近的一次唤醒请求,防止短时间重复唤醒 / Process only the most recent wake-up request to prevent duplicates if self.wakeup_event.wait(timeout=0.1): self.wakeup_event.clear() self.extern_wakeup.set() self.publisher.wakeup_pub.publish(Bool(data=True)) self.logger.info("I'm here🌟") self.wake_up_voice() # 应答用户 / Respond to the user if self.current_thread and self.current_thread.is_alive(): # 打断上次的唤醒处理线程 / Interrupt the previous wake-up handling thread self.stop_event.set() self.current_thread.join() # 等待当前线程结束 / Wait for the current thread to finish self.stop_event.clear() # 清除事件 / Clear the event self.current_thread = threading.Thread(target=self.kws_handler) self.current_thread.daemon = True self.current_thread.start() time.sleep(0.5) ``` ### 6.2 对话阶段打断 如果机器人讲话过程中,觉得机器人回复不满意或者不想机器人继续回复,则可以使用唤醒词直接打断机器人的说话,直接开始说话录音,此时可以对机器人说出新的指令(仍在当前任务周期中),或者对机器人说出:"结束当前任务"可以直接结束掉当前任务开启新的任务周期。 * 逻辑实现为action_service.py文件中的**CustomActionServer**类的**wakeup_callback**和**play_audio**方法: * wakeup_callback是唤醒处理的回调函数,在asr.py程序中每次会唤醒时,会通过话题通信方式发布唤醒信号,wakeup_callback订阅并处理信号。 * 每次唤醒时检测**pygame.mixer**是否正在播放音频,如果是,则通过线程事件self.stop_event通知播放线程停止播放 * 如果唤醒后检测之前的动作正在运行,则置位**self.interrupt_flag**标志位,用于后续的动作打断 ``` def wakeup_callback(self, msg): if msg.data: if pygame.mixer.music.get_busy(): self.stop_event.set() if self.action_runing: self.interrupt_flag = True self.stop() self.pubSix_Arm(self.init_joints) ``` `play_audio`播放音频时会检测`self.asr_detect.extern_wakeup`是否被置位,一旦检测到置位,立刻停止当前播放的音频。 ``` def play_audio(self,file_path: str) -> None: '''播放音频 / Play audio''' self.asr_detect.extern_wakeup.clear() with self.pygame_lock: pygame.mixer.init() pygame.mixer.music.load(file_path) pygame.mixer.music.play() while pygame.mixer.music.get_busy(): if self.asr_detect.extern_wakeup.is_set(): pygame.mixer.music.stop() self.asr_detect.extern_wakeup.clear() break pygame.time.Clock().tick(10) pygame.mixer.quit() ``` ### 6.3 动作阶段打断 机器人在执行动作的过程中如果会被唤醒,会停止当前动作,恢复初始姿态。具体可分为普通动作打断和带有子进程的动作打断 #### 6.3.1普通动作打断 * 机器人底盘运动和机械臂运动是通过发布速度话题和机器臂关节角度话题进行控制的 * \_execute\_action底盘控制函数会不断检测self.interrupt\_event打断标志位,如果被置位,则立即停止底盘运行 * 同理pubSix\_Arm机械臂控制函数检测 self.interrupt\_event打断标志位,只有没有被置位时才会正常发布机械臂关节角度话题。 ``` def _execute_action(self, twist, num=1, durationtime=3.0): for _ in range(num): start_time = time.time() while (time.time() - start_time) < durationtime: if self.interrupt_flag: self.stop() return self.publisher.publish(twist) time.sleep(0.1) def pubSix_Arm(self, joints, id=6, angle=180.0, runtime=2000): arm_joint = ArmJoints() arm_joint.joint1 = joints[0] arm_joint.joint2 = joints[1] arm_joint.joint3 = joints[2] arm_joint.joint4 = joints[3] arm_joint.joint5 = joints[4] arm_joint.joint6 = joints[5] arm_joint.time = runtime if not self.interrupt_flag: self.TargetAngle_pub.publish(arm_joint) ``` #### 6.3.2 带有子进程的动作打断(我们机器人没有机械臂,所以看看就行了) 例如机械臂夹取、分捡机器码等动作需要在子进程中启动外部的程序,这里以机械臂夹取动作函数grasp\_obj举例讲解: 当机械臂夹取未完成时,会一直在while not self.grasp\_obj\_future.done():循环中进行等待:这个过程中如果检测到self.interrupt\_event打断标志位被置位,则会先调用对应的\_\_reset\_grasp\_obj()函数会递归结束子进程树,然后停止动作。 ``` def grasp_obj(self, x1, y1, x2, y2) -> None: """grasp_obj: 夹取物体 x1,y1,x2,y2: 物体外边框坐标 """ def __reset_grasp_obj(): kill_process_tree(self.grasp_obj_process_1.pid) kill_process_tree(self.grasp_obj_process_2.pid) kill_process_tree(self.grasp_obj_process_3.pid) self.grasp_obj_future = Future() cmd_1=['ros2', 'run', 'largemodel_arm', 'grasp_desktop'] cmd_2=['ros2', 'run', 'largemodel_arm', 'KCF_follow'] cmd_3=['ros2', 'run', 'M3Pro_KCF', 'ALM_KCF_Tracker_Node'] self.grasp_obj_process_1=subprocess.Popen(cmd_1) time.sleep(5.0) #等待grasp_desktop启动完成 self.grasp_obj_process_2=subprocess.Popen(cmd_2) self.grasp_obj_process_3=subprocess.Popen(cmd_3) x1 = int(x1) y1 = int(y1) x2 = int(x2) y2 = int(y2) while not self.object_position_pub.get_subscription_count(): time.sleep(0.5) self.object_position_pub.publish(Int16MultiArray(data=[x1, y1, x2, y2])) while not self.grasp_obj_future.done(): if self.interrupt_event.is_set(): __reset_grasp_obj() self.pubSix_Arm(self.init_joints) return None time.sleep(0.1) result = self.grasp_obj_future.result() if not self.interrupt_event.is_set(): if result.data == "grasp_obj_done": res = True else: res = False __reset_grasp_obj() if self.interrupt_event.is_set(): time.sleep(0.5) self.pubSix_Arm(self.init_joints) # 机械臂收回 return res ```
admin
2026年1月4日 13:28
32
转发
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
Word文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码
有效期
AI