主题
OpenClaw 源码解读(四)Channel 渠道系统
本文基于 OpenClaw 2026.3.2 源码,深入解读整个 Channel 渠道系统的架构设计与实现细节。
一、模块概览
Channel 渠道系统是 OpenClaw 最庞大、最复杂的模块,它负责对接所有外部消息平台(Telegram、Discord、Slack、WhatsApp、Signal、iMessage、IRC、Google Chat 等),并提供统一的抽象接口。这套系统采用 插件化架构,内置渠道和第三方扩展渠道遵循相同的 ChannelPlugin 协议。
1.1 核心源码分布
| 目录/文件 | 行数(约) | 职责 |
|---|---|---|
src/channels/registry.ts | ~189 | 渠道元数据注册表、ID 标准化、别名映射 |
src/channels/dock.ts | ~629 | 轻量级渠道行为代理(Dock),供共享代码使用 |
src/channels/plugins/types.core.ts | ~370 | 核心类型定义(30+ 类型) |
src/channels/plugins/types.adapters.ts | ~382 | 适配器接口定义(18 种适配器) |
src/channels/plugins/types.plugin.ts | ~85 | ChannelPlugin 完整协议 |
src/channels/plugins/index.ts | ~117 | 插件运行时注册表、缓存与查询 |
src/channels/plugins/catalog.ts | ~307 | 插件目录发现与安装信息 |
src/channels/plugins/registry-loader.ts | ~35 | 通用注册表加载器工厂 |
src/channels/plugins/load.ts | ~8 | 按 ID 加载插件的快捷入口 |
src/channels/plugins/message-actions.ts | ~103 | 跨渠道消息动作分发 |
src/channels/plugins/pairing.ts | ~69 | 渠道配对(allowlist 审批) |
src/channels/plugins/status.ts | ~36 | 渠道账户状态快照构建 |
src/channels/allow-from.ts | ~53 | DM/Group allowFrom 合并逻辑 |
src/channels/command-gating.ts | ~45 | 控制命令权限门控 |
src/channels/mention-gating.ts | ~59 | @提及门控(群组消息过滤) |
src/channels/session.ts | ~81 | 入站会话记录 |
src/channels/chat-type.ts | ~18 | 会话类型定义(direct/group/channel) |
src/channels/targets.ts | ~146 | 消息目标解析与构建 |
src/channels/model-overrides.ts | ~142 | 按渠道/群组的模型覆盖 |
src/channels/status-reactions.ts | ~383 | 状态反应控制器(emoji 状态指示) |
src/channels/draft-stream-loop.ts | ~104 | 流式草稿输出循环 |
src/channels/draft-stream-controls.ts | ~142 | 可终结的流式草稿控制器 |
src/channels/typing.ts | ~99 | 输入状态指示器 |
src/channels/inbound-debounce-policy.ts | ~51 | 入站消息防抖策略 |
src/channels/transport/stall-watchdog.ts | ~103 | 传输层停滞看门狗 |
src/channels/account-summary.ts | ~70 | 渠道账户摘要构建 |
src/routing/resolve-route.ts | ~719 | Agent 路由解析引擎 |
src/routing/session-key.ts | ~246 | 会话 Key 构建与解析 |
src/routing/bindings.ts | ~113 | 路由绑定配置解析 |
src/routing/account-lookup.ts | ~14 | 账户大小写不敏感查找 |
src/telegram/bot.ts | ~350+ | Telegram Bot 创建(具体渠道实现示例) |
src/telegram/bot-message-context.ts | ~500+ | Telegram 入站消息上下文构建 |
src/discord/monitor/provider.ts | ~500+ | Discord 监听器提供者 |
src/slack/monitor/provider.ts | ~400+ | Slack Socket Mode 提供者 |
1.2 系统全景
┌─────────────────────────────────────────────────────────────────┐
│ Channel 渠道系统全景 │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Plugin Registry(插件注册表) │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Telegram │ │ Discord │ │ Slack │ │ WhatsApp │ │ │
│ │ │ Plugin │ │ Plugin │ │ Plugin │ │ Plugin │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │ Signal │ │ iMessage │ │ IRC │ │ GChat │ │ │
│ │ │ Plugin │ │ Plugin │ │ Plugin │ │ Plugin │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ ┌──────────┐ ┌──────────┐ ...扩展渠道 │ │
│ │ │ MS Teams │ │ Matrix │ │ │
│ │ │(extension)│ │(extension)│ │ │
│ │ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Dock(轻量代理)│ │ Outbound │ │ Message │ │
│ │ capabilities │ │ (出站发送) │ │ Actions │ │
│ │ threading │ │ sendText │ │ (消息动作) │ │
│ │ mentions │ │ sendMedia │ │ send/react/ │ │
│ │ groups │ │ sendPayload │ │ kick/ban │ │
│ └────────────────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 共享安全与路由层 │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │AllowFrom │ │ Command │ │ Mention │ │ Route │ │ │
│ │ │ Gating │ │ Gating │ │ Gating │ │ Resolver │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌────────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Session 管理 │ │ Draft Stream│ │ Status │ │
│ │ 会话键构建 │ │ 流式输出 │ │ Reactions │ │
│ │ 持久化记录 │ │ 节流控制 │ │ Emoji 状态 │ │
│ └────────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘二、ChannelPlugin 协议——渠道的"合同"
所有渠道(无论内置还是扩展)都必须实现 ChannelPlugin 接口。这是整个渠道系统的核心协议。
2.1 协议定义
typescript
// src/channels/plugins/types.plugin.ts
export type ChannelPlugin<ResolvedAccount = any, Probe = unknown, Audit = unknown> = {
id: ChannelId; // 唯一标识符,如 "telegram"
meta: ChannelMeta; // UI 元数据(标签、图标、文档路径)
capabilities: ChannelCapabilities; // 能力声明(支持哪些功能)
// 可选适配器(共 18 个接口槽位)
config: ChannelConfigAdapter<ResolvedAccount>; // 必须:账户配置读取
setup?: ChannelSetupAdapter; // 账户设置向导
pairing?: ChannelPairingAdapter; // 配对/审批
security?: ChannelSecurityAdapter<ResolvedAccount>; // DM 安全策略
groups?: ChannelGroupAdapter; // 群组策略
mentions?: ChannelMentionAdapter; // @提及处理
outbound?: ChannelOutboundAdapter; // 出站消息发送
status?: ChannelStatusAdapter<...>; // 状态探测/审计
gateway?: ChannelGatewayAdapter<...>; // 网关集成(启停账户)
auth?: ChannelAuthAdapter; // 登录认证
elevated?: ChannelElevatedAdapter; // 特权降级
commands?: ChannelCommandAdapter; // 命令控制
streaming?: ChannelStreamingAdapter; // 流式输出配置
threading?: ChannelThreadingAdapter; // 线程/回复模式
messaging?: ChannelMessagingAdapter; // 消息目标解析
agentPrompt?: ChannelAgentPromptAdapter; // Agent 提示词注入
directory?: ChannelDirectoryAdapter; // 联系人目录
resolver?: ChannelResolverAdapter; // 目标解析器
actions?: ChannelMessageActionAdapter; // 消息动作处理
heartbeat?: ChannelHeartbeatAdapter; // 心跳检测
onboarding?: ChannelOnboardingAdapter; // CLI 引导向导
agentTools?: ChannelAgentToolFactory | ChannelAgentTool[]; // 渠道专属 Agent 工具
// 配置热重载
reload?: { configPrefixes: string[]; noopPrefixes?: string[] };
configSchema?: ChannelConfigSchema;
defaults?: { queue?: { debounceMs?: number } };
gatewayMethods?: string[];
};2.2 能力声明
每个渠道通过 capabilities 明确声明自己支持什么:
typescript
export type ChannelCapabilities = {
chatTypes: Array<ChatType | "thread">; // 支持的会话类型
polls?: boolean; // 投票
reactions?: boolean; // 表情反应
edit?: boolean; // 消息编辑
unsend?: boolean; // 撤回消息
reply?: boolean; // 回复消息
effects?: boolean; // 消息效果
groupManagement?: boolean; // 群组管理
threads?: boolean; // 线程
media?: boolean; // 媒体消息
nativeCommands?: boolean; // 原生命令菜单
blockStreaming?: boolean; // 分块流式输出
};这套能力声明机制让上层代码无需关心具体渠道,只需检查 capabilities.polls 即可知道是否支持投票功能。
2.3 18 种适配器接口
渠道系统采用 微适配器 设计——将渠道行为拆分为 18 个独立的适配器接口,每个渠道只需实现自己支持的部分:
| 适配器 | 必须? | 职责 |
|---|---|---|
config | ✅ | 账户列表、解析、启用状态、allowFrom |
setup | ❌ | 账户设置向导(applyAccountConfig) |
pairing | ❌ | 设备配对、allowlist 审批通知 |
security | ❌ | DM 策略解析、安全警告 |
groups | ❌ | 群组 @提及要求、工具策略 |
mentions | ❌ | @提及文本清理正则 |
outbound | ❌ | 出站消息发送(文本/媒体/投票) |
status | ❌ | 健康探测、审计、运行时快照 |
gateway | ❌ | 启动/停止账户、QR 登录、登出 |
auth | ❌ | CLI 登录流程 |
elevated | ❌ | 特权降级 allowFrom 回退 |
commands | ❌ | 命令权限控制标志 |
streaming | ❌ | 分块流式输出参数 |
threading | ❌ | 回复模式、线程上下文构建 |
messaging | ❌ | 消息目标标准化 |
agentPrompt | ❌ | Agent 提示词渠道特化 |
directory | ❌ | 联系人/群组目录查询 |
resolver | ❌ | 目标名称→ID 解析 |
actions | ❌ | 渠道消息动作(send/react/kick/ban) |
heartbeat | ❌ | 心跳就绪检查 |
三、三层架构:Registry → Dock → Plugin
渠道系统采用精心设计的三层架构,解决"重量级插件"的导入成本问题。
3.1 Registry(注册表)——元数据层
typescript
// src/channels/registry.ts
export const CHAT_CHANNEL_ORDER = [
"telegram", "whatsapp", "discord", "irc",
"googlechat", "slack", "signal", "imessage",
] as const;
const CHAT_CHANNEL_META: Record<ChatChannelId, ChannelMeta> = {
telegram: {
id: "telegram",
label: "Telegram",
selectionLabel: "Telegram (Bot API)",
docsPath: "/channels/telegram",
blurb: "simplest way to get started...",
systemImage: "paperplane",
},
// ... 其余 7 个渠道
};
export const CHAT_CHANNEL_ALIASES: Record<string, ChatChannelId> = {
imsg: "imessage",
"internet-relay-chat": "irc",
"google-chat": "googlechat",
gchat: "googlechat",
};Registry 层的特点:
- 零运行时依赖:不导入任何渠道实现代码
- 静态数据:元数据在编译时确定
- 别名映射:
imsg→imessage,gchat→googlechat - 标准化入口:
normalizeChatChannelId()→ 先查别名再匹配列表
3.2 Dock(轻量代理)——行为摘要层
Dock 是渠道系统最精妙的设计之一。它是 Plugin 的轻量级投影,只包含共享代码路径需要的行为片段:
typescript
// src/channels/dock.ts
export type ChannelDock = {
id: ChannelId;
capabilities: ChannelCapabilities;
commands?: ChannelCommandAdapter;
outbound?: { textChunkLimit?: number };
streaming?: ChannelDockStreaming;
elevated?: ChannelElevatedAdapter;
config?: Pick<ChannelConfigAdapter<unknown>,
"resolveAllowFrom" | "formatAllowFrom" | "resolveDefaultTo">;
groups?: ChannelGroupAdapter;
mentions?: ChannelMentionAdapter;
threading?: ChannelThreadingAdapter;
agentPrompt?: ChannelAgentPromptAdapter;
};为什么需要 Dock?
Plugin(重量级) Dock(轻量级)
───────────────── ─────────────────
导入 grammy Bot SDK ✗ 不导入
导入 Puppeteer ✗ 不导入
导入 HTTP 服务器 ✗ 不导入
导入 WebSocket 客户端 ✗ 不导入
capabilities ✓ capabilities ✓
threading ✓ threading ✓
mentions ✓ mentions ✓
allowFrom 解析 ✓ allowFrom 解析 ✓内置渠道的 Dock 是手写的静态对象,直接内联在 dock.ts 中。扩展渠道的 Dock 则从 Plugin 自动构建:
typescript
function buildDockFromPlugin(plugin: ChannelPlugin): ChannelDock {
return {
id: plugin.id,
capabilities: plugin.capabilities,
commands: plugin.commands,
outbound: plugin.outbound?.textChunkLimit
? { textChunkLimit: plugin.outbound.textChunkLimit }
: undefined,
streaming: plugin.streaming ? { ... } : undefined,
elevated: plugin.elevated,
config: plugin.config ? { ... } : undefined,
groups: plugin.groups,
mentions: plugin.mentions,
threading: plugin.threading,
agentPrompt: plugin.agentPrompt,
};
}3.3 Plugin(完整实现)——执行层
Plugin 是渠道的完整运行时实现,由插件注册表管理。只在真正需要时才加载:
typescript
// src/channels/plugins/index.ts
function resolveCachedChannelPlugins(): CachedChannelPlugins {
const registry = requireActivePluginRegistry();
const registryVersion = getActivePluginRegistryVersion();
// 版本化缓存:注册表变化时自动失效
if (cached.registryVersion === registryVersion) return cached;
// 去重 + 排序(内置渠道按 CHAT_CHANNEL_ORDER,扩展渠道按 order 或字母序)
const sorted = dedupeChannels(registry.channels.map(e => e.plugin))
.toSorted((a, b) => {
const indexA = CHAT_CHANNEL_ORDER.indexOf(a.id);
const orderA = a.meta.order ?? (indexA === -1 ? 999 : indexA);
// ...
});
// 构建 byId Map 加速查找
const byId = new Map<string, ChannelPlugin>();
for (const plugin of sorted) byId.set(plugin.id, plugin);
return { registryVersion, sorted, byId };
}三层架构的导入开销对比:
| 操作 | Registry | Dock | Plugin |
|---|---|---|---|
normalizeChatChannelId("tg") | 0 依赖 | — | — |
getChannelDock("telegram").capabilities | — | 轻量依赖 | — |
getChannelPlugin("telegram").gateway.startAccount() | — | — | 完整 SDK |
四、路由系统——消息如何找到正确的 Agent
路由系统是渠道系统的"大脑",它决定每条入站消息应该由哪个 Agent 处理,以及使用哪个 Session。
4.1 路由解析引擎
typescript
// src/routing/resolve-route.ts
export function resolveAgentRoute(input: ResolveAgentRouteInput): ResolvedAgentRoute输入:
typescript
type ResolveAgentRouteInput = {
cfg: OpenClawConfig;
channel: string; // "telegram"
accountId?: string | null; // "bot1"
peer?: RoutePeer | null; // { kind: "group", id: "123456" }
parentPeer?: RoutePeer | null; // 线程的父级
guildId?: string | null; // Discord guild ID
teamId?: string | null; // Slack team ID
memberRoleIds?: string[]; // Discord 角色 ID 列表
};输出:
typescript
type ResolvedAgentRoute = {
agentId: string; // "main" 或具体 agent ID
channel: string; // 标准化后的渠道名
accountId: string; // 标准化后的账户 ID
sessionKey: string; // 完整会话键
mainSessionKey: string; // 主会话键(用于 DM 折叠)
matchedBy: string; // 匹配方式(调试用)
};4.2 七层优先级匹配
路由引擎按照七层优先级逐层匹配,命中第一层后立即返回:
优先级从高到低:
Tier 1: binding.peer ← 精确匹配 Peer(用户/群组 ID)
Tier 2: binding.peer.parent ← 匹配线程的父级 Peer
Tier 3: binding.guild+roles ← Discord Guild + 角色组合
Tier 4: binding.guild ← Discord Guild 级别
Tier 5: binding.team ← Slack Team 级别
Tier 6: binding.account ← 账户级别
Tier 7: binding.channel ← 渠道级别(通配符 accountId="*")
Default: 使用默认 Agent源码中用 tiers 数组实现了这个优先级链:
typescript
const tiers = [
{
matchedBy: "binding.peer",
enabled: Boolean(peer),
candidates: collectPeerIndexedBindings(bindingsIndex, peer),
predicate: (c) => c.match.peer.state === "valid",
},
{
matchedBy: "binding.peer.parent",
enabled: Boolean(parentPeer && parentPeer.id),
candidates: collectPeerIndexedBindings(bindingsIndex, parentPeer),
// ...
},
// ... 其余 5 层
];
for (const tier of tiers) {
if (!tier.enabled) continue;
const matched = tier.candidates.find(c =>
tier.predicate(c) && matchesBindingScope(c.match, baseScope)
);
if (matched) return choose(matched.binding.agentId, tier.matchedBy);
}
return choose(resolveDefaultAgentId(cfg), "default");4.3 绑定索引加速
对于有大量 binding 配置的场景,每次路由解析都遍历全部 binding 会很慢。系统为此构建了索引结构:
typescript
type EvaluatedBindingsIndex = {
byPeer: Map<string, EvaluatedBinding[]>; // peer:id → bindings
byGuildWithRoles: Map<string, EvaluatedBinding[]>; // guildId → bindings(有角色约束)
byGuild: Map<string, EvaluatedBinding[]>; // guildId → bindings(无角色约束)
byTeam: Map<string, EvaluatedBinding[]>; // teamId → bindings
byAccount: EvaluatedBinding[]; // 账户级 bindings
byChannel: EvaluatedBinding[]; // 渠道级 bindings(通配符)
};索引通过 WeakMap<OpenClawConfig, Cache> 绑定到配置对象,配置不变时缓存命中,配置变化时自动失效。还有 LRU 大小限制:
typescript
const MAX_EVALUATED_BINDINGS_CACHE_KEYS = 2000;
const MAX_RESOLVED_ROUTE_CACHE_KEYS = 4000;4.4 会话键(Session Key)构建
会话键是 OpenClaw 消息持久化和并发控制的关键。格式:
agent:<agentId>:<rest>具体形式取决于 DM Scope 配置和会话类型:
| dmScope | 直聊键格式 | 说明 |
|---|---|---|
main(默认) | agent:main:main | 所有直聊共享一个会话 |
per-peer | agent:main:direct:alice | 每个对话者独立会话 |
per-channel-peer | agent:main:telegram:direct:alice | 每个渠道×对话者独立 |
per-account-channel-peer | agent:main:telegram:bot1:direct:alice | 最细粒度 |
群组/频道键格式固定:
agent:main:telegram:group:123456线程会话支持后缀扩展:
agent:main:slack:channel:C12345:thread:1700000000.0000004.5 身份链接(Identity Links)
跨渠道用户身份合并:
typescript
// 配置示例
session: {
identityLinks: {
alice: ["telegram:12345", "discord:67890", "alice@email.com"]
}
}当 dmScope 不是 main 时,buildAgentPeerSessionKey() 会调用 resolveLinkedPeerId() 将不同渠道的用户 ID 映射到同一个规范名(alice),使得 Telegram 和 Discord 的同一用户共享会话历史。
五、安全门控层——三道关卡
每条入站消息在到达 Agent 之前,必须通过三道安全门控。
5.1 AllowFrom 门控(谁能说话)
typescript
// src/channels/allow-from.ts
export function mergeDmAllowFromSources(params: {
allowFrom?: Array<string | number>; // 配置文件中的静态 allowlist
storeAllowFrom?: Array<string | number>; // 运行时 store 中的动态 allowlist
dmPolicy?: string;
}): string[] {
// allowlist 策略下只用配置文件的列表,不合并 store
const storeEntries = params.dmPolicy === "allowlist" ? [] : (params.storeAllowFrom ?? []);
return [...(params.allowFrom ?? []), ...storeEntries]
.map(v => String(v).trim())
.filter(Boolean);
}AllowFrom 检查的核心逻辑:
typescript
export function isSenderIdAllowed(
allow: { entries: string[]; hasWildcard: boolean; hasEntries: boolean },
senderId: string | undefined,
allowWhenEmpty: boolean, // 列表为空时的默认策略
): boolean {
if (!allow.hasEntries) return allowWhenEmpty; // 空列表→看默认策略
if (allow.hasWildcard) return true; // "*" 通配符→全部允许
if (!senderId) return false; // 无发送者 ID→拒绝
return allow.entries.includes(senderId); // 精确匹配
}5.2 Command 门控(谁能执行命令)
typescript
// src/channels/command-gating.ts
export function resolveControlCommandGate(params: {
useAccessGroups: boolean; // 是否启用 access groups
authorizers: CommandAuthorizer[]; // 各级权限判定器
allowTextCommands: boolean; // 是否允许文本命令
hasControlCommand: boolean; // 消息是否包含控制命令
}): { commandAuthorized: boolean; shouldBlock: boolean } {
const commandAuthorized = resolveCommandAuthorizedFromAuthorizers({ ... });
// 只有"允许文本命令 + 有控制命令 + 未授权"时才阻断
const shouldBlock = params.allowTextCommands
&& params.hasControlCommand
&& !commandAuthorized;
return { commandAuthorized, shouldBlock };
}三种工作模式(当 AccessGroups 关闭时):
allow(默认):全部放行deny:全部阻断configured:有任何 authorizer 配置了→检查权限;全未配置→放行
5.3 Mention 门控(群组中是否被 @)
群组消息默认需要 @提及 Bot 才会响应,防止 Bot 对每条群消息都回复:
typescript
// src/channels/mention-gating.ts
export function resolveMentionGating(params: MentionGateParams): MentionGateResult {
const implicit = params.implicitMention === true; // 隐式提及(如回复 Bot 消息)
const bypass = params.shouldBypassMention === true; // 绕过提及检查
const effectiveWasMentioned = params.wasMentioned || implicit || bypass;
const shouldSkip = params.requireMention
&& params.canDetectMention
&& !effectiveWasMentioned;
return { effectiveWasMentioned, shouldSkip };
}提及绕过机制:当用户在群组中发送控制命令(如 /reset)且有权限时,即使没有 @提及也不跳过:
typescript
export function resolveMentionGatingWithBypass(params): MentionGateWithBypassResult {
const shouldBypassMention =
params.isGroup &&
params.requireMention &&
!params.wasMentioned &&
!(params.hasAnyMention ?? false) &&
params.allowTextCommands &&
params.commandAuthorized &&
params.hasControlCommand; // 控制命令绕过提及门控
// ...
}5.4 三道关卡的执行顺序
以 Telegram 为例,在 bot-message-context.ts 中的完整执行顺序:
入站消息
│
▼
① AllowFrom 检查
├── DM: mergeDmAllowFromSources() → isSenderAllowed()
│ └── 未通过 → 配对消息或静默丢弃
└── Group: evaluateTelegramGroupBaseAccess()
└── 未通过 → logInboundDrop()
│
▼
② Mention 门控
├── hasBotMention() 检测是否被 @
├── implicitMention 检测(回复 Bot 消息)
└── resolveMentionGatingWithBypass()
└── shouldSkip → 静默忽略
│
▼
③ Command 门控
├── hasControlCommand() 检测控制命令
└── resolveControlCommandGate()
└── shouldBlock → 静默忽略
│
▼
通过 → 交给 Agent 处理六、出站消息发送
6.1 OutboundAdapter 接口
typescript
export type ChannelOutboundAdapter = {
deliveryMode: "direct" | "gateway" | "hybrid";
chunker?: (text: string, limit: number) => string[];
chunkerMode?: "text" | "markdown";
textChunkLimit?: number;
pollMaxOptions?: number;
resolveTarget?: (...) => { ok: true; to: string } | { ok: false; error: Error };
sendText?: (ctx: ChannelOutboundContext) => Promise<OutboundDeliveryResult>;
sendMedia?: (ctx: ChannelOutboundContext) => Promise<OutboundDeliveryResult>;
sendPayload?: (ctx: ChannelOutboundPayloadContext) => Promise<OutboundDeliveryResult>;
sendPoll?: (ctx: ChannelPollContext) => Promise<ChannelPollResult>;
};三种发送模式:
- direct:渠道直接调用平台 API(Telegram、Discord、Slack)
- gateway:通过 Gateway WebSocket 转发(扩展渠道常用)
- hybrid:直发 + 网关混合
6.2 加载优化
出站适配器有独立的轻量加载器,不会拉入完整的 Plugin:
typescript
// src/channels/plugins/outbound/load.ts
const loadOutboundAdapterFromRegistry = createChannelRegistryLoader<ChannelOutboundAdapter>(
(entry) => entry.plugin.outbound,
);6.3 Telegram 出站示例
typescript
// src/channels/plugins/outbound/telegram.ts
export const telegramOutbound: ChannelOutboundAdapter = {
deliveryMode: "direct",
chunker: markdownToTelegramHtmlChunks, // Markdown → Telegram HTML 分块
chunkerMode: "markdown",
textChunkLimit: 4000, // Telegram 消息长度限制
sendText: async ({ to, text, accountId, deps, replyToId, threadId }) => {
const { send, baseOpts } = resolveTelegramSendContext({ deps, accountId, replyToId, threadId });
const result = await send(to, text, { ...baseOpts });
return { channel: "telegram", ...result };
},
sendMedia: async ({ to, text, mediaUrl, mediaLocalRoots, ... }) => {
const { send, baseOpts } = resolveTelegramSendContext({ ... });
const result = await send(to, text, { ...baseOpts, mediaUrl, mediaLocalRoots });
return { channel: "telegram", ...result };
},
sendPayload: async ({ to, payload, ... }) => {
// 支持 inline buttons、引用文本、多媒体
// 多张媒体时循环发送,第一张附带 buttons
for (let i = 0; i < mediaUrls.length; i++) {
finalResult = await send(to, i === 0 ? text : "", {
...payloadOpts,
mediaUrl: mediaUrls[i],
...(i === 0 ? { buttons: telegramData?.buttons } : {}),
});
}
},
};七、流式输出——Draft Stream
AI 回复通常是流式生成的,Draft Stream 机制让用户能看到实时的打字效果。
7.1 核心循环
typescript
// src/channels/draft-stream-loop.ts
export function createDraftStreamLoop(params: {
throttleMs: number;
isStopped: () => boolean;
sendOrEditStreamMessage: (text: string) => Promise<void | boolean>;
}): DraftStreamLoop {
let lastSentAt = 0;
let pendingText = "";
let inFlightPromise: Promise<void | boolean> | undefined;
let timer: ReturnType<typeof setTimeout> | undefined;
return {
update: (text: string) => {
if (params.isStopped()) return;
pendingText = text;
// 如果有请求在飞行中 → 调度延迟发送
if (inFlightPromise) { schedule(); return; }
// 如果节流窗口已过 → 立即发送
if (!timer && Date.now() - lastSentAt >= params.throttleMs) {
void flush(); return;
}
// 否则调度
schedule();
},
flush, // 强制发送当前 pendingText
stop, // 停止并清空
resetPending, // 清空待发文本
resetThrottleWindow, // 重置节流窗口
waitForInFlight, // 等待飞行中的请求完成
};
}7.2 可终结的流式控制器
typescript
// src/channels/draft-stream-controls.ts
export function createFinalizableDraftStreamControls(params: {
throttleMs: number;
isStopped: () => boolean;
isFinal: () => boolean; // 是否已进入最终状态
markStopped: () => void;
markFinal: () => void;
sendOrEditStreamMessage: (text: string) => Promise<boolean>;
}) {
const loop = createDraftStreamLoop({ ... });
return {
update: (text: string) => {
if (params.isStopped() || params.isFinal()) return;
loop.update(text);
},
stop: async () => { // 正常完成:标记 final → flush 最后内容
params.markFinal();
await loop.flush();
},
stopForClear: async () => { // 异常中断:标记 stopped → 停止循环 → 等待飞行请求
params.markStopped();
loop.stop();
await loop.waitForInFlight();
},
};
}状态机:
STREAMING ──update()──→ STREAMING
│ │
│ stop() │ stopForClear()
▼ ▼
FINAL STOPPED
(flush残余) (立即停止)
│ │
▼ ▼
DONE DONE八、状态反应控制器——Emoji 进度条
Agent 处理消息时,通过 Emoji 反应向用户展示实时状态。
8.1 状态 Emoji 映射
typescript
// src/channels/status-reactions.ts
export const DEFAULT_EMOJIS = {
queued: "👀", // 排队中
thinking: "🤔", // 思考中
tool: "🔥", // 使用工具
coding: "👨💻", // 执行代码
web: "⚡", // 网络搜索
done: "👍", // 完成
error: "😱", // 出错
stallSoft: "🥱", // 轻微停滞(10s)
stallHard: "😨", // 严重停滞(30s)
};工具类型自动识别:
typescript
export function resolveToolEmoji(toolName: string | undefined, emojis): string {
const normalized = toolName?.trim().toLowerCase() ?? "";
if (WEB_TOOL_TOKENS.some(t => normalized.includes(t))) return emojis.web; // 🌐
if (CODING_TOOL_TOKENS.some(t => normalized.includes(t))) return emojis.coding; // 💻
return emojis.tool; // 🔥
}8.2 控制器设计
typescript
export function createStatusReactionController(params: {
enabled: boolean;
adapter: StatusReactionAdapter; // 平台特定的 set/remove 实现
initialEmoji: string;
emojis?: StatusReactionEmojis;
timing?: StatusReactionTiming;
}): StatusReactionController核心机制:
- Promise 链序列化:所有 API 调用通过
chainPromise串行执行,防止竞态 - 防抖:中间状态(thinking/tool)有 700ms 防抖,终态(done/error)立即执行
- 停滞检测:10s 无活动 → 🥱;30s → 😨
- 终态保护:done/error 之后忽略所有后续更新
typescript
function scheduleEmoji(emoji: string, options = {}) {
if (!enabled || finished) return;
if (emoji === currentEmoji || emoji === pendingEmoji) {
if (!options.skipStallReset) resetStallTimers();
return; // 去重
}
pendingEmoji = emoji;
clearDebounceTimer();
if (options.immediate) {
void enqueue(async () => { await applyEmoji(emoji); });
} else {
debounceTimer = setTimeout(() => {
void enqueue(async () => { await applyEmoji(emoji); });
}, timing.debounceMs);
}
if (!options.skipStallReset) resetStallTimers();
}九、入站消息处理——以 Telegram 为例
9.1 创建 Bot
typescript
// src/telegram/bot.ts
export function createTelegramBot(opts: TelegramBotOptions) {
const bot = new Bot(opts.token, client ? { client } : undefined);
bot.api.config.use(apiThrottler()); // API 节流
bot.catch((err) => { ... }); // 全局错误处理
const recentUpdates = createTelegramUpdateDedupe(); // 更新去重
// ... 注册消息处理器、原生命令等
}关键组件:
- grammY:Telegram Bot SDK
- apiThrottler:API 调用频率限制
- sequentialize:按 chat ID 序列化更新处理
- updateDedupe:Update ID 去重(防止轮询+Webhook 并行时重复处理)
9.2 消息上下文构建
bot-message-context.ts 是 Telegram 入站消息处理的核心,它将 Telegram 原始消息转化为标准化的 MsgContext:
Telegram Update
│
▼
解析消息内容(文本/媒体/位置/贴纸/转发)
│
▼
构建 sender 信息(ID/名称/用户名)
│
▼
路由解析 resolveAgentRoute()
│ → agentId, sessionKey, accountId
▼
安全门控(AllowFrom → Mention → Command)
│
▼
构建 MsgContext(标准化消息上下文)
│
▼
会话记录 recordInboundSession()
│
▼
创建 StatusReactionController
│
▼
交给 Agent 消息处理器9.3 防抖策略
快速连续消息(如用户连发多条)会被防抖合并:
typescript
// src/channels/inbound-debounce-policy.ts
export function shouldDebounceTextInbound(params: {
text: string | null | undefined;
cfg: OpenClawConfig;
hasMedia?: boolean;
allowDebounce?: boolean;
}): boolean {
if (params.allowDebounce === false) return false;
if (params.hasMedia) return false; // 媒体消息不防抖
const text = params.text?.trim() ?? "";
if (!text) return false;
return !hasControlCommand(text, params.cfg); // 控制命令不防抖
}防抖规则:
- 纯文本消息 → 防抖(等待后续消息合并)
- 媒体消息 → 不防抖(立即处理)
- 控制命令 → 不防抖(立即执行)
- 配置了
allowDebounce: false→ 不防抖
十、Typing 指示器——输入状态管理
10.1 TTL 安全机制
typescript
// src/channels/typing.ts
export function createTypingCallbacks(params): TypingCallbacks {
const maxDurationMs = params.maxDurationMs ?? 60_000; // 60s TTL 安全上限
const startGuard = createTypingStartGuard({
maxConsecutiveFailures, // 连续失败 2 次后熔断
onTrip: () => keepaliveLoop.stop(),
});
const keepaliveLoop = createTypingKeepaliveLoop({
intervalMs: keepaliveIntervalMs, // 默认 3s
onTick: fireStart,
});
return {
onReplyStart: async () => {
startGuard.reset();
keepaliveLoop.stop();
await fireStart(); // 发送第一个 typing 指示
keepaliveLoop.start(); // 启动 keepalive
startTtlTimer(); // 启动 TTL 安全计时器
},
onIdle: fireStop, // 空闲时停止
onCleanup: fireStop, // 清理时停止
};
}三层保护:
- Keepalive 循环:每 3s 重新发送 typing 指示(平台通常 5s 后消失)
- 熔断器:连续 2 次 start 失败 → 停止 keepalive
- TTL 安全:最多 60s 后自动停止(防止忘记清理)
十一、传输层看门狗
typescript
// src/channels/transport/stall-watchdog.ts
export function createArmableStallWatchdog(params: {
label: string;
timeoutMs: number;
checkIntervalMs?: number;
abortSignal?: AbortSignal;
onTimeout: (meta) => void;
}): ArmableStallWatchdog看门狗在以下场景使用:
- WebSocket 连接的 ping/pong 超时检测
- 轮询请求的响应超时检测
- 长运行操作的活跃度检测
typescript
return {
arm: (atMs?) => { armed = true; lastActivityAt = atMs ?? Date.now(); },
touch: (atMs?) => { lastActivityAt = atMs ?? Date.now(); }, // 喂狗
disarm: () => { armed = false; },
stop: () => { stopped = true; clearInterval(timer); },
isArmed: () => armed,
};十二、模型覆盖——按渠道/群组切换模型
typescript
// src/channels/model-overrides.ts
export function resolveChannelModelOverride(params): ChannelModelOverride | null配置示例:
json5
{
channels: {
modelByChannel: {
telegram: {
"*": "claude-3.5-sonnet", // 默认
"group:123456": "gpt-4o", // 特定群组
"#general": "claude-3-haiku", // 按频道名
},
discord: {
"*": "gpt-4o-mini",
}
}
}
}查找逻辑使用 buildChannelKeyCandidates() 生成候选键列表:
groupId (精确 ID) > parentGroupId > groupChannel (#名称)
> channelBare (去 #) > channelSlug (标准化) > "*" (通配符)十三、渠道目录系统(Catalog)
Catalog 解决"如何发现可安装的渠道插件"的问题。
13.1 四种来源优先级
typescript
const ORIGIN_PRIORITY = {
config: 0, // 配置文件中指定的插件
workspace: 1, // 工作区中的本地插件
global: 2, // 全局安装的插件
bundled: 3, // 内置插件
};13.2 外部目录
除了代码中的插件,还支持从外部 JSON 文件加载目录:
typescript
const DEFAULT_CATALOG_PATHS = [
path.join(CONFIG_DIR, "mpm", "plugins.json"),
path.join(CONFIG_DIR, "mpm", "catalog.json"),
path.join(CONFIG_DIR, "plugins", "catalog.json"),
];每个 catalog entry 包含安装信息:
typescript
type ChannelPluginCatalogEntry = {
id: string;
meta: ChannelMeta;
install: {
npmSpec: string; // npm 包名
localPath?: string; // 本地路径(开发用)
defaultChoice?: "npm" | "local";
};
};十四、完整数据流
14.1 入站消息处理流
用户发送消息 (Telegram/Discord/Slack/...)
│
▼
Platform SDK 接收(grammY / discord.js / Slack Events API)
│
▼
渠道 Monitor 预处理
├── 解析消息内容(文本/媒体/位置/表情)
├── 解析发送者信息(ID/名称/用户名)
├── 检测会话类型(DM/Group/Channel/Thread)
└── 去重检查(Telegram Update ID / Discord Message ID)
│
▼
路由解析 resolveAgentRoute()
├── 从配置加载 bindings
├── 构建索引 → 七层优先级匹配
├── 解析 agentId
└── 构建 sessionKey
│
▼
安全门控
├── ① AllowFrom:发送者是否在允许列表中
├── ② Mention:群组中是否被 @提及
└── ③ Command:控制命令是否有权限执行
│ 任一关卡未通过 → 丢弃/配对消息
▼
入站上下文构建(MsgContext)
├── 防抖判定(shouldDebounceTextInbound)
├── 模型覆盖检查(resolveChannelModelOverride)
└── 会话元数据记录(recordInboundSession)
│
▼
创建状态反应控制器 → 设置 👀
│
▼
Typing 指示器启动 → onReplyStart()
│
▼
交给 Agent 消息处理器
├── 思考 → 🤔
├── 调用工具 → 🔥/👨💻/⚡
├── 流式生成 → DraftStreamLoop.update()
└── 完成 → 👍 或错误 → 😱
│
▼
出站发送
├── 文本消息 → chunker 分块 → sendText
├── 媒体消息 → sendMedia
└── 富文本 → sendPayload(按钮/引用/多媒体)14.2 出站消息发送流
Agent 回复内容
│
▼
loadChannelOutboundAdapter(channelId)
│ → 从注册表加载轻量适配器
▼
resolveTarget()
│ → 确定发送目标(to 字段标准化)
▼
内容处理
├── 文本分块:chunker(text, textChunkLimit)
│ ├── Telegram: markdownToTelegramHtmlChunks (4000 字符)
│ ├── Discord: markdown chunks (2000 字符)
│ └── IRC: plain text (350 字符)
├── 媒体处理:解析 mediaUrl / mediaLocalRoots
└── 渠道特化数据:buttons / quoteText / effects
│
▼
发送(deliveryMode)
├── direct → 直接调用平台 API
├── gateway → 通过 WebSocket RPC 转发
└── hybrid → 尝试直发,失败则网关
│
▼
返回 OutboundDeliveryResult
└── { channel, messageId, chatId, ... }十五、设计模式总结
15.1 适配器模式(Adapter Pattern)
核心体现。18 种微适配器接口统一了渠道行为的抽象,每个渠道自选实现。
15.2 注册表模式(Registry Pattern)
PluginRegistry 管理所有已注册的渠道插件,支持版本化缓存、热重载、去重排序。
15.3 代理模式(Proxy Pattern)
ChannelDock 是 ChannelPlugin 的轻量代理,避免共享代码路径触发重量级导入。
15.4 策略模式(Strategy Pattern)
deliveryMode: "direct" | "gateway" | "hybrid"→ 出站发送策略dmScope: "main" | "per-peer" | ...→ 会话键生成策略dmPolicy: "open" | "allowlist" | ...→ DM 访问策略
15.5 责任链模式(Chain of Responsibility)
七层路由优先级匹配 + 三道安全门控,都是责任链的典型应用。
15.6 工厂方法模式(Factory Method)
createDraftStreamLoop()→ 流式输出循环createStatusReactionController()→ 状态反应控制器createTypingCallbacks()→ 输入状态指示器createArmableStallWatchdog()→ 传输看门狗createChannelRegistryLoader()→ 通用注册表加载器工厂
15.7 观察者模式(Observer Pattern)
- Telegram
Bot.on("message", ...)→ 消息事件监听 - Discord
client.on("messageCreate", ...)→ 消息事件监听 - AbortSignal 事件监听 → 优雅停机
十六、调试建议
16.1 路由调试
bash
# 打开详细路由日志
OPENCLAW_LOG_VERBOSE=1 pnpm openclaw gateway run路由引擎在 shouldLogVerbose() 为 true 时输出详细匹配过程:
[routing] resolveAgentRoute: channel=telegram accountId=default peer=direct:12345 bindings=3
[routing] binding: agentId=main accountPattern=default peer=none guildId=none teamId=none roles=0
[routing] match: matchedBy=binding.account agentId=main16.2 关键断点位置
| 文件 | 行号 | 断点目的 |
|---|---|---|
src/routing/resolve-route.ts | choose() 函数 | 路由最终决策 |
src/channels/allow-from.ts | isSenderIdAllowed() | AllowFrom 判定 |
src/channels/mention-gating.ts | resolveMentionGating() | 提及门控判定 |
src/channels/command-gating.ts | resolveControlCommandGate() | 命令门控判定 |
src/channels/draft-stream-loop.ts | flush() 函数 | 流式输出时机 |
src/channels/status-reactions.ts | applyEmoji() | 状态 Emoji 切换 |
src/channels/plugins/index.ts | resolveCachedChannelPlugins() | 插件加载 |
src/channels/dock.ts | getChannelDock() | Dock 查找 |
src/telegram/bot-message-context.ts | 安全门控代码段 | Telegram 入站处理全链路 |
16.3 测试命令
bash
# Channel 相关测试
pnpm test -- --reporter verbose src/channels/
# 路由测试
pnpm test -- --reporter verbose src/routing/
# Telegram 测试
pnpm test -- --reporter verbose src/telegram/
# 特定文件
pnpm test -- --reporter verbose src/channels/command-gating.test.ts
pnpm test -- --reporter verbose src/routing/resolve-route.test.ts十七、文件依赖图
src/channels/chat-type.ts ← 零依赖,基础类型
↑
src/channels/registry.ts ← 元数据 + ID 标准化
↑
src/channels/dock.ts ← 轻量代理(依赖各渠道 accounts)
↑
src/channels/plugins/types.core.ts ← 核心类型定义
src/channels/plugins/types.adapters.ts ← 适配器接口
src/channels/plugins/types.plugin.ts ← ChannelPlugin 协议
↑
src/channels/plugins/index.ts ← 运行时注册表
↑
src/channels/plugins/message-actions.ts ← 消息动作分发
src/channels/plugins/pairing.ts ← 配对系统
src/channels/plugins/status.ts ← 状态快照
src/routing/session-key.ts ← 会话键构建(零渠道依赖)
↑
src/routing/bindings.ts ← 路由绑定解析
↑
src/routing/resolve-route.ts ← 路由解析引擎(依赖 session-key + bindings)
src/channels/allow-from.ts ← 安全门控(零渠道依赖)
src/channels/command-gating.ts ← 安全门控(零渠道依赖)
src/channels/mention-gating.ts ← 安全门控(零渠道依赖)
src/channels/draft-stream-loop.ts ← 流式输出(零渠道依赖)
↑
src/channels/draft-stream-controls.ts ← 可终结流式控制
src/channels/status-reactions.ts ← 状态反应(零渠道依赖)
src/channels/typing.ts ← 输入状态(零渠道依赖)
src/channels/transport/stall-watchdog.ts ← 传输看门狗(零渠道依赖)
src/telegram/bot.ts ← 具体渠道实现
↑ 依赖上述所有共享模块
src/telegram/bot-message-context.ts ← Telegram 入站上下文十八、设计洞察
18.1 为什么分三层(Registry/Dock/Plugin)
这是对 导入开销 和 功能完整性 的权衡。JavaScript/TypeScript 的 ESM 模块在 import 时会执行所有顶层代码。一个 Telegram Plugin 可能导入 grammY SDK(数百 KB),一个 WhatsApp Plugin 可能导入 Puppeteer。如果共享代码(如命令权限检查)需要查询某个渠道的能力,直接导入 Plugin 就会拉入不需要的重量级依赖。
Dock 层的存在正是为了解决这个问题——它是 Plugin 的静态投影,只包含可以安全内联的轻量属性(能力声明、配置读取器、提及正则等),不包含任何运行时 SDK 导入。
18.2 安全门控为什么是三道而不是一道
单一的 AllowFrom 检查不够,因为:
- AllowFrom 管的是"谁能跟 Bot 说话"→ 身份层
- Mention 管的是"群组中何时响应"→ 触发层
- Command 管的是"谁能执行控制命令"→ 权限层
三层关注点分离,且每层有独立的配置旋钮。关键设计:控制命令可以绕过 Mention 门控(用户在群组中发 /reset 不需要 @Bot),但仍需通过 Command 门控验证权限。
18.3 七层路由优先级的设计哲学
从最精确到最宽泛的匹配,遵循"最小惊讶原则":
- 如果为某个用户/群组配置了特定 Agent,那一定要用那个 Agent
- 如果只在 Guild/Team 级别配了,那同 Guild/Team 的都用同一个 Agent
- 如果在账户级别配了,该账户所有消息用同一个 Agent
- 什么都没配?用默认 Agent
索引结构确保即使有上千条 binding 也能快速匹配。
18.4 流式输出的节流设计
DraftStreamLoop 的节流不是简单的 throttle(fn, ms),而是自适应节流:
- 如果上次发送已过了 throttleMs → 立即发送(低延迟响应开始)
- 如果有飞行中的请求 → 调度延迟发送(防止并发冲突)
- 如果是新的更新 → 调度延迟发送(合并快速连续更新)
这保证了第一个流式 token 能立即显示,后续 token 合理合并,不会产生过多的 API 调用。