消息与投递

命令队列

我们通过一个微小的进程内队列对所有入站的自动回复运行(所有渠道)进行序列化,以防止多个代理运行发生冲突,同时仍允许跨会话的安全并行处理。

为什么需要队列

  • 自动回复运行可能成本高昂(LLM 调用),并且在多个入站消息几乎同时到达时可能发生冲突。
  • 序列化可以避免争用共享资源(会话文件、日志、CLI 标准输入),并减少触发上游速率限制的机会。

工作原理

  • 一个支持通道的先进先出(FIFO)队列,以可配置的并发上限(未配置的通道默认为 1;主通道默认为 4,子代理默认为 8)来清空每个通道。
  • runEmbeddedPiAgent 通过会话键(通道 session:<key>)进行入队,以保证每个会话只有一个活动运行。
  • 然后,每个会话运行被排入一个全局通道(默认为 main),因此整体并行度受 agents.defaults.maxConcurrent 限制。
  • 启用详细日志记录时,如果排队运行在开始前等待超过约 2 秒,则会发出简短通知。
  • 打字指示器仍在入队时立即触发(当渠道支持时),因此在我们等待轮次时,用户体验保持不变。

队列模式(按渠道)

入站消息可以引导当前运行、等待后续轮次,或两者兼而有之:

  • steer:立即注入到当前运行中(在下一个工具边界后取消待处理的工具调用)。如果不支持流式传输,则回退到后续模式。
  • followup:在当前运行结束后,排队等待下一个代理轮次。
  • collect:将所有排队的消息合并为单个后续轮次(默认)。如果消息针对不同的渠道/线程,它们会单独清空以保留路由。
  • steer-backlog(又名 steer+backlog):立即引导保留消息以供后续轮次使用。
  • interrupt(旧版):中止该会话的活动运行,然后运行最新的消息。
  • queue(旧版别名):与 steer 相同。

引导-积压模式意味着您可以在引导运行后获得后续响应,因此流式传输界面可能会出现重复。如果您希望每个入站消息获得一个响应,请优先使用 collect/steer。发送 /queue collect 作为独立命令(按会话)或设置 messages.queue.byChannel.discord: "collect"。默认值(配置中未设置时):

  • 所有界面 → collect

通过 messages.queue 全局或按渠道配置:

{
  messages: {
    queue: {
      mode: "collect",
      debounceMs: 1000,
      cap: 20,
      drop: "summarize",
      byChannel: { discord: "collect" },
    },
  },
}

队列选项

选项适用于 followupcollectsteer-backlog(以及当 steer 回退到后续模式时):

  • debounceMs:在开始后续轮次前等待静默期(防止“继续,继续”)。
  • cap:每个会话的最大排队消息数。
  • drop:溢出策略(oldnewsummarize)。

总结模式会保留一个简短的被丢弃消息要点列表,并将其作为合成的后续提示注入。默认值:debounceMs: 1000cap: 20drop: summarize

按会话覆盖

  • 发送 /queue <mode> 作为独立命令,以存储当前会话的模式。
  • 选项可以组合使用:/queue collect debounce:2s cap:25 drop:summarize
  • /queue default/queue reset 清除会话覆盖。

范围和保证

  • 适用于所有使用网关回复管道的入站渠道(WhatsApp web、Telegram、Slack、Discord、Signal、iMessage、网页聊天等)的自动回复代理运行。
  • 默认通道(main)是进程范围的,用于入站和主心跳;设置 agents.defaults.maxConcurrent 以允许多个会话并行运行。
  • 可能存在额外的通道(例如 cronsubagent),以便后台作业可以并行运行而不会阻塞入站回复。
  • 按会话通道保证一次只有一个代理运行处理给定的会话。
  • 无外部依赖或后台工作线程;纯 TypeScript + Promise。

故障排除

  • 如果命令似乎卡住,请启用详细日志并查找“queued for …ms”行以确认队列正在清空。
  • 如果需要队列深度信息,请启用详细日志并观察队列计时行。

重试策略