Skip to content

OpenClaw 源码解读(四)Channel 渠道系统

本文基于 OpenClaw 2026.3.2 源码,深入解读整个 Channel 渠道系统的架构设计与实现细节。


一、模块概览

Channel 渠道系统是 OpenClaw 最庞大、最复杂的模块,它负责对接所有外部消息平台(Telegram、Discord、Slack、WhatsApp、Signal、iMessage、IRC、Google Chat 等),并提供统一的抽象接口。这套系统采用 插件化架构,内置渠道和第三方扩展渠道遵循相同的 ChannelPlugin 协议。

1.1 核心源码分布

目录/文件行数(约)职责
src/channels/registry.ts~189渠道元数据注册表、ID 标准化、别名映射
src/channels/dock.ts~629轻量级渠道行为代理(Dock),供共享代码使用
src/channels/plugins/types.core.ts~370核心类型定义(30+ 类型)
src/channels/plugins/types.adapters.ts~382适配器接口定义(18 种适配器)
src/channels/plugins/types.plugin.ts~85ChannelPlugin 完整协议
src/channels/plugins/index.ts~117插件运行时注册表、缓存与查询
src/channels/plugins/catalog.ts~307插件目录发现与安装信息
src/channels/plugins/registry-loader.ts~35通用注册表加载器工厂
src/channels/plugins/load.ts~8按 ID 加载插件的快捷入口
src/channels/plugins/message-actions.ts~103跨渠道消息动作分发
src/channels/plugins/pairing.ts~69渠道配对(allowlist 审批)
src/channels/plugins/status.ts~36渠道账户状态快照构建
src/channels/allow-from.ts~53DM/Group allowFrom 合并逻辑
src/channels/command-gating.ts~45控制命令权限门控
src/channels/mention-gating.ts~59@提及门控(群组消息过滤)
src/channels/session.ts~81入站会话记录
src/channels/chat-type.ts~18会话类型定义(direct/group/channel)
src/channels/targets.ts~146消息目标解析与构建
src/channels/model-overrides.ts~142按渠道/群组的模型覆盖
src/channels/status-reactions.ts~383状态反应控制器(emoji 状态指示)
src/channels/draft-stream-loop.ts~104流式草稿输出循环
src/channels/draft-stream-controls.ts~142可终结的流式草稿控制器
src/channels/typing.ts~99输入状态指示器
src/channels/inbound-debounce-policy.ts~51入站消息防抖策略
src/channels/transport/stall-watchdog.ts~103传输层停滞看门狗
src/channels/account-summary.ts~70渠道账户摘要构建
src/routing/resolve-route.ts~719Agent 路由解析引擎
src/routing/session-key.ts~246会话 Key 构建与解析
src/routing/bindings.ts~113路由绑定配置解析
src/routing/account-lookup.ts~14账户大小写不敏感查找
src/telegram/bot.ts~350+Telegram Bot 创建(具体渠道实现示例)
src/telegram/bot-message-context.ts~500+Telegram 入站消息上下文构建
src/discord/monitor/provider.ts~500+Discord 监听器提供者
src/slack/monitor/provider.ts~400+Slack Socket Mode 提供者

1.2 系统全景

┌─────────────────────────────────────────────────────────────────┐
│                    Channel 渠道系统全景                           │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │               Plugin Registry(插件注册表)               │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │    │
│  │  │ Telegram │ │ Discord  │ │  Slack   │ │ WhatsApp │   │    │
│  │  │  Plugin  │ │  Plugin  │ │  Plugin  │ │  Plugin  │   │    │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │    │
│  │  │  Signal  │ │ iMessage │ │   IRC    │ │ GChat    │   │    │
│  │  │  Plugin  │ │  Plugin  │ │  Plugin  │ │  Plugin  │   │    │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │    │
│  │  ┌──────────┐ ┌──────────┐ ...扩展渠道                   │    │
│  │  │ MS Teams │ │  Matrix  │                               │    │
│  │  │(extension)│ │(extension)│                              │    │
│  │  └──────────┘ └──────────┘                               │    │
│  └─────────────────────────────────────────────────────────┘    │
│                              │                                   │
│              ┌───────────────┼───────────────┐                   │
│              ▼               ▼               ▼                   │
│  ┌────────────────┐ ┌──────────────┐ ┌──────────────┐           │
│  │  Dock(轻量代理)│ │ Outbound     │ │ Message      │           │
│  │  capabilities   │ │ (出站发送)    │ │ Actions      │           │
│  │  threading      │ │ sendText     │ │ (消息动作)    │           │
│  │  mentions       │ │ sendMedia    │ │ send/react/  │           │
│  │  groups         │ │ sendPayload  │ │ kick/ban     │           │
│  └────────────────┘ └──────────────┘ └──────────────┘           │
│              │                                                   │
│              ▼                                                   │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │               共享安全与路由层                              │    │
│  │  ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐   │    │
│  │  │AllowFrom │ │ Command  │ │ Mention  │ │ Route    │   │    │
│  │  │ Gating   │ │ Gating   │ │ Gating   │ │ Resolver │   │    │
│  │  └──────────┘ └──────────┘ └──────────┘ └──────────┘   │    │
│  └─────────────────────────────────────────────────────────┘    │
│                              │                                   │
│              ┌───────────────┼───────────────┐                   │
│              ▼               ▼               ▼                   │
│  ┌────────────────┐ ┌──────────────┐ ┌──────────────┐           │
│  │  Session 管理   │ │  Draft Stream│ │  Status      │           │
│  │  会话键构建     │ │  流式输出     │ │  Reactions   │           │
│  │  持久化记录     │ │  节流控制     │ │  Emoji 状态  │           │
│  └────────────────┘ └──────────────┘ └──────────────┘           │
└─────────────────────────────────────────────────────────────────┘

二、ChannelPlugin 协议——渠道的"合同"

所有渠道(无论内置还是扩展)都必须实现 ChannelPlugin 接口。这是整个渠道系统的核心协议。

2.1 协议定义

typescript
// src/channels/plugins/types.plugin.ts
export type ChannelPlugin<ResolvedAccount = any, Probe = unknown, Audit = unknown> = {
  id: ChannelId;                    // 唯一标识符,如 "telegram"
  meta: ChannelMeta;                // UI 元数据(标签、图标、文档路径)
  capabilities: ChannelCapabilities; // 能力声明(支持哪些功能)

  // 可选适配器(共 18 个接口槽位)
  config: ChannelConfigAdapter<ResolvedAccount>;   // 必须:账户配置读取
  setup?: ChannelSetupAdapter;                      // 账户设置向导
  pairing?: ChannelPairingAdapter;                  // 配对/审批
  security?: ChannelSecurityAdapter<ResolvedAccount>; // DM 安全策略
  groups?: ChannelGroupAdapter;                      // 群组策略
  mentions?: ChannelMentionAdapter;                  // @提及处理
  outbound?: ChannelOutboundAdapter;                 // 出站消息发送
  status?: ChannelStatusAdapter<...>;                // 状态探测/审计
  gateway?: ChannelGatewayAdapter<...>;              // 网关集成(启停账户)
  auth?: ChannelAuthAdapter;                         // 登录认证
  elevated?: ChannelElevatedAdapter;                 // 特权降级
  commands?: ChannelCommandAdapter;                  // 命令控制
  streaming?: ChannelStreamingAdapter;               // 流式输出配置
  threading?: ChannelThreadingAdapter;               // 线程/回复模式
  messaging?: ChannelMessagingAdapter;               // 消息目标解析
  agentPrompt?: ChannelAgentPromptAdapter;           // Agent 提示词注入
  directory?: ChannelDirectoryAdapter;               // 联系人目录
  resolver?: ChannelResolverAdapter;                 // 目标解析器
  actions?: ChannelMessageActionAdapter;             // 消息动作处理
  heartbeat?: ChannelHeartbeatAdapter;               // 心跳检测
  onboarding?: ChannelOnboardingAdapter;             // CLI 引导向导
  agentTools?: ChannelAgentToolFactory | ChannelAgentTool[]; // 渠道专属 Agent 工具

  // 配置热重载
  reload?: { configPrefixes: string[]; noopPrefixes?: string[] };
  configSchema?: ChannelConfigSchema;
  defaults?: { queue?: { debounceMs?: number } };
  gatewayMethods?: string[];
};

2.2 能力声明

每个渠道通过 capabilities 明确声明自己支持什么:

typescript
export type ChannelCapabilities = {
  chatTypes: Array<ChatType | "thread">;  // 支持的会话类型
  polls?: boolean;           // 投票
  reactions?: boolean;       // 表情反应
  edit?: boolean;            // 消息编辑
  unsend?: boolean;          // 撤回消息
  reply?: boolean;           // 回复消息
  effects?: boolean;         // 消息效果
  groupManagement?: boolean; // 群组管理
  threads?: boolean;         // 线程
  media?: boolean;           // 媒体消息
  nativeCommands?: boolean;  // 原生命令菜单
  blockStreaming?: boolean;  // 分块流式输出
};

这套能力声明机制让上层代码无需关心具体渠道,只需检查 capabilities.polls 即可知道是否支持投票功能。

2.3 18 种适配器接口

渠道系统采用 微适配器 设计——将渠道行为拆分为 18 个独立的适配器接口,每个渠道只需实现自己支持的部分:

适配器必须?职责
config账户列表、解析、启用状态、allowFrom
setup账户设置向导(applyAccountConfig)
pairing设备配对、allowlist 审批通知
securityDM 策略解析、安全警告
groups群组 @提及要求、工具策略
mentions@提及文本清理正则
outbound出站消息发送(文本/媒体/投票)
status健康探测、审计、运行时快照
gateway启动/停止账户、QR 登录、登出
authCLI 登录流程
elevated特权降级 allowFrom 回退
commands命令权限控制标志
streaming分块流式输出参数
threading回复模式、线程上下文构建
messaging消息目标标准化
agentPromptAgent 提示词渠道特化
directory联系人/群组目录查询
resolver目标名称→ID 解析
actions渠道消息动作(send/react/kick/ban)
heartbeat心跳就绪检查

三、三层架构:Registry → Dock → Plugin

渠道系统采用精心设计的三层架构,解决"重量级插件"的导入成本问题。

3.1 Registry(注册表)——元数据层

typescript
// src/channels/registry.ts
export const CHAT_CHANNEL_ORDER = [
  "telegram", "whatsapp", "discord", "irc",
  "googlechat", "slack", "signal", "imessage",
] as const;

const CHAT_CHANNEL_META: Record<ChatChannelId, ChannelMeta> = {
  telegram: {
    id: "telegram",
    label: "Telegram",
    selectionLabel: "Telegram (Bot API)",
    docsPath: "/channels/telegram",
    blurb: "simplest way to get started...",
    systemImage: "paperplane",
  },
  // ... 其余 7 个渠道
};

export const CHAT_CHANNEL_ALIASES: Record<string, ChatChannelId> = {
  imsg: "imessage",
  "internet-relay-chat": "irc",
  "google-chat": "googlechat",
  gchat: "googlechat",
};

Registry 层的特点:

  • 零运行时依赖:不导入任何渠道实现代码
  • 静态数据:元数据在编译时确定
  • 别名映射imsgimessagegchatgooglechat
  • 标准化入口normalizeChatChannelId() → 先查别名再匹配列表

3.2 Dock(轻量代理)——行为摘要层

Dock 是渠道系统最精妙的设计之一。它是 Plugin 的轻量级投影,只包含共享代码路径需要的行为片段:

typescript
// src/channels/dock.ts
export type ChannelDock = {
  id: ChannelId;
  capabilities: ChannelCapabilities;
  commands?: ChannelCommandAdapter;
  outbound?: { textChunkLimit?: number };
  streaming?: ChannelDockStreaming;
  elevated?: ChannelElevatedAdapter;
  config?: Pick<ChannelConfigAdapter<unknown>,
    "resolveAllowFrom" | "formatAllowFrom" | "resolveDefaultTo">;
  groups?: ChannelGroupAdapter;
  mentions?: ChannelMentionAdapter;
  threading?: ChannelThreadingAdapter;
  agentPrompt?: ChannelAgentPromptAdapter;
};

为什么需要 Dock?

Plugin(重量级)               Dock(轻量级)
─────────────────            ─────────────────
导入 grammy Bot SDK          ✗ 不导入
导入 Puppeteer               ✗ 不导入
导入 HTTP 服务器              ✗ 不导入
导入 WebSocket 客户端         ✗ 不导入
capabilities ✓               capabilities ✓
threading ✓                  threading ✓
mentions ✓                   mentions ✓
allowFrom 解析 ✓              allowFrom 解析 ✓

内置渠道的 Dock 是手写的静态对象,直接内联在 dock.ts 中。扩展渠道的 Dock 则从 Plugin 自动构建:

typescript
function buildDockFromPlugin(plugin: ChannelPlugin): ChannelDock {
  return {
    id: plugin.id,
    capabilities: plugin.capabilities,
    commands: plugin.commands,
    outbound: plugin.outbound?.textChunkLimit
      ? { textChunkLimit: plugin.outbound.textChunkLimit }
      : undefined,
    streaming: plugin.streaming ? { ... } : undefined,
    elevated: plugin.elevated,
    config: plugin.config ? { ... } : undefined,
    groups: plugin.groups,
    mentions: plugin.mentions,
    threading: plugin.threading,
    agentPrompt: plugin.agentPrompt,
  };
}

3.3 Plugin(完整实现)——执行层

Plugin 是渠道的完整运行时实现,由插件注册表管理。只在真正需要时才加载:

typescript
// src/channels/plugins/index.ts
function resolveCachedChannelPlugins(): CachedChannelPlugins {
  const registry = requireActivePluginRegistry();
  const registryVersion = getActivePluginRegistryVersion();

  // 版本化缓存:注册表变化时自动失效
  if (cached.registryVersion === registryVersion) return cached;

  // 去重 + 排序(内置渠道按 CHAT_CHANNEL_ORDER,扩展渠道按 order 或字母序)
  const sorted = dedupeChannels(registry.channels.map(e => e.plugin))
    .toSorted((a, b) => {
      const indexA = CHAT_CHANNEL_ORDER.indexOf(a.id);
      const orderA = a.meta.order ?? (indexA === -1 ? 999 : indexA);
      // ...
    });

  // 构建 byId Map 加速查找
  const byId = new Map<string, ChannelPlugin>();
  for (const plugin of sorted) byId.set(plugin.id, plugin);

  return { registryVersion, sorted, byId };
}

三层架构的导入开销对比:

操作RegistryDockPlugin
normalizeChatChannelId("tg")0 依赖
getChannelDock("telegram").capabilities轻量依赖
getChannelPlugin("telegram").gateway.startAccount()完整 SDK

四、路由系统——消息如何找到正确的 Agent

路由系统是渠道系统的"大脑",它决定每条入站消息应该由哪个 Agent 处理,以及使用哪个 Session。

4.1 路由解析引擎

typescript
// src/routing/resolve-route.ts
export function resolveAgentRoute(input: ResolveAgentRouteInput): ResolvedAgentRoute

输入:

typescript
type ResolveAgentRouteInput = {
  cfg: OpenClawConfig;
  channel: string;           // "telegram"
  accountId?: string | null;  // "bot1"
  peer?: RoutePeer | null;    // { kind: "group", id: "123456" }
  parentPeer?: RoutePeer | null; // 线程的父级
  guildId?: string | null;    // Discord guild ID
  teamId?: string | null;     // Slack team ID
  memberRoleIds?: string[];   // Discord 角色 ID 列表
};

输出:

typescript
type ResolvedAgentRoute = {
  agentId: string;          // "main" 或具体 agent ID
  channel: string;          // 标准化后的渠道名
  accountId: string;        // 标准化后的账户 ID
  sessionKey: string;       // 完整会话键
  mainSessionKey: string;   // 主会话键(用于 DM 折叠)
  matchedBy: string;        // 匹配方式(调试用)
};

4.2 七层优先级匹配

路由引擎按照七层优先级逐层匹配,命中第一层后立即返回:

优先级从高到低:

Tier 1: binding.peer          ← 精确匹配 Peer(用户/群组 ID)
Tier 2: binding.peer.parent   ← 匹配线程的父级 Peer
Tier 3: binding.guild+roles   ← Discord Guild + 角色组合
Tier 4: binding.guild         ← Discord Guild 级别
Tier 5: binding.team          ← Slack Team 级别
Tier 6: binding.account       ← 账户级别
Tier 7: binding.channel       ← 渠道级别(通配符 accountId="*")
Default: 使用默认 Agent

源码中用 tiers 数组实现了这个优先级链:

typescript
const tiers = [
  {
    matchedBy: "binding.peer",
    enabled: Boolean(peer),
    candidates: collectPeerIndexedBindings(bindingsIndex, peer),
    predicate: (c) => c.match.peer.state === "valid",
  },
  {
    matchedBy: "binding.peer.parent",
    enabled: Boolean(parentPeer && parentPeer.id),
    candidates: collectPeerIndexedBindings(bindingsIndex, parentPeer),
    // ...
  },
  // ... 其余 5 层
];

for (const tier of tiers) {
  if (!tier.enabled) continue;
  const matched = tier.candidates.find(c =>
    tier.predicate(c) && matchesBindingScope(c.match, baseScope)
  );
  if (matched) return choose(matched.binding.agentId, tier.matchedBy);
}

return choose(resolveDefaultAgentId(cfg), "default");

4.3 绑定索引加速

对于有大量 binding 配置的场景,每次路由解析都遍历全部 binding 会很慢。系统为此构建了索引结构

typescript
type EvaluatedBindingsIndex = {
  byPeer: Map<string, EvaluatedBinding[]>;          // peer:id → bindings
  byGuildWithRoles: Map<string, EvaluatedBinding[]>; // guildId → bindings(有角色约束)
  byGuild: Map<string, EvaluatedBinding[]>;          // guildId → bindings(无角色约束)
  byTeam: Map<string, EvaluatedBinding[]>;           // teamId → bindings
  byAccount: EvaluatedBinding[];                     // 账户级 bindings
  byChannel: EvaluatedBinding[];                     // 渠道级 bindings(通配符)
};

索引通过 WeakMap<OpenClawConfig, Cache> 绑定到配置对象,配置不变时缓存命中,配置变化时自动失效。还有 LRU 大小限制:

typescript
const MAX_EVALUATED_BINDINGS_CACHE_KEYS = 2000;
const MAX_RESOLVED_ROUTE_CACHE_KEYS = 4000;

4.4 会话键(Session Key)构建

会话键是 OpenClaw 消息持久化和并发控制的关键。格式:

agent:<agentId>:<rest>

具体形式取决于 DM Scope 配置和会话类型:

dmScope直聊键格式说明
main(默认)agent:main:main所有直聊共享一个会话
per-peeragent:main:direct:alice每个对话者独立会话
per-channel-peeragent:main:telegram:direct:alice每个渠道×对话者独立
per-account-channel-peeragent:main:telegram:bot1:direct:alice最细粒度

群组/频道键格式固定:

agent:main:telegram:group:123456

线程会话支持后缀扩展:

agent:main:slack:channel:C12345:thread:1700000000.000000

跨渠道用户身份合并:

typescript
// 配置示例
session: {
  identityLinks: {
    alice: ["telegram:12345", "discord:67890", "alice@email.com"]
  }
}

dmScope 不是 main 时,buildAgentPeerSessionKey() 会调用 resolveLinkedPeerId() 将不同渠道的用户 ID 映射到同一个规范名(alice),使得 Telegram 和 Discord 的同一用户共享会话历史。


五、安全门控层——三道关卡

每条入站消息在到达 Agent 之前,必须通过三道安全门控。

5.1 AllowFrom 门控(谁能说话)

typescript
// src/channels/allow-from.ts
export function mergeDmAllowFromSources(params: {
  allowFrom?: Array<string | number>;   // 配置文件中的静态 allowlist
  storeAllowFrom?: Array<string | number>; // 运行时 store 中的动态 allowlist
  dmPolicy?: string;
}): string[] {
  // allowlist 策略下只用配置文件的列表,不合并 store
  const storeEntries = params.dmPolicy === "allowlist" ? [] : (params.storeAllowFrom ?? []);
  return [...(params.allowFrom ?? []), ...storeEntries]
    .map(v => String(v).trim())
    .filter(Boolean);
}

AllowFrom 检查的核心逻辑:

typescript
export function isSenderIdAllowed(
  allow: { entries: string[]; hasWildcard: boolean; hasEntries: boolean },
  senderId: string | undefined,
  allowWhenEmpty: boolean,  // 列表为空时的默认策略
): boolean {
  if (!allow.hasEntries) return allowWhenEmpty;  // 空列表→看默认策略
  if (allow.hasWildcard) return true;            // "*" 通配符→全部允许
  if (!senderId) return false;                   // 无发送者 ID→拒绝
  return allow.entries.includes(senderId);       // 精确匹配
}

5.2 Command 门控(谁能执行命令)

typescript
// src/channels/command-gating.ts
export function resolveControlCommandGate(params: {
  useAccessGroups: boolean;      // 是否启用 access groups
  authorizers: CommandAuthorizer[];  // 各级权限判定器
  allowTextCommands: boolean;    // 是否允许文本命令
  hasControlCommand: boolean;    // 消息是否包含控制命令
}): { commandAuthorized: boolean; shouldBlock: boolean } {
  const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({ ... });
  // 只有"允许文本命令 + 有控制命令 + 未授权"时才阻断
  const shouldBlock = params.allowTextCommands
    && params.hasControlCommand
    && !commandAuthorized;
  return { commandAuthorized, shouldBlock };
}

三种工作模式(当 AccessGroups 关闭时):

  • allow(默认):全部放行
  • deny:全部阻断
  • configured:有任何 authorizer 配置了→检查权限;全未配置→放行

5.3 Mention 门控(群组中是否被 @)

群组消息默认需要 @提及 Bot 才会响应,防止 Bot 对每条群消息都回复:

typescript
// src/channels/mention-gating.ts
export function resolveMentionGating(params: MentionGateParams): MentionGateResult {
  const implicit = params.implicitMention === true;  // 隐式提及(如回复 Bot 消息)
  const bypass = params.shouldBypassMention === true; // 绕过提及检查
  const effectiveWasMentioned = params.wasMentioned || implicit || bypass;
  const shouldSkip = params.requireMention
    && params.canDetectMention
    && !effectiveWasMentioned;
  return { effectiveWasMentioned, shouldSkip };
}

提及绕过机制:当用户在群组中发送控制命令(如 /reset)且有权限时,即使没有 @提及也不跳过:

typescript
export function resolveMentionGatingWithBypass(params): MentionGateWithBypassResult {
  const shouldBypassMention =
    params.isGroup &&
    params.requireMention &&
    !params.wasMentioned &&
    !(params.hasAnyMention ?? false) &&
    params.allowTextCommands &&
    params.commandAuthorized &&
    params.hasControlCommand;   // 控制命令绕过提及门控
  // ...
}

5.4 三道关卡的执行顺序

以 Telegram 为例,在 bot-message-context.ts 中的完整执行顺序:

入站消息


① AllowFrom 检查
  ├── DM: mergeDmAllowFromSources() → isSenderAllowed()
  │   └── 未通过 → 配对消息或静默丢弃
  └── Group: evaluateTelegramGroupBaseAccess()
      └── 未通过 → logInboundDrop()



② Mention 门控
  ├── hasBotMention() 检测是否被 @
  ├── implicitMention 检测(回复 Bot 消息)
  └── resolveMentionGatingWithBypass()
      └── shouldSkip → 静默忽略



③ Command 门控
  ├── hasControlCommand() 检测控制命令
  └── resolveControlCommandGate()
      └── shouldBlock → 静默忽略



通过 → 交给 Agent 处理

六、出站消息发送

6.1 OutboundAdapter 接口

typescript
export type ChannelOutboundAdapter = {
  deliveryMode: "direct" | "gateway" | "hybrid";
  chunker?: (text: string, limit: number) => string[];
  chunkerMode?: "text" | "markdown";
  textChunkLimit?: number;
  pollMaxOptions?: number;
  resolveTarget?: (...) => { ok: true; to: string } | { ok: false; error: Error };
  sendText?: (ctx: ChannelOutboundContext) => Promise<OutboundDeliveryResult>;
  sendMedia?: (ctx: ChannelOutboundContext) => Promise<OutboundDeliveryResult>;
  sendPayload?: (ctx: ChannelOutboundPayloadContext) => Promise<OutboundDeliveryResult>;
  sendPoll?: (ctx: ChannelPollContext) => Promise<ChannelPollResult>;
};

三种发送模式:

  • direct:渠道直接调用平台 API(Telegram、Discord、Slack)
  • gateway:通过 Gateway WebSocket 转发(扩展渠道常用)
  • hybrid:直发 + 网关混合

6.2 加载优化

出站适配器有独立的轻量加载器,不会拉入完整的 Plugin:

typescript
// src/channels/plugins/outbound/load.ts
const loadOutboundAdapterFromRegistry = createChannelRegistryLoader<ChannelOutboundAdapter>(
  (entry) => entry.plugin.outbound,
);

6.3 Telegram 出站示例

typescript
// src/channels/plugins/outbound/telegram.ts
export const telegramOutbound: ChannelOutboundAdapter = {
  deliveryMode: "direct",
  chunker: markdownToTelegramHtmlChunks,  // Markdown → Telegram HTML 分块
  chunkerMode: "markdown",
  textChunkLimit: 4000,                    // Telegram 消息长度限制

  sendText: async ({ to, text, accountId, deps, replyToId, threadId }) => {
    const { send, baseOpts } = resolveTelegramSendContext({ deps, accountId, replyToId, threadId });
    const result = await send(to, text, { ...baseOpts });
    return { channel: "telegram", ...result };
  },

  sendMedia: async ({ to, text, mediaUrl, mediaLocalRoots, ... }) => {
    const { send, baseOpts } = resolveTelegramSendContext({ ... });
    const result = await send(to, text, { ...baseOpts, mediaUrl, mediaLocalRoots });
    return { channel: "telegram", ...result };
  },

  sendPayload: async ({ to, payload, ... }) => {
    // 支持 inline buttons、引用文本、多媒体
    // 多张媒体时循环发送,第一张附带 buttons
    for (let i = 0; i < mediaUrls.length; i++) {
      finalResult = await send(to, i === 0 ? text : "", {
        ...payloadOpts,
        mediaUrl: mediaUrls[i],
        ...(i === 0 ? { buttons: telegramData?.buttons } : {}),
      });
    }
  },
};

七、流式输出——Draft Stream

AI 回复通常是流式生成的,Draft Stream 机制让用户能看到实时的打字效果。

7.1 核心循环

typescript
// src/channels/draft-stream-loop.ts
export function createDraftStreamLoop(params: {
  throttleMs: number;
  isStopped: () => boolean;
  sendOrEditStreamMessage: (text: string) => Promise<void | boolean>;
}): DraftStreamLoop {
  let lastSentAt = 0;
  let pendingText = "";
  let inFlightPromise: Promise<void | boolean> | undefined;
  let timer: ReturnType<typeof setTimeout> | undefined;

  return {
    update: (text: string) => {
      if (params.isStopped()) return;
      pendingText = text;
      // 如果有请求在飞行中 → 调度延迟发送
      if (inFlightPromise) { schedule(); return; }
      // 如果节流窗口已过 → 立即发送
      if (!timer && Date.now() - lastSentAt >= params.throttleMs) {
        void flush(); return;
      }
      // 否则调度
      schedule();
    },
    flush,   // 强制发送当前 pendingText
    stop,    // 停止并清空
    resetPending,        // 清空待发文本
    resetThrottleWindow, // 重置节流窗口
    waitForInFlight,     // 等待飞行中的请求完成
  };
}

7.2 可终结的流式控制器

typescript
// src/channels/draft-stream-controls.ts
export function createFinalizableDraftStreamControls(params: {
  throttleMs: number;
  isStopped: () => boolean;
  isFinal: () => boolean;       // 是否已进入最终状态
  markStopped: () => void;
  markFinal: () => void;
  sendOrEditStreamMessage: (text: string) => Promise<boolean>;
}) {
  const loop = createDraftStreamLoop({ ... });

  return {
    update: (text: string) => {
      if (params.isStopped() || params.isFinal()) return;
      loop.update(text);
    },
    stop: async () => {        // 正常完成:标记 final → flush 最后内容
      params.markFinal();
      await loop.flush();
    },
    stopForClear: async () => { // 异常中断:标记 stopped → 停止循环 → 等待飞行请求
      params.markStopped();
      loop.stop();
      await loop.waitForInFlight();
    },
  };
}

状态机:

STREAMING ──update()──→ STREAMING
    │                        │
    │ stop()                 │ stopForClear()
    ▼                        ▼
  FINAL                   STOPPED
  (flush残余)             (立即停止)
    │                        │
    ▼                        ▼
   DONE                    DONE

八、状态反应控制器——Emoji 进度条

Agent 处理消息时,通过 Emoji 反应向用户展示实时状态。

8.1 状态 Emoji 映射

typescript
// src/channels/status-reactions.ts
export const DEFAULT_EMOJIS = {
  queued:    "👀",   // 排队中
  thinking:  "🤔",   // 思考中
  tool:      "🔥",   // 使用工具
  coding:    "👨‍💻",  // 执行代码
  web:       "⚡",   // 网络搜索
  done:      "👍",   // 完成
  error:     "😱",   // 出错
  stallSoft: "🥱",   // 轻微停滞(10s)
  stallHard: "😨",   // 严重停滞(30s)
};

工具类型自动识别:

typescript
export function resolveToolEmoji(toolName: string | undefined, emojis): string {
  const normalized = toolName?.trim().toLowerCase() ?? "";
  if (WEB_TOOL_TOKENS.some(t => normalized.includes(t))) return emojis.web;      // 🌐
  if (CODING_TOOL_TOKENS.some(t => normalized.includes(t))) return emojis.coding; // 💻
  return emojis.tool;  // 🔥
}

8.2 控制器设计

typescript
export function createStatusReactionController(params: {
  enabled: boolean;
  adapter: StatusReactionAdapter;   // 平台特定的 set/remove 实现
  initialEmoji: string;
  emojis?: StatusReactionEmojis;
  timing?: StatusReactionTiming;
}): StatusReactionController

核心机制:

  1. Promise 链序列化:所有 API 调用通过 chainPromise 串行执行,防止竞态
  2. 防抖:中间状态(thinking/tool)有 700ms 防抖,终态(done/error)立即执行
  3. 停滞检测:10s 无活动 → 🥱;30s → 😨
  4. 终态保护:done/error 之后忽略所有后续更新
typescript
function scheduleEmoji(emoji: string, options = {}) {
  if (!enabled || finished) return;
  if (emoji === currentEmoji || emoji === pendingEmoji) {
    if (!options.skipStallReset) resetStallTimers();
    return;  // 去重
  }
  pendingEmoji = emoji;
  clearDebounceTimer();

  if (options.immediate) {
    void enqueue(async () => { await applyEmoji(emoji); });
  } else {
    debounceTimer = setTimeout(() => {
      void enqueue(async () => { await applyEmoji(emoji); });
    }, timing.debounceMs);
  }
  if (!options.skipStallReset) resetStallTimers();
}

九、入站消息处理——以 Telegram 为例

9.1 创建 Bot

typescript
// src/telegram/bot.ts
export function createTelegramBot(opts: TelegramBotOptions) {
  const bot = new Bot(opts.token, client ? { client } : undefined);
  bot.api.config.use(apiThrottler());  // API 节流
  bot.catch((err) => { ... });          // 全局错误处理

  const recentUpdates = createTelegramUpdateDedupe();  // 更新去重
  // ... 注册消息处理器、原生命令等
}

关键组件:

  • grammY:Telegram Bot SDK
  • apiThrottler:API 调用频率限制
  • sequentialize:按 chat ID 序列化更新处理
  • updateDedupe:Update ID 去重(防止轮询+Webhook 并行时重复处理)

9.2 消息上下文构建

bot-message-context.ts 是 Telegram 入站消息处理的核心,它将 Telegram 原始消息转化为标准化的 MsgContext

Telegram Update


解析消息内容(文本/媒体/位置/贴纸/转发)


构建 sender 信息(ID/名称/用户名)


路由解析 resolveAgentRoute()
  │ → agentId, sessionKey, accountId

安全门控(AllowFrom → Mention → Command)


构建 MsgContext(标准化消息上下文)


会话记录 recordInboundSession()


创建 StatusReactionController


交给 Agent 消息处理器

9.3 防抖策略

快速连续消息(如用户连发多条)会被防抖合并:

typescript
// src/channels/inbound-debounce-policy.ts
export function shouldDebounceTextInbound(params: {
  text: string | null | undefined;
  cfg: OpenClawConfig;
  hasMedia?: boolean;
  allowDebounce?: boolean;
}): boolean {
  if (params.allowDebounce === false) return false;
  if (params.hasMedia) return false;             // 媒体消息不防抖
  const text = params.text?.trim() ?? "";
  if (!text) return false;
  return !hasControlCommand(text, params.cfg);  // 控制命令不防抖
}

防抖规则:

  • 纯文本消息 → 防抖(等待后续消息合并)
  • 媒体消息 → 不防抖(立即处理)
  • 控制命令 → 不防抖(立即执行)
  • 配置了 allowDebounce: false → 不防抖

十、Typing 指示器——输入状态管理

10.1 TTL 安全机制

typescript
// src/channels/typing.ts
export function createTypingCallbacks(params): TypingCallbacks {
  const maxDurationMs = params.maxDurationMs ?? 60_000; // 60s TTL 安全上限

  const startGuard = createTypingStartGuard({
    maxConsecutiveFailures,     // 连续失败 2 次后熔断
    onTrip: () => keepaliveLoop.stop(),
  });

  const keepaliveLoop = createTypingKeepaliveLoop({
    intervalMs: keepaliveIntervalMs,  // 默认 3s
    onTick: fireStart,
  });

  return {
    onReplyStart: async () => {
      startGuard.reset();
      keepaliveLoop.stop();
      await fireStart();           // 发送第一个 typing 指示
      keepaliveLoop.start();       // 启动 keepalive
      startTtlTimer();             // 启动 TTL 安全计时器
    },
    onIdle: fireStop,              // 空闲时停止
    onCleanup: fireStop,           // 清理时停止
  };
}

三层保护:

  1. Keepalive 循环:每 3s 重新发送 typing 指示(平台通常 5s 后消失)
  2. 熔断器:连续 2 次 start 失败 → 停止 keepalive
  3. TTL 安全:最多 60s 后自动停止(防止忘记清理)

十一、传输层看门狗

typescript
// src/channels/transport/stall-watchdog.ts
export function createArmableStallWatchdog(params: {
  label: string;
  timeoutMs: number;
  checkIntervalMs?: number;
  abortSignal?: AbortSignal;
  onTimeout: (meta) => void;
}): ArmableStallWatchdog

看门狗在以下场景使用:

  • WebSocket 连接的 ping/pong 超时检测
  • 轮询请求的响应超时检测
  • 长运行操作的活跃度检测
typescript
return {
  arm: (atMs?) => { armed = true; lastActivityAt = atMs ?? Date.now(); },
  touch: (atMs?) => { lastActivityAt = atMs ?? Date.now(); },  // 喂狗
  disarm: () => { armed = false; },
  stop: () => { stopped = true; clearInterval(timer); },
  isArmed: () => armed,
};

十二、模型覆盖——按渠道/群组切换模型

typescript
// src/channels/model-overrides.ts
export function resolveChannelModelOverride(params): ChannelModelOverride | null

配置示例:

json5
{
  channels: {
    modelByChannel: {
      telegram: {
        "*": "claude-3.5-sonnet",           // 默认
        "group:123456": "gpt-4o",           // 特定群组
        "#general": "claude-3-haiku",       // 按频道名
      },
      discord: {
        "*": "gpt-4o-mini",
      }
    }
  }
}

查找逻辑使用 buildChannelKeyCandidates() 生成候选键列表:

groupId (精确 ID) > parentGroupId > groupChannel (#名称)
> channelBare (去 #) > channelSlug (标准化) > "*" (通配符)

十三、渠道目录系统(Catalog)

Catalog 解决"如何发现可安装的渠道插件"的问题。

13.1 四种来源优先级

typescript
const ORIGIN_PRIORITY = {
  config: 0,      // 配置文件中指定的插件
  workspace: 1,   // 工作区中的本地插件
  global: 2,      // 全局安装的插件
  bundled: 3,     // 内置插件
};

13.2 外部目录

除了代码中的插件,还支持从外部 JSON 文件加载目录:

typescript
const DEFAULT_CATALOG_PATHS = [
  path.join(CONFIG_DIR, "mpm", "plugins.json"),
  path.join(CONFIG_DIR, "mpm", "catalog.json"),
  path.join(CONFIG_DIR, "plugins", "catalog.json"),
];

每个 catalog entry 包含安装信息:

typescript
type ChannelPluginCatalogEntry = {
  id: string;
  meta: ChannelMeta;
  install: {
    npmSpec: string;       // npm 包名
    localPath?: string;    // 本地路径(开发用)
    defaultChoice?: "npm" | "local";
  };
};

十四、完整数据流

14.1 入站消息处理流

用户发送消息 (Telegram/Discord/Slack/...)


Platform SDK 接收(grammY / discord.js / Slack Events API)


渠道 Monitor 预处理
  ├── 解析消息内容(文本/媒体/位置/表情)
  ├── 解析发送者信息(ID/名称/用户名)
  ├── 检测会话类型(DM/Group/Channel/Thread)
  └── 去重检查(Telegram Update ID / Discord Message ID)


路由解析 resolveAgentRoute()
  ├── 从配置加载 bindings
  ├── 构建索引 → 七层优先级匹配
  ├── 解析 agentId
  └── 构建 sessionKey


安全门控
  ├── ① AllowFrom:发送者是否在允许列表中
  ├── ② Mention:群组中是否被 @提及
  └── ③ Command:控制命令是否有权限执行
  │ 任一关卡未通过 → 丢弃/配对消息

入站上下文构建(MsgContext)
  ├── 防抖判定(shouldDebounceTextInbound)
  ├── 模型覆盖检查(resolveChannelModelOverride)
  └── 会话元数据记录(recordInboundSession)


创建状态反应控制器 → 设置 👀


Typing 指示器启动 → onReplyStart()


交给 Agent 消息处理器
  ├── 思考 → 🤔
  ├── 调用工具 → 🔥/👨‍💻/⚡
  ├── 流式生成 → DraftStreamLoop.update()
  └── 完成 → 👍 或错误 → 😱


出站发送
  ├── 文本消息 → chunker 分块 → sendText
  ├── 媒体消息 → sendMedia
  └── 富文本 → sendPayload(按钮/引用/多媒体)

14.2 出站消息发送流

Agent 回复内容


loadChannelOutboundAdapter(channelId)
  │ → 从注册表加载轻量适配器

resolveTarget()
  │ → 确定发送目标(to 字段标准化)

内容处理
  ├── 文本分块:chunker(text, textChunkLimit)
  │   ├── Telegram: markdownToTelegramHtmlChunks (4000 字符)
  │   ├── Discord: markdown chunks (2000 字符)
  │   └── IRC: plain text (350 字符)
  ├── 媒体处理:解析 mediaUrl / mediaLocalRoots
  └── 渠道特化数据:buttons / quoteText / effects


发送(deliveryMode)
  ├── direct → 直接调用平台 API
  ├── gateway → 通过 WebSocket RPC 转发
  └── hybrid → 尝试直发,失败则网关


返回 OutboundDeliveryResult
  └── { channel, messageId, chatId, ... }

十五、设计模式总结

15.1 适配器模式(Adapter Pattern)

核心体现。18 种微适配器接口统一了渠道行为的抽象,每个渠道自选实现。

15.2 注册表模式(Registry Pattern)

PluginRegistry 管理所有已注册的渠道插件,支持版本化缓存、热重载、去重排序。

15.3 代理模式(Proxy Pattern)

ChannelDockChannelPlugin 的轻量代理,避免共享代码路径触发重量级导入。

15.4 策略模式(Strategy Pattern)

  • deliveryMode: "direct" | "gateway" | "hybrid" → 出站发送策略
  • dmScope: "main" | "per-peer" | ... → 会话键生成策略
  • dmPolicy: "open" | "allowlist" | ... → DM 访问策略

15.5 责任链模式(Chain of Responsibility)

七层路由优先级匹配 + 三道安全门控,都是责任链的典型应用。

15.6 工厂方法模式(Factory Method)

  • createDraftStreamLoop() → 流式输出循环
  • createStatusReactionController() → 状态反应控制器
  • createTypingCallbacks() → 输入状态指示器
  • createArmableStallWatchdog() → 传输看门狗
  • createChannelRegistryLoader() → 通用注册表加载器工厂

15.7 观察者模式(Observer Pattern)

  • Telegram Bot.on("message", ...) → 消息事件监听
  • Discord client.on("messageCreate", ...) → 消息事件监听
  • AbortSignal 事件监听 → 优雅停机

十六、调试建议

16.1 路由调试

bash
# 打开详细路由日志
OPENCLAW_LOG_VERBOSE=1 pnpm openclaw gateway run

路由引擎在 shouldLogVerbose() 为 true 时输出详细匹配过程:

[routing] resolveAgentRoute: channel=telegram accountId=default peer=direct:12345 bindings=3
[routing] binding: agentId=main accountPattern=default peer=none guildId=none teamId=none roles=0
[routing] match: matchedBy=binding.account agentId=main

16.2 关键断点位置

文件行号断点目的
src/routing/resolve-route.tschoose() 函数路由最终决策
src/channels/allow-from.tsisSenderIdAllowed()AllowFrom 判定
src/channels/mention-gating.tsresolveMentionGating()提及门控判定
src/channels/command-gating.tsresolveControlCommandGate()命令门控判定
src/channels/draft-stream-loop.tsflush() 函数流式输出时机
src/channels/status-reactions.tsapplyEmoji()状态 Emoji 切换
src/channels/plugins/index.tsresolveCachedChannelPlugins()插件加载
src/channels/dock.tsgetChannelDock()Dock 查找
src/telegram/bot-message-context.ts安全门控代码段Telegram 入站处理全链路

16.3 测试命令

bash
# Channel 相关测试
pnpm test -- --reporter verbose src/channels/

# 路由测试
pnpm test -- --reporter verbose src/routing/

# Telegram 测试
pnpm test -- --reporter verbose src/telegram/

# 特定文件
pnpm test -- --reporter verbose src/channels/command-gating.test.ts
pnpm test -- --reporter verbose src/routing/resolve-route.test.ts

十七、文件依赖图

src/channels/chat-type.ts                  ← 零依赖,基础类型

src/channels/registry.ts                   ← 元数据 + ID 标准化

src/channels/dock.ts                       ← 轻量代理(依赖各渠道 accounts)

src/channels/plugins/types.core.ts         ← 核心类型定义
src/channels/plugins/types.adapters.ts     ← 适配器接口
src/channels/plugins/types.plugin.ts       ← ChannelPlugin 协议

src/channels/plugins/index.ts             ← 运行时注册表

src/channels/plugins/message-actions.ts   ← 消息动作分发
src/channels/plugins/pairing.ts           ← 配对系统
src/channels/plugins/status.ts            ← 状态快照

src/routing/session-key.ts                ← 会话键构建(零渠道依赖)

src/routing/bindings.ts                   ← 路由绑定解析

src/routing/resolve-route.ts              ← 路由解析引擎(依赖 session-key + bindings)

src/channels/allow-from.ts               ← 安全门控(零渠道依赖)
src/channels/command-gating.ts            ← 安全门控(零渠道依赖)
src/channels/mention-gating.ts            ← 安全门控(零渠道依赖)

src/channels/draft-stream-loop.ts         ← 流式输出(零渠道依赖)

src/channels/draft-stream-controls.ts     ← 可终结流式控制

src/channels/status-reactions.ts          ← 状态反应(零渠道依赖)
src/channels/typing.ts                   ← 输入状态(零渠道依赖)
src/channels/transport/stall-watchdog.ts  ← 传输看门狗(零渠道依赖)

src/telegram/bot.ts                       ← 具体渠道实现
  ↑ 依赖上述所有共享模块
src/telegram/bot-message-context.ts       ← Telegram 入站上下文

十八、设计洞察

18.1 为什么分三层(Registry/Dock/Plugin)

这是对 导入开销功能完整性 的权衡。JavaScript/TypeScript 的 ESM 模块在 import 时会执行所有顶层代码。一个 Telegram Plugin 可能导入 grammY SDK(数百 KB),一个 WhatsApp Plugin 可能导入 Puppeteer。如果共享代码(如命令权限检查)需要查询某个渠道的能力,直接导入 Plugin 就会拉入不需要的重量级依赖。

Dock 层的存在正是为了解决这个问题——它是 Plugin 的静态投影,只包含可以安全内联的轻量属性(能力声明、配置读取器、提及正则等),不包含任何运行时 SDK 导入。

18.2 安全门控为什么是三道而不是一道

单一的 AllowFrom 检查不够,因为:

  • AllowFrom 管的是"谁能跟 Bot 说话"→ 身份层
  • Mention 管的是"群组中何时响应"→ 触发层
  • Command 管的是"谁能执行控制命令"→ 权限层

三层关注点分离,且每层有独立的配置旋钮。关键设计:控制命令可以绕过 Mention 门控(用户在群组中发 /reset 不需要 @Bot),但仍需通过 Command 门控验证权限。

18.3 七层路由优先级的设计哲学

从最精确到最宽泛的匹配,遵循"最小惊讶原则":

  • 如果为某个用户/群组配置了特定 Agent,那一定要用那个 Agent
  • 如果只在 Guild/Team 级别配了,那同 Guild/Team 的都用同一个 Agent
  • 如果在账户级别配了,该账户所有消息用同一个 Agent
  • 什么都没配?用默认 Agent

索引结构确保即使有上千条 binding 也能快速匹配。

18.4 流式输出的节流设计

DraftStreamLoop 的节流不是简单的 throttle(fn, ms),而是自适应节流

  • 如果上次发送已过了 throttleMs → 立即发送(低延迟响应开始)
  • 如果有飞行中的请求 → 调度延迟发送(防止并发冲突)
  • 如果是新的更新 → 调度延迟发送(合并快速连续更新)

这保证了第一个流式 token 能立即显示,后续 token 合理合并,不会产生过多的 API 调用。

读文档、看源码、写代码,理解 AI Agent 本质 🤖