> 技术文档 > Magentic-ui项目相关整理_magentic-ui没有这个命令

Magentic-ui项目相关整理_magentic-ui没有这个命令


协作相关

src/magentic_ui/├── teams/│ ├── orchestrator/│ │ ├── _group_chat.py # GroupChat 类定义,管理参与者的状态│ │ ├── _orchestrator.py # Orchestrator 类定义,维护对话的状态│ │ └── orchestrator_config.py # 相关的配置 │ └── roundrobin_orchestrator.py # 轮询状态的管理├── agents/│ ├── file_surfer/│ │ └── _file_surfer.py│ ├── web_surfer/│ │ └── _web_surfer.py│ └── users/│ ├── _dummy_user_proxy.py│ └── _metadata_user_proxy.py├── backend/│ ├── teammanager/│ └── _teammanager.py└── task_team.py

核心文件的核心函数

  • _group_chat.py:

    • __init__:智能体团队的创建

  • _orchestrator.py:

    • __init__:协调器的初始化

    • _orchestrate_step_planning:进行规划

    • _orchestrate_step_execution:分配合适的智能体进行执行

    • handle_agent_response:处理智能体的响应(例如:某个Agent:我已完成任务,xxx)

    • handle_start:将所有的消息转发给智能体,消息流转的入口点

  • _file_surfer.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _web_surfer.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

    • _handle_action:处理具体的网页操作,返回操作结果消息

    • _handle_tool_call:处理工具调用,返回工具调用结果消息

  • _dummy_user_proxy.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _metadata_user_proxy.py:

    • on_messages:消息的转化

    • on_messages_stream:自己的智能体消息的响应

  • _teammanager.py:

    • run_stream:团队的运行相关,顶级

  • task_team.py:

    • get_task_team:创建智能体的实例

关系的一些区分处理:

handle_start 和 _orchestrate_step_execution之间的关系:

  1. 消息的流转过程

    1. handle_start 是入口点,负责接收原始消息并转发给所有智能体.

    2. 消息添加到message_histroy中保存

    3. 调用_orchestrate_step 开始协调过程,是进行计划还是分发给智能体执行。

    4. _orchestrate_step_execution 处理的是经过的具体消息,也就是planning后的消息

  2. 执行流程

    1. handle_start 启动整个对话流程

    2. _orchestrate_step 根据当前状态决定是进入规划模式还是执行模式

    3. orchestrate_step_execution 负责具体的执行阶段

  3. 通过self._state进行信息的共享

智能体之间的协作:

初始化阶段

  1. 智能体的创建(src\\magentic_ui\\task_team.py)下的get_task_team

    async def get_task_team(...): # 1. 创建各个智能体实例 file_surfer = FileSurfer( name=\"file_surfer\", model_client=model_client_file_surfer, work_dir=paths.internal_run_dir, bind_dir=paths.external_run_dir, model_context_token_limit=magentic_ui_config.model_context_token_limit, approval_guard=approval_guard, ) # 2.创建并注册智能体 team = GroupChat( participants=[web_surfer, user_proxy, coder_agent, file_surfer], orchestrator_config=orchestrator_config, model_client=model_client_orch, memory_provider=memory_provider, ) # 3. 初始化团队 await team.lazy_init() return team
  2. 团队的创建(src\\magentic_ui\\teams\\orchestrator\\_group_chat.py)下的GroupChat,需要有参与的智能体,模型的客户端,协调器

    class GroupChat(BaseGroupChat, Component[GroupChatConfig]): def __init__( self, participants: List[ChatAgent], model_client: ChatCompletionClient, orchestrator_config: OrchestratorConfig, ... ): # 1. 调用父类初始化 super().__init__( participants, group_chat_manager_name=\"Orchestrator\", group_chat_manager_class=Orchestrator, ... ) # 2. 初始化内部变量 self._orchestrator_config = orchestrator_config self._model_client = model_client self._message_factory = MessageFactory() self._memory_provider = memory_provider
  3.  协调器的初始化(src\\magentic_ui\\teams\\orchestrator\\_orchestrator.py)下的Orchestrato
    class Orchestrator(BaseGroupChatManager): def __init__( self, name: str, group_topic_type: str, output_topic_type: str, participant_topic_types: List[str], participant_names: List[str], participant_descriptions: List[str], ... ): # 初始化基础属性 self._model_client = model_client self._config = config self._user_agent_topic = \"user_proxy\" self._web_agent_topic = \"web_surfer\"

消息相关

团队相关

teammanager.py VS task_team.py VS _orchestrator.py

  1. teammanager.py是管理团队的生命周期

  2. task_team.py只是团队的创建,不管生命周期

  3. _orchestrator.py仅是团队的内部协调,进行通信

用户相关

用户代理是如何加入到团队中?

# src\\magentic_ui\\task_team.py# 用户代理的创建user_proxy: DummyUserProxy | MetadataUserProxy | UserProxyAgent if magentic_ui_config.user_proxy_type == \"dummy\": user_proxy = DummyUserProxy(name=\"user_proxy\") elif magentic_ui_config.user_proxy_type == \"metadata\": assert ( magentic_ui_config.task is not None ), \"Task must be provided for metadata user proxy\" assert ( magentic_ui_config.hints is not None ), \"Hints must be provided for metadata user proxy\" assert ( magentic_ui_config.answer is not None ), \"Answer must be provided for metadata user proxy\" user_proxy = MetadataUserProxy( name=\"user_proxy\", description=\"Metadata User Proxy Agent\", task=magentic_ui_config.task, helpful_task_hints=magentic_ui_config.hints, task_answer=magentic_ui_config.answer, model_client=model_client_orch, ) else: user_proxy_input_func = make_agentchat_input_func(input_func) user_proxy = UserProxyAgent( description=USER_PROXY_DESCRIPTION, name=\"user_proxy\", input_func=user_proxy_input_func, )# 轮询代理 if websurfer_loop_team: # simplified team of only the web surfer team = RoundRobinGroupChat( participants=[web_surfer, user_proxy], max_turns=10000, ) await team.lazy_init() return team # 由LLM智能选择 team = GroupChat( participants=[web_surfer, user_proxy, coder_agent, file_surfer], orchestrator_config=orchestrator_config, model_client=model_client_orch, memory_provider=memory_provider, ) await team.lazy_init() return team

_metadata_user_proxy.py为例,它是创建复杂的用户代理,另一个是创建简单的用户代理

async def on_messages_stream( self, messages: Sequence[BaseChatMessage], cancellation_token: CancellationToken) -> AsyncGenerator[BaseAgentEvent | BaseChatMessage | Response, None]: \"\"\" 处理输入消息并生成响应流 这是核心方法,处理所有消息逻辑 \"\"\" # 1. 严格模式下的提示重写 if ( self.how_helpful == \"strict\" and self.helpful_task_hints and self.helpful_task_hints != \"Helpful hints are not available for this task.\" and self.rewritten_helpful_task_hints is None ): self.rewritten_helpful_task_hints = await self._rewrite_helpful_hints( cancellation_token ) # 2. 消息处理 chat_messages = thread_to_context( list(messages), agent_name=self.name, is_multimodal=self._model_client.model_info[\"vision\"], ) self._chat_history.extend(chat_messages) # 3. 阶段判断 if ( \"type\" in messages[-1].metadata and messages[-1].metadata[\"type\"] == \"plan_message\" ): self.have_encountered_plan_message = True self.in_planning_phase = True else: if self.have_encountered_plan_message: self.in_planning_phase = False else: self.in_planning_phase = True # 4. 模型上下文准备 await self._model_context.clear() system_message = SystemMessage(content=self._get_system_message()) await self._model_context.add_message(system_message) # 5. 添加聊天历史 for msg in self._chat_history: await self._model_context.add_message(msg) # 6. 获取token限制的历史记录 token_limited_history = await self._model_context.get_messages() # 7. 规划阶段处理 if self.in_planning_phase: if self.simulated_user_type in [\"co-planning\", \"co-planning-and-execution\"]: if ( self.max_co_planning_rounds is None or self.current_co_planning_round < self.max_co_planning_rounds ): # 生成规划响应 result = await self._model_client.create(  messages=token_limited_history,  cancellation_token=cancellation_token, ) yield Response(  chat_message=TextMessage( content=result.content, source=self.name, metadata={ \"co_planning_round\": str(self.current_co_planning_round), \"user_plan_reply\": \"llm\", \"helpful_task_hints\": self.rewritten_helpful_task_hints if self.rewritten_helpful_task_hints else self.helpful_task_hints, },  ),  inner_messages=[], ) self.current_co_planning_round += 1 else: # 达到最大规划轮次 yield Response(  chat_message=TextMessage( content=\"accept\", source=self.name, metadata={ \"co_planning_round\": str(self.current_co_planning_round), \"user_plan_reply\": \"accept\", },  ),  inner_messages=[], ) else: # 非协作规划模式 yield Response( chat_message=TextMessage(  content=\"accept\",  source=self.name,  metadata={ \"co_planning_round\": str(self.current_co_planning_round), \"user_plan_reply\": \"accept\",  }, ), inner_messages=[], ) # 8. 执行阶段处理 else: if self.simulated_user_type in [\"co-execution\", \"co-planning-and-execution\"]: if ( self.max_co_execution_rounds is None or self.current_co_execution_round < self.max_co_execution_rounds ): # 生成执行响应 result = await self._model_client.create(  messages=token_limited_history,  cancellation_token=cancellation_token, ) yield Response(  chat_message=TextMessage( content=result.content, source=self.name, metadata={\"user_execution_reply\": \"llm\"},  ),  inner_messages=[], ) self.current_co_execution_round += 1 else: # 达到最大执行轮次 yield Response(  chat_message=TextMessage( content=\"I don\'t know, you figure it out, don\'t ask me again.\", source=self.name,  ), ) else: # 非协作执行模式 yield Response( chat_message=TextMessage(  content=\"I don\'t know, you figure it out, don\'t ask me again.\",  source=self.name,  metadata={\"user_execution_reply\": \"idk\"}, ), )

感觉像用户代理这块,不需要进行LLM的调用,我认为的消息流程是:用户输入-->用户代理-->Orchestrator-->智能体-->LLM-->(智能体-->LLM)可能多个-->Orchesatrator-->用户代理-->用户界面的显示。由于在智能体那块以及调用了LLM,有了确切的解决,我认为正确是:

  • 用户代理只负责消息的传递和显示

  • Orchestrator负责任务的规划和分配(交给智能体)

  • 智能体负责调用LLM和处理任务

  • 响应通过用户代理直接显示给用户

原始消息从哪里来的,怎么传到handle_start?

 @rpc async def handle_start( self, message: GroupChatStart, ctx: MessageContext ) -> None:# @rpc 是远程过程调用,运行函数被远程调用,就像调用本地函数一样# message 来自于外部的调用.venv\\Lib\\site-packages\\autogen_agentchat\\teams\\_group_chat\\_base_group_chat.py 文件中的 run_stream 方法,就是负责将初始值(初始消息)通过 GroupChatStart 消息传递给群聊管理器(GroupChatManager)的地方。具体流程如下:\'\'\'对第一点的self.team的解释,实际上self.team就是代指GroupChat路径:src\\magentic_ui\\teams\\orchestrator\\_group_chat.py下的run_stream谁去调用?不会直接 new 一个 GroupChat 然后直接 run_stream,而是通过 TeamManager 统一管理GroupChat.run_stream 的实际调用,主要是通过 TeamManager 的 self.team.run_stream 实现的,这就是 “间接调用”(本文件的)14,15,16行在src\\magentic_ui\\backend\\teammanager\\teammanager.py下的run_stream 调用的313到315行代码中 async for message in self.team.run_stream( task=task, cancellation_token=cancellation_token ):其中的self.team.run_stream也就代指了GroupChat.run_streamsrc\\magentic_ui\\backend\\teammanager\\teammanager.py 的 run_stream 由src\\magentic_ui\\backend\\web\\managers\\connection.py 下start_stream方法的run_stream调用\'\'\'1.调用 self.team.run_stream(task=...),其中 task 可以是字符串、消息对象、消息列表,或者为 None2.run_stream 方法会把 task 转换成 messages,然后封装到 GroupChatStart(messages=messages) 这个消息对象里。(路径:.venv\\Lib\\site-packages\\autogen_agentchat\\teams\\_group_chat\\_base_group_chat.py) if task is None: # 如果 task 是 None,则 messages 也是 None pass elif isinstance(task, str): # 如果 task 是字符串,就会被包装成一个 TextMessage,放到 messages 列表里 messages = [TextMessage(content=task, source=\"user\")] elif isinstance(task, BaseChatMessage): # 如果 task 是单个消息对象,也会被放到 messages 列表里。 messages = [task] elif isinstance(task, list): # 如果 task 是消息对象列表,则直接赋值给 messages。 ... # 也就是把 messages 作为参数,封装进 GroupChatStart 消息对象里 499行 await self._runtime.send_message( GroupChatStart(messages=messages), recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id), cancellation_token=cancellation_token, )3.通过 self._runtime.send_message(...),把 GroupChatStart 消息发送给群聊管理器(GroupChatManager)。# .venv/Lib/site-packages/autogen_agentchat/teams/_group_chat/_base_group_chat.py 中的run_stream方法 await self._runtime.send_message( GroupChatStart(messages=messages), recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id), cancellation_token=cancellation_token, )# recipient=AgentId(type=self._group_chat_manager_topic_type, key=self._team_id) # 明确指定了接收者就是群聊管理器, _group_chat_manager_topic_type 就是Orchestrator这个群聊管理器实例4.群聊管理器收到 GroupChatStart 后,会调用 handle_start 方法,正式启动群聊流程。# 对于 GroupChatStart 消息,自动调用handle_start方法,message参数就是刚收到的 GroupChatStart消息

self.team是一个团队对象,类型是GroupChat,在业务层(比如 WebSocket、CLI、API)不会直接操作 GroupChat,而是通过 TeamManager 这个中间层来操作团队。现在到了GroupChat.run_stream了,它内部继承了.venv\\Lib\\site-packages\\autogen_agentchat\\teams\\_group_chat\\_base_group_chat.py下的run_stream,这个就是上面代码框中描述的第二点。现在把消息封装到GroupChatStart

  1. WebSocket 路由/管理器调用 WebSocketManager.start_stream

  2. WebSocketManager.start_stream 调用 TeamManager.run_stream

  3. TeamManager.run_stream 内部调用 self.team.run_stream

  4. self.team 实际上就是 GroupChat 或其子类的实例

  5. 最终执行到 GroupChat.run_stream