> 技术文档 > 小智AI MCP视觉交互2.0(超低延迟低至1s,通过外部MCP实现,与固件做解耦,小智帮我找钥匙)_小智mcp 在线音乐

小智AI MCP视觉交互2.0(超低延迟低至1s,通过外部MCP实现,与固件做解耦,小智帮我找钥匙)_小智mcp 在线音乐

初次体验小智AI MCP视觉交互功能,跟着博主的教程进行复刻,一些过程错误记录:

目前已经在完美运行中...

AI小智MCP摄像头

小智AI MCP外置视觉系统重磅升级2.0所有设备0成本0改造接入摄像头视觉系统硬件平权,代码开源!人形机器人?语音小盒子?通通给我接入AI小智MCP服务!_哔哩哔哩_bilibili

该方案使用外置摄像头实现,比较理想的场景是智能家庭监控,比如家里有五个摄像头,可以让小智帮忙找宠物,“你好小智,我家帮我找下猫。”

博主原教程:

demo环境:

mac系统

Python 3.12.10

登陆阿里云官网,https://bailian.console.aliyun.com/?tab=model#/api-key获取自己的api-key(有大量免费额度可用)

注意:pip install -r requirement.txt后如果还出现 no modole name ‘xxx’的报错,请使用pip install xxx一个个安装完成。

export MCP_ENDPOINT=\"你的MCP接入点地址\"export DASHSCOPE_API_KEY=\"填你的api-key\"pip install -r requirement.txtpython mcp_pipe,py myVL.py

引用博主的开源教程

注意:

1. 博主使用的mac系统,export是将这两个参数设置为系统环境变量;

2. api-key的背后是阿里云的视觉大模型,它是能进行视觉类场景交互的核心大脑;

一、源代码测试

1.1 环境准备工作

搞环境又搞了很久,这里就不展开了,细致分析都能找到问题

1.2 mcp服务连接问题

环境搞好后,连接mcp服务又卡了很久。核心问题点在与:

#源码async def connect_to_server(uri): \"\"\"Connect to WebSocket server and establish bidirectional communication with `mcp_script`\"\"\" global reconnect_attempt, backoff try: logger.info(f\"Connecting to WebSocket server...\") async with websockets.connect(uri) as websocket: logger.info(f\"Successfully connected to WebSocket server\") # Reset reconnection counter if connection closes normally reconnect_attempt = 0 backoff = INITIAL_BACKOFF # Start mcp_script process process = subprocess.Popen( [\'python\', mcp_script], #\'python\'这里是问题的关键,在我的环境下必须等知名python.exe的绝对路径,否则就是会报错 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True # Use text mode ) logger.info(f\"Started {mcp_script} process\") # Create two tasks: read from WebSocket and write to process, read from process and write to WebSocket await asyncio.gather( pipe_websocket_to_process(websocket, process), pipe_process_to_websocket(process, websocket), pipe_process_stderr_to_terminal(process) ) except websockets.exceptions.ConnectionClosed as e: logger.error(f\"WebSocket connection closed: {e}\") raise # Re-throw exception to trigger reconnection except Exception as e: logger.error(f\"Connection error: {e}\") raise # Re-throw exception finally: # Ensure the child process is properly terminated if \'process\' in locals(): logger.info(f\"Terminating {mcp_script} process\") try: process.terminate() process.wait(timeout=5) except subprocess.TimeoutExpired: process.kill() logger.info(f\"{mcp_script} process terminated\")

修改段如下:

async def connect_to_server(uri): \"\"\"Connect to WebSocket server and establish bidirectional communication with `mcp_script`\"\"\" global reconnect_attempt, backoff try: logger.info(f\"Connecting to WebSocket server...\") async with websockets.connect(uri) as websocket:------------------------------------------------------------------------------- # Start mcp_script process #所运行的python路径 venv_python = r\"D:\\github\\TenenglaTech-VL-MCP\\TenenglaTech-VL-MCP\\venv\\Scripts\\python.exe\" process = subprocess.Popen( [venv_python, mcp_script], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, # Use text mode bufsize=1, encoding=\'utf-8\', errors=\'replace\', env={**os.environ, \'PYTHONIOENCODING\': \'utf-8\'} ) logger.info(f\"Started {mcp_script} process\")

修改好后成功运行,且可以调用mcp工具。 不过小智AI调用工具时,一直会超时。

1.3 调用超时问题

原有的逻辑是,当小智收到语音指令后,才去通过MCP服务打开摄像头,而摄像机每次打开就要很久,所以在前端上就是用户一直等待;我们需要修改逻辑,让摄像头提前预热打开,并保持常开。当小智收到指令后,截取当前帧作用图像输入,这样整条逻辑就通顺了。修改的代码如下:

# 摄像头管理器 - 保持摄像头常开class CameraManager: _instance = None _lock = threading.Lock() def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._cap = None cls._instance._initialized = False return cls._instance def initialize(self): \"\"\"初始化摄像头(线程安全)\"\"\" if self._initialized: return  try: if platform.system() == \'Darwin\': os.environ[\'OBJC_DISABLE_INITIALIZE_FORK_SAFETY\'] = \'YES\' self._cap = cv2.VideoCapture(0) if not self._cap.isOpened(): logger.error(\"摄像头访问被拒绝,请检查系统权限\") return # 优化摄像头参数 self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) self._cap.set(cv2.CAP_PROP_FPS, 30) self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self._initialized = True logger.info(\"摄像头初始化完成\") except Exception as e: logger.error(f\"摄像头初始化失败: {str(e)}\") def get_frame(self) -> Optional[bytes]: \"\"\"获取当前帧\"\"\" if not self._initialized or self._cap is None or not self._cap.isOpened(): return None try: # 丢弃缓冲区中的旧帧 for _ in range(2): self._cap.grab() ret, frame = self._cap.read() if not ret: return None # 高效压缩 frame = cv2.resize(frame, (320, 240)) _, buffer = cv2.imencode(\'.jpg\', frame, [ cv2.IMWRITE_JPEG_QUALITY, 50 ]) return buffer except Exception as e: logger.error(f\"获取帧异常: {str(e)}\") return None def release(self): \"\"\"释放摄像头资源\"\"\" if self._cap and self._cap.isOpened(): self._cap.release() logger.info(\"摄像头资源已释放\") self._initialized = False

二、增加摄像头预览功能

原始代码中可以让小智调用本地摄像头,但是我们作为用户自己却看不到摄像头里有什么,后续在使用时一旦小智反馈错误,我们将无法判断是小智的问题还是图像的问题。让使用者能实时看到摄像头里有什么东西,十分必要。

class CameraManager: def __new__(cls): with cls._lock: if cls._instance is None: cls._instance = super().__new__(cls) cls._instance._cap = None cls._instance._initialized = False cls._instance._preview_thread = None cls._instance._preview_active = False #增加预览 cls._instance._preview_lock = threading.Lock() cls._instance._preview_window_name = \"Camera Preview\" return cls._instance #新增启动、预览循环和结束三个函数 def start_preview(self): \"\"\"启动预览窗口\"\"\" with self._preview_lock: if self._preview_active: logger.info(\"预览窗口已打开\") return self._preview_active = True self._preview_thread = threading.Thread( target=self._preview_loop, name=\"PreviewThread\", daemon=True ) self._preview_thread.start() logger.info(\"启动摄像头预览\") def _preview_loop(self): \"\"\"预览窗口主循环\"\"\" if platform.system() == \'Windows\': display_name = self._preview_window_name.encode(\'gbk\').decode(\'latin-1\') else: display_name = self._preview_window_name  try: cv2.namedWindow(display_name, cv2.WINDOW_NORMAL) cv2.resizeWindow(display_name, 640, 480) while self._preview_active: if not self._initialized or self._cap is None or not self._cap.isOpened():  time.sleep(0.1)  continue  # 获取最新帧 for _ in range(2): # 清空缓冲区  self._cap.grab()  ret, frame = self._cap.read() if not ret:  time.sleep(0.1)  continue # 显示帧 cv2.imshow(display_name, frame) # 检查ESC键或窗口关闭 if cv2.waitKey(1) == 27 or cv2.getWindowProperty(display_name, cv2.WND_PROP_VISIBLE) < 1:  self.stop_preview()  break except Exception as e: logger.error(f\"预览错误: {str(e)}\") finally: try: cv2.destroyWindow(display_name) except: pass logger.info(\"预览窗口已关闭\") def stop_preview(self): \"\"\"停止预览\"\"\" with self._preview_lock: if not self._preview_active: return self._preview_active = False if self._preview_thread and self._preview_thread.is_alive(): self._preview_thread.join(timeout=1.0) logger.info(\"预览已停止\") 

三、 添加监控画面

我买了一个萤石CP1智能云台摄像机(家用监控),原始代码里,摄像头是通过cv2.VideoCapture(0)来访问的。这里的参数0表示默认摄像头(通常是笔记本电脑内置摄像头)。如果要使用其他摄像头(如USB外接摄像头或网络监控摄像头),需要调整这个参数或使用摄像头的RTSP流地址。

class CameraManager: #修改initialize函数,增加RTSP协议访问 def initialize(self): if self._initialized: return  try: if platform.system() == \'Darwin\': os.environ[\'OBJC_DISABLE_INITIALIZE_FORK_SAFETY\'] = \'YES\' # 尝试顺序:RTSP流 > 本地摄像头 sources = [] # 1. 优先尝试RTSP流 rtsp_url = os.getenv(\"CAMERA_RTSP_URL\") if rtsp_url: sources.append((\"RTSP流\", rtsp_url)) # 2. 添加本地摄像头作为备选 sources.append((\"本地摄像头\", 0)) # 按顺序尝试所有来源 for source_name, source in sources: try:  logger.info(f\"尝试连接: {source_name}\")  self._cap = cv2.VideoCapture(source)  # 设置超时参数(仅对部分后端有效)  self._cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) # 5秒超时  # 测试是否能读取帧  if self._cap.isOpened(): # 快速测试读取一帧 for _ in range(5): # 清空缓冲区 self._cap.grab() ret, _ = self._cap.read() if ret: logger.info(f\"{source_name}连接成功\") break except Exception as e:  logger.warning(f\"{source_name}连接异常: {str(e)}\") if not self._cap or not self._cap.isOpened(): logger.error(\"所有摄像头来源均失败\") return # 设置摄像头参数 self._cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) self._cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) self._cap.set(cv2.CAP_PROP_BUFFERSIZE, 1) self._initialized = True logger.info(\"摄像头初始化完成\") # 自动启动预览 self.start_preview() except Exception as e: logger.error(f\"摄像头初始化失败: {str(e)}\")

注意:RTSP流只在局域网内可用,如果当前运行代码的设备在外部网络,则需要利用API获取实时视频流。考虑到远程监控互动的必要性,这部分的修改放在下一篇文章里展开。