自动驾驶
yolo5自动驾驶
1、重要!更换U盘的操作指引
2、关闭开机自启动大程序
3、Linux基础
4、YoloV5训练集
5、自动驾驶基础调试(代码解析)
6、自动驾驶特调
7、自动驾驶原理
8、PID算法理论
9、阿克曼运动学分析理论
10、建立运动学模型
常用命令
首次使用
一、原理分析
麦克纳姆轮运动学分析
二、AI大模型
3、AI大模型类型和原理
4、RAG检索增强和模型训练样本
5、具身智能机器人系统架构
6、具身智能玩法核心源码解读
7、配置AI大模型
三、深度相机
2、颜色标定
10、深度相机的基础使用
11、深度相机伪彩色图像
12、深度相机测距
13、深度相机色块体积测算
14、深度相机颜色跟随
15、深度相机人脸跟随
16、深度相机KCF物体跟随
17、深度相机Mediapipe手势跟随
18、深度相机视觉循迹自动驾驶
19、深度相机边缘检测
四、多模态视觉理解
20、多模态语义理解、指令遵循
21、多模态视觉理解
22、多模态视觉理解+自动追踪
23、多模态视觉理解+视觉跟随
24、多模态视觉理解+视觉巡线
25、多模态视觉理解+深度相机距离问答
26、多模态视觉理解+SLAM导航
27、多模态视觉理解+SLAM导航+视觉巡线
28、意图揣测+多模态视觉理解+SLAM导航+视觉功能
五、雷达
8、雷达基础使用
思岚系列雷达
六、建立地图
9、Gmapping建图
cartographer快速重定位导航
RTAB-Map导航
RTAB-Map建图
slam-toolbox建图
cartographer建图
Navigation2多点导航避障
Navigation2单点导航避障
手机APP建图与导航
七、新机器人自动驾驶与调整
多模态视觉理解+SLAM导航
新机器人自动驾驶
场地摆放及注意事项
启动测试
识别调试
无人驾驶的车道保持
无人驾驶路标检测
无人驾驶红绿灯识别
无人驾驶之定点停车
无人驾驶转向决策
无人驾驶之喇叭鸣笛
无人驾驶减速慢行
无人驾驶限速行驶
无人驾驶自主泊车
无人驾驶综合应用
无人驾驶融合AI大模型应用
八、路网规划
路网规划导航简介
构建位姿地图
路网标注
路网规划结合沙盘地图案例
九、模型训练
十、YOLOV11开发
多机通讯配置
汝城县职业中等专业学校知识库-信息中心朱老师编辑
-
+
首页
二、AI大模型
6、具身智能玩法核心源码解读
6、具身智能玩法核心源码解读
## 具身智能玩法核心源码解读 ### 1.课程内容 1.AI大模型具身智能玩法是一个复合功能,涉及多个节点程序的配合实现,本节课程对核心的程序进行讲解。 ### 2.源码功能包结构 #### 2.1 功能包文件结构 AI大模型具身智能的ros功能包为largemodel,功能包路径为: jetson orin nano 主机: `/home/jetson/yahboomcar_ros2_ws/yahboomcar_ws/src/largemodel` 功能包文件结构如下:  **文件夹和文件作用说明如下**: **2.1.1 config** 配置文件夹,存放配置文件 large_model_interface.yaml 大模型接口配置文件,用于配置各个平台的API-KEY、大模型参数 map_mapping.yaml 地图映射文件,用于建立栅格地图与现实世界中区域的映射关系 yahboom.yaml 节点配置文件:用于配置核心节点的参数 **2.1.2 largemodel** 源码文件夹,AI大模型具身智能玩法核心程序 asr.py 语音识别程序文件,用于运行语音识别模型,与用户进行交互 model_service.py 模型服务器程序文件,用于调用各种模型接口实现模型推理架构 action_service_usb.py usb相机动作服务器程序文件,用于接收模型服务器请求的动作列表,控制机器人进行运动并播放声音 action_service_nuwa.py 深度相机动作服务器程序文件,用于接收模型服务器请求的动作列表,控制机器人进行运动并播放声音 **2.1.3 launch** 2.1.3 launch 启动文件文件夹,用于存放ros2节点启动文件 largemodel_control.launch.py AI大模型具身智能玩法的启动文件:启动多个节点,有语音交互模式、文字交互模式两种启动方式 **2.1.4 resources_file** 存放系统声音的音频文件 **2.1.5 utils** 组件文件夹,存放非核心功能的程序文件 large_model_interface.py 大模型接口程序文件,包含了各个平台调用大模型的底层接口,不同平台不同模型的调用程序不相同,在接口文件中统一管理调用方式 mic_serial.py 语音模块驱动程序文件,用于驱动语音模块的的唤醒词功能 promot_usb.py usb相机版大模型prompt提示词文件:用于生成执行层大模型的提示词的文件 promot_nuwa.py 深度相机版大模型prompt提示词文件:用于生成执行层大模型的提示词的文件 dify_client2.py dify API函数,用于国际版本dify请求本地dify应用 #### 2.2 程序间调用关系图  ### 3.语音识别功能 语音识别功能包含VAD语音活动检测、语音转文字转换两部分,源码路径: ``` /home/jetson/yahboomcar_ros2_ws/yahboomcar_ws/src/largemodel/largemodel/asr.py ``` #### 3.1 VAD语音活动检测 实现程序:ASRNode类中的listen_for_speech方法 程序解读:通过指定麦克风实时录音,使用 VAD(语音活动检测)判断是否正在说话,当检测到一段语音结束后(连续静音超过设定帧数),停止录音并保存有效语音内容到文件。 详细逻辑如下: 1、初始化音频流并配置参数(如采样率、通道数等)。 2、持续读取音频帧并进行语音活动检测。 3、若检测到语音开始,则将音频帧加入缓冲区;若检测到持续静音超过阈值(90帧,约1s),则结束录音。 4、录音结束后,去除尾部静音部分,并将有效语音保存为 WAV 文件。 5、若未检测到有效语音,则不保存文件。 ``` def listen_for_speech(self, mic_index=0): p = pyaudio.PyAudio() audio_buffer = [] silence_counter = 0 MAX_SILENCE_FRAMES = 90 # 30帧*30ms=900ms静音后停止 / Stop after 900ms of silence (30 frames * 30ms) speaking = False # 语音活动标志 / Flag indicating speech activity frame_counter = 0 # 计数器 / Frame counter stream_kwargs = { "format": pyaudio.paInt16, "channels": 1, "rate": self.sample_rate, "input": True, "frames_per_buffer": self.frame_bytes, } if mic_index != 0: stream_kwargs["input_device_index"] = mic_index # 通过蜂鸣器提示用户讲话 / Prompt the user to speak via the buzzer self.pub_beep.publish(UInt16(data=1)) time.sleep(0.5) self.pub_beep.publish(UInt16(data=0)) try: # 打开音频流 / Open audio stream stream = p.open(**stream_kwargs) while True: if self.stop_event.is_set(): return False frame = stream.read( self.frame_bytes, exception_on_overflow=False ) # 读取音频数据 / Read audio data is_speech = self.vad.is_speech( frame, self.sample_rate ) # VAD检测 / VAD detection if is_speech: # 检测到语音活动 / Detected speech activity speaking = True audio_buffer.append(frame) silence_counter = 0 else: if speaking: # 在语音活动后检测静音 / Detect silence after speech activity silence_counter += 1 audio_buffer.append( frame ) # 持续记录缓冲 / Continue recording buffer # 静音持续时间达标时结束录音 / End recording when silence duration meets the threshold if silence_counter >= MAX_SILENCE_FRAMES: break frame_counter += 1 if frame_counter % 2 == 0: self.get_logger().info("1" if is_speech else "-") # print('1-' if is_speech else '0-', end='', flush=True) # 实时状态显示方式 / Real-time status display finally: stream.stop_stream() stream.close() p.terminate() # 保存有效录音(去除尾部静音) / Save valid recording (remove trailing silence) if speaking and len(audio_buffer) > 0: # 裁剪最后静音部分 / Trim the last silent part clean_buffer = ( audio_buffer[:-MAX_SILENCE_FRAMES] if len(audio_buffer) > MAX_SILENCE_FRAMES else audio_buffer ) with wave.open(self.user_speechdir, "wb") as wf: wf.setnchannels(1) wf.setsampwidth(p.get_sample_size(pyaudio.paInt16)) wf.setframerate(self.sample_rate) wf.writeframes(b"".join(clean_buffer)) return True ``` 3.2 ASR语音识别 **实现程序**:ASRNode类中的ASR_conversion方法将录音文件转换成文字 调用large_model_interface.py大模型接口文件中的语音识别模型接口函数 **注意:语音识别出的文字小于4个字符时会被认定为是无效字符,原因是为了防止误唤醒后的内容被当成有效内容** **程序解读:** 若使用oline_asr,则调用对应方法并检查结果是否为有效文本(长度大于4),成功则返回识别内容,否则记录错误并返回'error'。 否则使用SenseVoiceSmall_ASR进行识别,并做相同的结果判断与处理。 其中,SenseVoiceSmall_ASR为本地模型语音识别方法,只有jetson orin nano 、jetson orin NX主机可以使用。 ``` def ASR_conversion(self, input_file): if self.use_oline_asr: result=self.modelinterface.oline_asr(input_file) if result[0] == 'ok' and len(result[1]) > 4: return result[1] else: self.get_logger().error(f'ASR Error:{result[1]}') return 'error' else: result=self.modelinterface.SenseVoiceSmall_ASR(input_file) if result[0] == 'ok' and len(result[1]) > 4: return result[1] else: self.get_logger().error(f'ASR Error:{result[1]}') return 'error' ``` ### 4.模型服务器功能 实现程序为model_service.py,用于接收语音交互模式下的语音识别结果或是文字交互模式下的终端输入,实现大模型推理逻辑,源码路径: ``` /home/jetson/yahboomcar_ros2_ws/yahboomcar_ws/src/largemodel/largemodel/model_service.py ``` 4.1 双模型推理 默认使用双模型推理模式,实现程序为LargeModelService类中的dual_large_model_mode、instruction_process方法 程序解读: 若是新指令周期(self.new_order_cycle 为 True): 初始化多模态历史上下文; 使用任务决策模型对用户输入 prompt 进行任务规划; 将规划结果传给指令执行层模型处理; 设置 new_order_cycle 为 False,表示当前指令周期开始执行。 否则(非新周期): 直接将输入 prompt 和类型传给指令执行层模型处理。 ``` def dual_large_model_mode(self, type, prompt=""): """ 此函数实现了双模型推理模式,即先由文本生成模型进行任务规划,然后由多模态大模型生成动作列表 This function implements the dual model inference mode, where the text generation model first plans the task, and then the multimodal large model generates the action list. """ if ( self.new_order_cycle ): # 判断是否是新任务周期 / Determine if it is a new task cycle # 判断上一轮对话指令是否完成如果完成就清空历史上下文,开启新的上下文 / Determine if the previous round of dialogue instructions are completed. If completed, clear the historical context and start a new context self.model_client.init_Multimodal_history( get_prompt() ) # 初始化执行层上下文历史 / Initialize execution layer context history execute_instructions = self.model_client.TaskDecision( prompt ) # 调用决策层大模型进行任务规划 / Call the decision layer large model for task planning if not execute_instructions == "error": if self.text_chat_mode: msg = String( data=f"The upcoming task to be carried out:{execute_instructions}" ) self.text_pub.publish(msg) else: self.get_logger().info( f"The upcoming task to be carried out:{execute_instructions}" ) # 即将执行的任务:... self.instruction_process( type="text", prompt=f"用户:{prompt},决策层AI规划:{execute_instructions}", ) # 传递决策层模型规划好的执行步骤给执行层模型 / Pass the planned execution steps from the decision layer model to the execution layer model self.new_order_cycle = ( False # 重置指令周期标志位 / Reset the instruction cycle flag ) else: self.get_logger().info( "The model service is abnormal. Check the large model account or configuration options" ) # 模型推理失败,请检查模型配额和账户是否正常!!! else: self.instruction_process( prompt, type ) # 调用执行层大模型生成成动作列表并执行 / Call the execution layer large model to generate an action list and execute def instruction_process(self, prompt, type, conversation_id=None): """ 根据输入信息的类型(文字/图片),构建不同的请求体进行推理,并返回结果) Based on the type of input information (text/image), construct different request bodies for inference and return the result. """ if self.regional_setting == "China": # 国内版 if type == "text": raw_content = self.model_client.multimodalinfer(prompt) elif type == "image": self.save_single_image() raw_content = self.model_client.multimodalinfer( "机器人反馈:执行seewhat()完成", image_path=self.image_save_path ) json_str = self.extract_json_content(raw_content) elif self.regional_setting == "international": # 国际版 if type == "text": result = self.model_client.TaskExecution( input=prompt, map_mapping=self.map_mapping, language=self.language_dict[self.language], conversation_id=conversation_id, ) if result[0]: json_str = self.extract_json_content(result[1]) self.conversation_id = result[2] else: self.get_logger().info(f"ERROR:{result[1]}") elif type == "image": self.save_single_image() result = self.model_client.TaskExecution( input="机器人反馈:执行seewhat()完成", map_mapping=self.map_mapping, language=self.language_dict[self.language], image_path=self.image_save_path, conversation_id=conversation_id, ) if result[0]: json_str = self.extract_json_content(result[1]) self.conversation_id = result[2] else: self.get_logger().info(f"ERROR:{result[1]}") if json_str is not None: # 解析JSON字符串,分离"action"、"response"字段 / Parse JSON string, separate "action" and "response" fields action_plan_json = json.loads(json_str) action_list = action_plan_json.get("action", []) llm_response = action_plan_json.get("response", "") else: self.get_logger().info( f"LargeScaleModel return: {json_str},The format was unexpected. The output format of the AI model at the execution layer did not meet the requirements" ) return if self.text_chat_mode: msg = String(data=f'"action": {action_list}, "response": {llm_response}') self.text_pub.publish(msg) else: self.get_logger().info( f'"action": {action_list}, "response": {llm_response}' ) self.send_action_service( action_list, llm_response ) # 异步发送动作列表、回复内容给ActionServer / Asynchronously send action list and response content to ActionServer ``` ### 5.动作服务器功能 实现程序为action_service.py,用于接收模型服务器请求的动作列表,解析动作列表并执行,源码路径: ``` #深度相机 /home/jetson/yahboomcar_ros2_ws/yahboomcar_ws/src/largemodel/largemodel/action_service_nuwa.py ``` **5.1 动作函数库** 机器人的动作函数来自CustomActionServer类中方法,包含了各种子动作的函数,这里以控制机器人底盘移动的动作函数为例进行讲解,其他动作函数会在后续章节第一次出现时进行讲解 程序解读: 1、将输入的字符串参数转换为浮点数 2、创建并设置Twist类型的运动指令 3、发布对应速度话题,结束后停止 4、根据模式决定是否发布完成信息 ``` def set_cmdvel(self,linear_x,linear_y, angular_z,duration):#发布cmd_vel # 将参数从字符串转换为浮点数 linear_x = float(linear_x) linear_y = float(linear_y) angular_z = float(angular_z) duration = float(duration) twist = Twist() twist.linear.x = linear_x twist.linear.y = linear_y twist.angular.z = angular_z self._execute_action(twist,durationtime=duration) self.stop() if self.combination_mode:#是否为组合模式,组合模型需要执行完 return else: self.action_status_pub(f'机器人反馈:执行set_cmdvel({linear_x},{linear_y},{angular_z},{duration})完成') ``` #### 5.2 解析动作列表控制机器人功能 将大模型生成的动作列表解析为控制机器人实体的函数,并执行,实现程序为CustomActionServer类中的execute_callback方法 程序解读: 1、接收客户端发送的动作列表(字符串格式); 2、若动作列表为空,直接返回成功结果; 3、若只有一个动作,解析并执行对应方法,失败则中止; 4、若有多个动作,进入组合模式依次执行,过程中记录日志与反馈; 5、所有动作执行完毕后调用 stop() 停止机器人; 6、最终返回执行成功的结果给客户端。 ``` def execute_callback(self, goal_handle): feedback_msg = Rot.Feedback() stop = getattr(self, "stop") # 获取停止方法,通过发布话题的方法停止机器人运动 actions=goal_handle.request.actions if not actions: # 如果动作列表为空,直接返回成功 if not self.text_chat_mode: self.send_Audiorequest(goal_handle.request.llm_response) if self.use_double_llm: self.action_status_pub('机器人反馈:回复用户完成') goal_handle.succeed() result = Rot.Result() result.success = True return result elif len(actions) == 1:# 动作列表只有一个动作,正常执行流程 # 情况2:列表只有一个元素 action = actions[0] if not self.text_chat_mode: self.send_Audiorequest(goal_handle.request.llm_response) match = re.match(r"(\w+)\((.*)\)", action) if not match: self.get_logger().warning(f'action_service: {action} is invalid action,skip execution') goal_handle.abort() result = Rot.Result() result.success = False return result else: action_name, args_str = match.groups() args = [arg.strip() for arg in args_str.split(",")] if args_str else [] try: if hasattr(self, action_name): method = getattr(self, action_name) method(*args) feedback_msg.status = f'action service execute {action} successed' # self.get_logger().info(feedback_msg.status) # 添加日志 goal_handle.publish_feedback(feedback_msg) else: self.get_logger().warning(f'action_service:inviald: {action_name},skip execution') self.action_status_pub('机器人反馈:动作函数不存在,无法执行') goal_handle.abort() result = Rot.Result() result.success = False return result except Exception as e: self.get_logger().error(f'action_service:execute action: {action} failed,error-log: {str(e)}') goal_handle.abort() result = Rot.Result() result.success = False return result else:#如果动作列表有多个动作,使能组合模式 if not self.text_chat_mode: self.send_Audiorequest(goal_handle.request.llm_response) self.combination_mode=True for action in actions: # 使用正则表达式解析动作名称和参数 match = re.match(r"(\w+)\((.*)\)", action) if not match: self.get_logger().warning(f'action_service: {action} is invalid action,skip execution') continue action_name, args_str = match.groups() args = [arg.strip() for arg in args_str.split(",")] if args_str else [] try: # 检查动作是否是 CustomActionServer 类中的方法 if hasattr(self, action_name): method = getattr(self, action_name) # 动态调用方法并传递参数 method(*args) # method(*converted_args) feedback_msg.status = f'action service execute {action} successed' self.get_logger().info(feedback_msg.status) # 添加日志 goal_handle.publish_feedback(feedback_msg) else: self.get_logger().warning(f'action_service:inviald: {action_name},skip execution') except Exception as e: self.get_logger().error(f'action_service:execute action: {action} failed,error-log: {str(e)}') goal_handle.abort() result = Rot.Result() result.success = False return result self.action_status_pub(f'机器人反馈:执行{actions}完成') self.combination_mode=False#重置组合模式标志位 stop() # 执行完全部动作停止机器人 # 返回成功信息给客户端 goal_handle.succeed() result = Rot.Result() result.success = True return result ``` ### 6.打断功能 机器人支持任意阶段的打断,具体可以分为录音阶段打断、对话阶段打断、动作阶段打断,这里对每个阶段打断的原理进行介绍 #### 6.1 录音阶段打断 如果录音过程发现说错话,或者对说话内容不满意需要重新录音,在**录音的过程中**可以直接再次唤醒后打断之前的录音直接重新开始说话录制 逻辑实现为asr.py文件中的**ASRNode**类的**main_loop**方法: 每次唤醒时,如果已有正在运行的唤醒录音的线程,则通过线程事件 stop_event 中断它,并等待其结束; 清除停止事件标志后,启动新的录音线程 ``` def main_loop(self): while rclpy.ok(): while ( self.audio_request_queue.qsize() > 1 ): # 只处理最近的一次唤醒请求,防止重复唤醒 / Process only the most recent wake-up request to prevent duplicates self.audio_request_queue.get() if not self.audio_request_queue.empty(): self.audio_request_queue.get() self.wakeup_pub.publish( Bool(data=True) ) # 发布唤醒信号 / Publish wake-up signal self.get_logger().info("I'm here") playsound( self.audio_dict[self.first_response] ) # 应答用户 / Respond to the user if ( self.current_thread and self.current_thread.is_alive() ): # 打断上次的唤醒处理线程 / Interrupt the previous wake-up handling thread self.stop_event.set() self.current_thread.join() # 等待当前线程结束 / Wait for the current thread to finish self.stop_event.clear() # 清除事件 / Clear the event self.current_thread = threading.Thread(target=self.kws_handler) self.current_thread.daemon = True self.current_thread.start() rclpy.spin_once(self, timeout_sec=0.1) time.sleep(0.1) ``` #### 6.2 对话阶段打断 如果机器人讲话过程中,觉得机器人回复不满意或者不想机器人继续回复,则可以使用唤醒词直接打断机器人的说话,直接开始说话录音,此时可以对机器人说出新的指令(仍在当前任务周期中),或者对机器人说出:"结束当前任务"可以直接结束掉当前任务开启新的任务周期。 逻辑实现为action_service.py文件中的**CustomActionServer**类的**wakeup_callback**和**play_audio**方法: wakeup_callback是唤醒处理的回调函数,在asr.py程序中每次会唤醒时,会通过话题通信方式发布唤醒信号,wakeup_callback订阅并处理信号。 每次唤醒时检测**pygame.mixer**是否正在播放音频,如果是,则通过线程事件self.stop_event通知播放线程停止播放 如果唤醒后检测之前的动作正在运行,则置位**self.interrupt_flag**标志位,用于后续的动作打断 ``` def wakeup_callback(self, msg): if msg.data: if pygame.mixer.music.get_busy(): self.stop_event.set() if self.action_runing: self.interrupt_flag = True self.stop() self.pubSix_Arm(self.init_joints) ``` play_audio播放音频时会检测self.stop_event是否被置位,一旦检测到置位,立刻停止当前播放的音频。 ``` def play_audio(self, file_path: str, feedback: Bool = False) -> None: """ 同步方式播放音频函数The function for playing audio in synchronous mode """ pygame.mixer.music.load(file_path) pygame.mixer.music.play() while pygame.mixer.music.get_busy(): if self.stop_event.is_set(): pygame.mixer.music.stop() self.stop_event.clear() # 清除事件 return pygame.time.Clock().tick(10) if feedback: self.action_status_pub("机器人反馈:回复用户完成") ``` #### 6.3 动作阶段打断 机器人在执行动作的过程中如果会被唤醒,会停止当前动作,恢复初始姿态。具体可分为普通动作打断和带有子进程的动作打断 #### 6.3.1普通动作打断 实现程序为action_service.py中CustomActionServer类中_execute_action和pubSix_Arm方法 1、机器人底盘运动通过发布速度话题和进行控制的 2、_execute_action底盘控制函数会不断检测self.interrupt_flag打断标志位,如果被置位,则立即停止底盘运行 ``` def _execute_action(self, twist, num=1, durationtime=3.0): for _ in range(num): start_time = time.time() while (time.time() - start_time) < durationtime: if self.interrupt_flag: self.stop() return self.publisher.publish(twist) time.sleep(0.1) ``` #### 6.3.2 带有子进程的动作打断 例如跟随机器码等动作需要在子进程中启动外部的程序,这里以机械臂夹取动作函数grasp_obj举例讲解: 当跟随未完成时,会一直在self.apriltagFollow_future.done():循环中进行等待:这个过程中如果检测到self.interrupt_flag打断标志位被置位,则会先调用self.kill_process_tree函数会递归结束子进程树,然后停止动作。 ``` def apriltagFollow(self): self.apriltagFollow_future = Future() #复位Future对象 process_1 = subprocess.Popen(['ros2', 'run', 'yahboomcar_voice_ctrl', 'voice_apriltagFollow']) while not self.apriltagFollow_future.done(): if self.interrupt_flag: break time.sleep(0.1) self.kill_process_tree(process_1.pid) self.cancel() ```
admin
2025年11月29日 13:18
10
转发
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
Word文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码
有效期
AI