Nanobot 从 AgentLoop 启动看怎么驱动大模型运行
背景在之前的文章中我们分析了nanobot onboard和nanobot gateway命令的实现,这次我们分析AgentLoop.run方法怎么驱动整个Agent的运行,从整理上来看该nanobot用到了Typer,Rich,Questionary,prompt_toolkit这种现代、美观且交互式命令行界面 (CLI) 的强大工具组合。Typer 用于定义 CLI 结构和参数Rich 负责文本样式、表格、面板和 Markdown 渲染Questionary 用于创建交互式问答界面其中Rich中的Console,Markdown,Table,Text用来进行渲染支持颜色、表格、面板、语法高亮和 Markdown 以更好的进行个性化的展示。其中最主要都是用协程实现其中也会调用线程来做其他工作。关于协程和线程的区别协程是用户态的轻量级“微线程”切换由用户程序控制没有内核态切换开销极小线程是内核态的资源单元切换由操作系统调度开销较大. 线程切换需要涉及用户态到内核态的转换上下文包括内核栈、硬件寄存器等保存和恢复资源较多Agent.run命令这个是所有Agent驱动的主循环方法也是消息传递的主入口它从 MessageBus 拉入站消息再异步分发到真正的处理逻辑最终把处理结果发送到outbound队列中从而发送给对应的Channel。于此同时能够优先处理/stop,/restart,/status这种命令1.设置启动标志并且链接MCPself._runningTrueawaitself._connect_mcp()首先用一个标志位来标记并且保持主循环持续运行再次连接远程的MCP服务器并把远程Tool注册进当前ToolRegistry中。在_connect_mcp的实现中使用了AsyncExitStack来手动管理上下文这里主要用enter_async_context和aclose两个方法注册和释放异步资源。如果在注册的过程中发生异常的话则调用close方法。在connect_mcp_servers方法中根据MCP的类型(是stdio或者sse)来建立不同的连接(使用了httpx模块)。最后以MCPToolWrapper类的方式注册到ToolRegistry中可以看到其实MCP也是最终通过Tool方式调用的。2.主循环消费入站队列消息这里是个死循环以最大超时1秒的间隔去从入站队列中消费消息。whileself._running:try:msgawaitasyncio.wait_for(self.bus.consume_inbound(),timeout1.0)exceptasyncio.TimeoutError:continueexceptasyncio.CancelledError:# Preserve real task cancellation so shutdown can complete cleanly.# Only ignore non-task CancelledError signals that may leak from integrations.ifnotself._runningorasyncio.current_task().cancelling():raisecontinueexceptExceptionase:logger.warning(Error consuming inbound message: {}, continuing...,e)continuerawmsg.content.strip()ifself.commands.is_priority(raw):ctxCommandContext(msgmsg,sessionNone,keymsg.session_key,rawraw,loopself)resultawaitself.commands.dispatch_priority(ctx)ifresult:awaitself.bus.publish_outbound(result)continuetaskasyncio.create_task(self._dispatch(msg))self._active_tasks.setdefault(msg.session_key,[]).append(task)task.add_done_callback(lambdat,kmsg.session_key:self._active_tasks.get(k,[])andself._active_tasks[k].remove(t)iftinself._active_tasks.get(k,[])elseNone)如果是/stop,/restart,/status则优先处理该命令如果有结果返回并把该命令的处理结果发送给出站队列否则使用asyncio.create_task来创建一个协程任务来异步执行并按照 session_key记录活跃任务同时使用add_done_callback方法来增加回调(任务完成后进行活跃任务的清理)_dispatch方法1. 并发控制lockself._session_locks.setdefault(msg.session_key, asyncio.Lock())gateself._concurrency_gate or nullcontext()这里会按照每个session_key 一把锁保证同一个session的并发资源控制;全局 asyncio.Semaphore 控制可以同时并发的运行的Session会话由环境变量 NANOBOT_MAX_CONCURRENT_REQUESTS 控制默认 3≤0 为 nullcontext不限制2. 流式输出可选默认 on_stream / on_stream_end 为 None。若 msg.metadata.get(“_wants_stream”) 为真例如某频道希望流式,也就是channel配置为streaming: trueon_stream(delta)把模型增量以 OutboundMessage 发出metadata 带 _stream_delta: True。on_stream_end(resuming…)发空内容metadata 带 _stream_end、_resuming供 UI 区分是彻底结束还是工具轮次中间暂停。3.核心处理_process_message当是普通用户消息时加载对应的session从目录workspace/sessions加载当前会话所属的历史消息。如果是/new,/status,/help命令则直接返回对应的处理结果prompt token合并,首先是获取对应session的独享锁计算 bucket self.context_window_tokens65536 - self.max_completion_tokens(4096) - self._SAFETY_BUFFER(1024) 60416 。计算 prompt占用token estimated调用session.get_history方法 该方法从seesion所有中消息取 第一条 user 裁齐去掉孤儿 tool 前缀 拷贝成只含 role / content / tool 相关键 的列表供 ContextBuilder.build_messages 等拼真实请求使用调用_build_messages拼出一份和真实请求结构相近的 probe_messages当前消息占位为 [token-probe]调用estimate_prompt_tokens_chain方法 该方法先调用模型自带的estimate_prompt_tokens方法如果模型没有对应的方法则使用tiktoken中的cl100k_base近似估算把各条消息里字符串 content、text 块、tool_calls/reasoning/name/tool_call_id 和 整份 tools JSON 拼成大字串用 cl100k_base 计数 token再加 4 * 消息条数 的粗开销得到整段 prompt 的 tiktoken 估计如果 estimated budget认为不需要合并打 debug「idle」日志,直接返回否则进行如下步骤调用pick_consolidation_boundary获取对应每轮次要进行合并的messege 边界最多经过5轮并调用consolidate_messages使用LLM save_memory 工具把这段对话摘要进HISTORY.md / MEMORY.md文件中这里巧妙的使用了TOOL参数的方式来更新对应HISTORY.md / MEMORY.md文件的内容。entry args[history_entry] update args[memory_update] ... self.append_history(entry) ... self.write_long_term(update)给带有路由信息的工具设置上下文fornamein(message,spawn,cron):iftool :self.tools.get(name):ifhasattr(tool,set_context): tool.set_context(channel, chat_id, *([message_id]ifnamemessageelse[]))设置MessageTool,SpawnTool,CronTool工具的目标channel和chat_id为入站信息的channel和chat_id。让它们发消息、起子代理、建 cron 时路由到当前会话避免多会话并发时上下文信息错位拼发给模型的消息列表historysession.get_history(max_messages0)initial_messagesself.context.build_messages(historyhistory,current_messagemsg.content,mediamsg.mediaifmsg.mediaelseNone,channelmsg.channel,chat_idmsg.chat_id,)这里的history只包括未合并的信息因为合并后的信息在build_messages中添加进来这里的build_messages会包括:一条role: system大段system_prompt见下文「系统提示的组成」。*history由调用方传入的会话历史通常为Session.get_history()等得到的user/assistant/tool消息列表。一条末尾消息role为current_role默认user子代理回灌等场景可为assistantcontent为merged运行时元数据 当前用户正文/图片。system_prompt由build_system_prompt(skill_names)生成各块之间用\n\n---\n\n连接主要包括部分来源方法内容概要身份与规范_get_identity()nanobot 身份、运行环境OS、架构、Python、工作区路径、MEMORY/HISTORY/skills 路径、Windows/POSIX 策略、工具与通过message发文件等指引。Bootstrap 文件_load_bootstrap_files()工作区根目录下若存在则加载AGENTS.md、SOUL.md、USER.md、TOOLS.md。长期记忆MemoryStore.get_memory_context()读取memory/MEMORY.md有内容则放在# Memory下。常驻技能get_always_skillsload_skills_for_context有则追加# Active Skills。技能目录build_skills_summary()有则追加# Skills及如何阅读SKILL.md等说明。其中 ~/.nanobot/workspace/ ├── AGENTS.md # 代理调度规则与标准作业程序,是代理的工作指南 ├── HEARTBEAT.md # 定时执行逻辑与主动任务状态自检让 代理 具备“自主意识” ├── SOUL.md # 响应语气、行为特征及输出格式配置,是代理的性格、核心价值观和长期指令 ├── TOOLS.md # 工具授权注册表及调用参数规范是代理的技能配置清单定义了工具准则 ├── USER.md # 用户画像数据包含特定偏好与交互限制配置 定义了 代理 如何服务你user_content 为由type: text或者type: image_url等多模态输入组成,具体参考OpenAI Chat Completions API和ChatCompletionContentPart具体的内容可以见 Nanobot的 system_prompt 示例跑 Agent 主循环这里主要是 调 LLM 有工具则并发执行并追加结果 再问模型」 直到得到无工具调用的最终文本或错误/步数耗尽并返回 最终正文、工具名列表、完整 messages 列表若 final_content is None 则 置为固定英文一句避免对外返回空串。持久化与后台合并self._save_turn(session,all_msgs,1len(history))self.sessions.save(session)对新增的messages进行清洗 user、截断 tool、丢弃有害空 assistant打上时间戳后 append 到 session.messages而后续的 sessions.save 直接写到jsonl文件中。如果MessageTool已发消息 则不再返回主回复 ,避免频道里再贴一条重复总结由 message 工具已发过),否则返回正常的消息当是由SpawnTool工具执行产生的子代理的消息时总体上处理逻辑和普通用户消息处理逻辑一样 区别点如下current_role assistant if msg.sender_id subagent如果是由subagent发送来的消息的话则 设置 消息role 为 “assistant”,没有对MessageTool的额外处理没有对/new命令的处理4. 出站处理ifresponseisnotNone:awaitself.bus.publish_outbound(response)elifmsg.channelcli:awaitself.bus.publish_outbound(OutboundMessage(channelmsg.channel,chat_idmsg.chat_id,content,metadatamsg.metadataor{},))如果主流程返回的内容不为空则直接进行出站处理否则如果channel是cli主要是本地CLI命令nanobot agent,则把出站内容置为空。