主题
OpenClaw 源码解读(十)定时任务系统
本文基于 OpenClaw 2026.3.2 源码,深入解读定时任务系统(Cron System)的架构设计与实现细节。
一、模块概览
定时任务系统赋予 AI 助手时间感知能力:设置提醒、定期检查、定时报告……它让 AI 可以在未来的某个时间点主动执行任务,而不需要用户在线。
用户可以这样使用:
- "每天早上 9 点提醒我查看邮件"
- "30 分钟后提醒我开会"
- "每周一分析一下我的 GitHub issues 并发到 Slack"
1.1 核心源码分布
| 目录/文件 | 行数(约) | 职责 |
|---|---|---|
src/cron/service/state.ts | ~156 | 服务状态定义(依赖注入接口、状态类型) |
src/cron/service/timer.ts | ~1143 | 调度引擎核心(定时器、Job 执行、结果处理、失败告警) |
src/cron/service/jobs.ts | ~835 | Job 生命周期(创建、修改、调度计算、卡住检测) |
src/cron/service/ops.ts | ~330+ | CRUD 操作(add、update、remove、list、run、status) |
src/cron/service/store.ts | ~60+ | 运行时 Store 加载/持久化桥接 |
src/cron/service/locked.ts | ~30+ | 序列化锁(防止并发操作竞态) |
src/cron/schedule.ts | ~117 | 调度计算(croner 引擎、三种调度类型) |
src/cron/normalize.ts | ~504 | 输入归一化(兼容历史格式、默认值推导) |
src/cron/store.ts | ~113 | JSON 文件持久化(原子写、备份、去重) |
src/cron/types.ts | ~200+ | 类型定义(Job、Schedule、Payload、Delivery) |
src/cron/delivery.ts | ~301 | 投递计划(频道路由、Webhook、失败通知) |
src/cron/stagger.ts | ~47 | 错峰执行(哈希偏移量,防止多 Job 同时触发) |
src/cron/session-reaper.ts | ~161 | 会话清理器(过期 cron run 会话自动清理) |
src/cron/parse.ts | ~60+ | 时间解析(ISO 8601、自然语言时间) |
src/cron/webhook-url.ts | ~20+ | Webhook URL 校验与归一化 |
src/cron/heartbeat-policy.ts | ~50+ | 心跳投递策略(何时向主会话发送摘要) |
src/agents/tools/cron-tool.ts | ~526 | Agent 工具入口(7 种 action) |
1.2 系统全景架构
┌──────────────────────────────────────────────────────────────────────┐
│ 定时任务系统全景架构 │
│ │
│ ┌─── AI Agent ────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ cron-tool.ts │ │
│ │ ├── add "每天9点提醒我查看邮件" │ │
│ │ ├── list 列出所有定时任务 │ │
│ │ ├── update 修改任务 │ │
│ │ ├── remove 删除任务 │ │
│ │ ├── run 立即执行 │ │
│ │ ├── runs 查看执行历史 │ │
│ │ └── wake 唤醒助手 │ │
│ └──────────────┬───────────────────────────────────────────────────┘ │
│ │ Gateway RPC │
│ ▼ │
│ ┌─── Cron Service ────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌── 状态 ──────────────────────────────────────────────────┐ │ │
│ │ │ store: CronStoreFile │ timer: setTimeout │ │ │
│ │ │ running: boolean │ op: Promise<> │ │ │
│ │ └──────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌── 调度引擎 ──────────────────────────────────────────────┐ │ │
│ │ │ armTimer() → setTimeout(onTimer, delay) │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ onTimer() → findDueJobs() → runDueJob() × concurrency │ │ │
│ │ │ │ │ │ │
│ │ │ ├── sessionTarget="main" │ │ │
│ │ │ │ enqueueSystemEvent() → requestHeartbeatNow() │ │ │
│ │ │ │ │ │ │
│ │ │ └── sessionTarget="isolated" │ │ │
│ │ │ runIsolatedAgentJob() → 独立 Agent 会话 │ │ │
│ │ │ │ │ │ │
│ │ │ └── Delivery │ │ │
│ │ │ ├── announce → 发消息到聊天频道 │ │ │
│ │ │ ├── webhook → HTTP POST 到外部 URL │ │ │
│ │ │ └── none → 仅记录结果 │ │ │
│ │ └──────────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─── 持久化 ──────────────────────────────────────────────────────┐ │
│ │ ~/.openclaw/cron/jobs.json │ │
│ │ (JSON5 格式,原子写入 + .bak 备份) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘定时任务系统/01-infographic-cron-system-overview-1775150811675.png)
二、Job 数据模型
2.1 CronJob 核心结构
typescript
type CronJob = {
id: string; // UUID
name: string; // 任务名称(如 "Daily email check")
description?: string; // 可选描述
agentId?: string; // 关联的 Agent(多 Agent 场景)
sessionKey?: string; // 会话命名空间(用于频道/线程路由)
enabled: boolean; // 是否启用
deleteAfterRun?: boolean; // 一次性任务执行后自动删除
createdAtMs: number; // 创建时间戳
updatedAtMs: number; // 最后更新时间戳
schedule: CronSchedule; // 调度配置(三种类型)
sessionTarget: "main" | "isolated"; // 执行模式
wakeMode?: "now" | "next-heartbeat"; // 唤醒方式
payload: CronPayload; // 负载(要执行什么)
delivery?: CronDelivery; // 投递配置(结果发到哪)
failureAlert?: CronFailureAlert | false; // 失败告警配置
state: { // 运行时状态(可变)
nextRunAtMs?: number; // 下次执行时间
runningAtMs?: number; // 当前正在执行的开始时间
lastRunAtMs?: number; // 最后执行时间
lastRunStatus?: CronRunStatus;
lastStatus?: CronRunStatus;
lastDurationMs?: number;
lastError?: string;
lastDelivered?: boolean;
lastDeliveryStatus?: CronDeliveryStatus;
lastDeliveryError?: string;
consecutiveErrors?: number; // 连续失败计数
lastFailureAlertAtMs?: number;
scheduleErrorCount?: number;
};
};2.2 三种调度类型
| 类型 | 含义 | 配置示例 |
|---|---|---|
at | 一次性定时 | { kind: "at", at: "2025-03-01T09:00:00Z" } |
every | 固定间隔循环 | { kind: "every", everyMs: 3600000 } (每小时) |
cron | Cron 表达式 | { kind: "cron", expr: "0 9 * * *", tz: "Asia/Shanghai" } |
typescript
// schedule.ts — 统一的下次执行时间计算
function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
if (schedule.kind === "at") {
// 解析 ISO 8601 字符串 → 毫秒时间戳
// 如果目标时间 > now → 返回
// 否则 → undefined(已过期)
}
if (schedule.kind === "every") {
// anchor + N × everyMs > nowMs 的最小值
// anchor 默认 = 任务创建时间
}
if (schedule.kind === "cron") {
// 使用 croner 库解析 cron 表达式
// 支持时区(Intl.DateTimeFormat 自动检测)
// 处理 croner 年份回退 bug 的 workaround
}
}定时任务系统/02-infographic-job-model-schedule-1775150812663.png)
2.3 两种负载类型
| 类型 | 执行模式 | 典型用途 |
|---|---|---|
systemEvent | 注入主会话 → 心跳触发 | 提醒、通知("该开会了") |
agentTurn | 独立 Agent 会话 | 复杂任务("分析 GitHub issues 并发到 Slack") |
typescript
type CronPayload =
| { kind: "systemEvent"; text: string }
| {
kind: "agentTurn";
message: string; // Agent 收到的指令
model?: string; // 可指定模型
thinking?: string; // 思考模式
timeoutSeconds?: number;
lightContext?: boolean; // 轻量上下文
allowUnsafeExternalContent?: boolean;
};2.4 两种执行模式
main(主会话模式):
任务到期 → enqueueSystemEvent("该开会了") → requestHeartbeatNow()
→ 主会话在下一个心跳中处理这条系统事件
→ AI 在当前活跃频道回复isolated(隔离会话模式):
任务到期 → runIsolatedAgentJob({ message: "分析 GitHub issues" })
→ 创建独立会话(cron:{jobId}:run:{uuid})
→ Agent 在隔离环境中执行任务
→ 结果通过 delivery 配置投递到指定频道定时任务系统/03-infographic-execution-modes-1775150813482.png)
三、调度引擎
3.1 Timer 循环
调度引擎基于 setTimeout 的单定时器设计:
armTimer()
│
▼
计算 nextWakeAtMs(所有 Job 中最近的 nextRunAtMs)
│
├── 无到期任务 → 不设置定时器
│
├── delay = max(nextWake - now, MIN_REFIRE_GAP_MS=2s)
├── clampedDelay = min(delay, MAX_TIMER_DELAY_MS=60s)
│ ↑ 每分钟至少醒一次
│
▼
setTimeout(onTimer, clampedDelay)
│
▼
onTimer()
├── running=true?
│ └── 是 → armRunningRecheckTimer(60s) → return
│ (Job 还在跑,60s 后再检查)
│
├── locked: ensureLoaded(forceReload) + findDueJobs()
│
├── dueJobs.length === 0?
│ └── 是 → recomputeNextRunsForMaintenance() → armTimer()
│
├── 标记所有 dueJobs 的 runningAtMs → persist()
│
├── 并发执行 dueJobs(concurrency 控制)
│ workers = Array.from({ length: concurrency }, async () => { ... })
│
├── locked: applyOutcomeToStoredJob() × N → persist()
│
└── finally:
├── sweepCronRunSessions() (会话清理器)
├── running = false
└── armTimer() (重新设置下一个定时器)定时任务系统/04-infographic-timer-loop-1775150817094.png)
3.2 关键设计:为什么最多每分钟醒一次?
typescript
const MAX_TIMER_DELAY_MS = 60_000;
const clampedDelay = Math.min(flooredDelay, MAX_TIMER_DELAY_MS);原因有三:
- 时钟漂移补偿:进程可能被暂停(如笔记本合盖),醒来后需要立即检查过期任务
- 壁钟跳变:NTP 校准可能让时间突然前进
- 卡住恢复:如果 Job 的
runningAtMs标记卡住,定期检查能及时发现
3.3 并发控制
typescript
function resolveRunConcurrency(state): number {
return Math.max(1, Math.floor(state.deps.cronConfig?.maxConcurrentRuns ?? 1));
}
// 工作池模式
const workers = Array.from({ length: concurrency }, async () => {
for (;;) {
const index = cursor++;
if (index >= dueJobs.length) return;
results[index] = await runDueJob(dueJobs[index]);
}
});
await Promise.all(workers);默认并发数为 1(串行执行),可通过 cron.maxConcurrentRuns 配置。
3.4 死锁保护
typescript
const STUCK_RUN_MS = 2 * 60 * 60 * 1000; // 2 小时
// normalizeJobTickState() 中
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
job.state.runningAtMs = undefined; // 清除卡住标记
}如果某个 Job 的 runningAtMs 超过 2 小时没有清除,说明执行器可能已经崩溃,自动清除标记以恢复调度。
3.5 防重复触发
typescript
const MIN_REFIRE_GAP_MS = 2_000; // 最小 2 秒间隔
// armTimer() 中
const flooredDelay = delay === 0 ? MIN_REFIRE_GAP_MS : delay;防止 computeJobNextRunAtMs 返回一个在同一秒内的时间,导致 setTimeout(0) 无限循环。
定时任务系统/05-infographic-scheduler-safety-1775150817943.png)
四、Job 执行详解
4.1 Main 会话模式执行
typescript
// service/timer.ts — executeJobCore()
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
// 注入系统事件到主会话
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
// 同步等待心跳执行(busy-wait with retry)
// 处理 "requests-in-flight" 状态
// 最多等待 2 分钟
// 超时后降级到 requestHeartbeatNow(异步)
} else {
// 异步唤醒:requestHeartbeatNow()
}
}心跳 Busy-Wait 机制:当 wakeMode=now 时,系统会同步等待心跳完成。如果 Agent 正在处理其他请求("requests-in-flight"),会每 250ms 重试一次,最多等 2 分钟。这确保了"现在提醒我"的实时性。
4.2 Isolated 会话模式执行
typescript
if (job.payload.kind === "agentTurn") {
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
abortSignal,
});
// 投递结果摘要
if (shouldEnqueueCronMainSummary({ ... })) {
state.deps.enqueueSystemEvent(`Cron: ${summaryText}`, {
agentId: job.agentId,
sessionKey: job.sessionKey,
});
}
}Isolated 模式创建一个独立的 Agent 会话,与主会话完全隔离。这允许 Cron Job 使用工具(浏览器、代码执行等)完成复杂任务。
4.3 Job 超时控制
typescript
async function executeJobCoreWithTimeout(state, job) {
const jobTimeoutMs = resolveCronJobTimeoutMs(job);
const runAbortController = new AbortController();
return await Promise.race([
executeJobCore(state, job, runAbortController.signal),
new Promise((_, reject) => {
setTimeout(() => {
runAbortController.abort("cron: job execution timed out");
reject(new Error("cron: job execution timed out"));
}, jobTimeoutMs);
}),
]);
}通过 AbortController + Promise.race 实现优雅超时:
- 超时后发送 abort 信号
- Agent 执行器收到信号后清理资源
- 比直接 kill 更安全
定时任务系统/06-infographic-job-execution-1775150818899.png)
五、错误处理与容错
5.1 指数退避
typescript
const DEFAULT_BACKOFF_SCHEDULE_MS = [
30_000, // 第 1 次失败 → 30 秒后重试
60_000, // 第 2 次失败 → 1 分钟后重试
5 * 60_000, // 第 3 次失败 → 5 分钟后重试
15 * 60_000, // 第 4 次失败 → 15 分钟后重试
60 * 60_000, // 第 5+ 次失败 → 60 分钟后重试
];循环任务(every/cron)失败后,nextRunAtMs = max(正常下次时间, 当前时间 + 退避延迟)。
5.2 瞬态错误自动重试(一次性任务)
typescript
const TRANSIENT_PATTERNS = {
rate_limit: /(rate[_ ]limit|too many requests|429|resource has been exhausted|cloudflare)/i,
network: /(network|econnreset|econnrefused|fetch failed|socket)/i,
timeout: /(timeout|etimedout)/i,
server_error: /\b5\d{2}\b/,
};一次性任务(at)失败时,系统检查是否属于瞬态错误:
- 是 → 安排重试(最多 3 次,指数退避)
- 否 → 标记为永久失败,禁用任务
5.3 连续失败告警
typescript
// applyJobResult() 中
if (result.status === "error") {
job.state.consecutiveErrors += 1;
const alertConfig = resolveFailureAlert(state, job);
if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) {
// 检查冷却期(默认 1 小时)
if (!inCooldown) {
emitFailureAlert(state, {
job,
error: result.error,
consecutiveErrors,
channel: alertConfig.channel,
});
}
}
}连续失败达到阈值(默认 2 次)后,系统向配置的频道发送告警消息。告警有 1 小时冷却期,避免频繁骚扰。
5.4 Schedule 错误自动禁用
typescript
const MAX_SCHEDULE_ERRORS = 3;
function recordScheduleComputeError({ state, job, err }) {
job.state.scheduleErrorCount += 1;
if (errorCount >= MAX_SCHEDULE_ERRORS) {
job.enabled = false; // 自动禁用
// 通知用户
state.deps.enqueueSystemEvent(
`⚠️ Cron job "${job.name}" has been auto-disabled after ${errorCount} consecutive schedule errors`,
{ agentId: job.agentId }
);
}
}如果 cron 表达式解析连续失败 3 次(如无效的时区),任务自动禁用并通知用户。
定时任务系统/07-infographic-error-handling-1775150819824.png)
六、投递系统
6.1 投递模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
announce | 发送到聊天频道 | Telegram、Discord、Slack 等 |
webhook | HTTP POST 到外部 URL | 集成外部系统 |
none | 不投递 | 仅记录结果 |
6.2 投递计划解析
typescript
// delivery.ts
function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
// 优先使用 delivery 配置
if (hasDelivery) {
return {
mode: delivery.mode ?? "announce",
channel: delivery.channel ?? "last", // "last" = 最后活跃频道
to: delivery.to, // 目标群组/用户
accountId: delivery.accountId,
};
}
// 兼容旧版 payload 中的 deliver/channel/to 字段
return {
mode: legacyDeliver ? "announce" : "none",
channel: legacyChannel ?? "last",
to: legacyTo,
};
}6.3 失败目标路由(Failure Destination)
当任务执行失败时,可以将错误通知发送到与成功结果不同的目标:
typescript
type CronDelivery = {
mode: CronDeliveryMode;
channel?: CronMessageChannel;
to?: string;
accountId?: string;
bestEffort?: boolean;
failureDestination?: { // 失败时的备选目标
channel?: CronMessageChannel;
to?: string;
accountId?: string;
mode?: "announce" | "webhook";
};
};定时任务系统/08-infographic-delivery-system-1775150820717.png)
七、错峰执行(Stagger)
7.1 问题场景
用户设置了多个 0 * * * *(每小时整点)的 Cron Job,如果全部在整点同时触发:
- AI 并发处理能力有限
- API 调用可能触发限流
- 结果投递可能拥塞
7.2 解决方案:哈希偏移
typescript
// stagger.ts
const DEFAULT_TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1000; // 5 分钟
function isRecurringTopOfHourCronExpr(expr: string) {
// "0 * * * *" → true
// "0 0 9 * * *" → false(不是每小时)
}
// jobs.ts
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
// SHA-256(jobId) → 取前 4 字节 → 模 staggerMs
// 每个 Job 得到固定且唯一的偏移量
const digest = crypto.createHash("sha256").update(jobId).digest();
return digest.readUInt32BE(0) % staggerMs;
}对于每小时整点的 Cron Job,系统自动在 0-5 分钟内分散执行时间。偏移量基于 Job ID 的 SHA-256 哈希,确保:
- 同一个 Job 每次偏移量相同(稳定性)
- 不同 Job 偏移量均匀分布(公平性)
定时任务系统/09-infographic-stagger-hash-1775150821603.png)
八、持久化存储
8.1 存储格式
json
// ~/.openclaw/cron/jobs.json
{
"version": 1,
"jobs": [
{
"id": "a1b2c3d4-...",
"name": "Daily email check",
"enabled": true,
"schedule": { "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" },
"sessionTarget": "main",
"wakeMode": "now",
"payload": { "kind": "systemEvent", "text": "该检查邮件了" },
"state": { "nextRunAtMs": 1711936800000 }
}
]
}8.2 原子写入
typescript
// store.ts
async function saveCronStore(storePath: string, store: CronStoreFile) {
const json = JSON.stringify(store, null, 2);
// 1. 检查是否有变更(内存缓存比对)
if (cached === json) return;
// 2. 写入临时文件
const tmp = `${storePath}.${process.pid}.${randomBytes(8).hex()}.tmp`;
await writeFile(tmp, json);
// 3. 备份现有文件
await copyFile(storePath, `${storePath}.bak`);
// 4. 原子重命名
await renameWithRetry(tmp, storePath);
// 5. 更新缓存
serializedStoreCache.set(storePath, json);
}renameWithRetry 处理了 Windows 平台的 EBUSY/EPERM 问题(Windows 不支持原子替换已存在的文件)。
8.3 序列化锁
typescript
// service/locked.ts
export async function locked<T>(state, fn: () => Promise<T>): Promise<T> {
// 序列化所有对 store 的操作
// 避免并发读写导致数据丢失
state.op = state.op.then(fn, fn);
return state.op;
}使用 Promise 链实现串行化锁,所有对 Store 的读写操作都通过 locked() 包装,确保不会有两个操作同时修改同一个文件。
定时任务系统/10-infographic-persistence-1775150822476.png)
九、会话清理器(Session Reaper)
Isolated 模式的 Cron Job 每次执行都会创建一个新会话(cron:{jobId}:run:{uuid}),长期运行后会积累大量过期会话。
9.1 清理策略
onTimer() → finally:
└── sweepCronRunSessions()
│
├── 节流检查:上次清理距今 < 5 分钟? → 跳过
│
├── 读取会话存储
│
├── 遍历所有会话 key
│ ├── 匹配 cron run 格式? (cron:{id}:run:{uuid})
│ └── updatedAt < cutoff (now - retentionMs)?
│ └── 删除会话记录
│
├── 归档已删除会话的转录文件
│
└── 清理过期的归档文件默认保留期为 24 小时,可通过 cron.sessionRetention 配置(支持人类可读格式如 "48h"、"7d")。设为 false 可禁用清理。
定时任务系统/11-infographic-session-reaper-1775150823466.png)
十、输入归一化
10.1 兼容性处理
系统需要兼容多个历史版本的 API 格式:
| 历史格式 | 当前格式 | 转换 |
|---|---|---|
schedule.atMs: 1711936800000 | schedule.at: "2025-04-01T01:00:00.000Z" | 数字→ISO 字符串 |
schedule.cron: "0 9 * * *" | schedule.expr: "0 9 * * *" | 字段重命名 |
payload.deliver: true | delivery: { mode: "announce" } | 从 payload 提取到独立 delivery |
payload.provider: "telegram" | delivery.channel: "telegram" | provider → channel |
10.2 Flat-Params 恢复
某些 AI 模型(如 Grok)不擅长构建嵌套 JSON,会把 Job 属性扁平化到顶层:
json
// AI 输出的扁平格式
{
"action": "add",
"name": "Daily check",
"kind": "cron",
"expr": "0 9 * * *",
"message": "Check emails"
}
// 归一化后
{
"action": "add",
"name": "Daily check",
"schedule": { "kind": "cron", "expr": "0 9 * * *" },
"payload": { "kind": "agentTurn", "message": "Check emails" },
"sessionTarget": "isolated",
"delivery": { "mode": "announce" }
}normalize.ts 中有大量的兼容性代码,确保无论 AI 输出什么格式,都能正确解析。
定时任务系统/12-infographic-normalization-1775150824357.png)
十一、Agent 工具层
11.1 工具入口
createCronTool() 创建一个名为 "cron" 的 Agent 工具,标记为 ownerOnly: true(只有 owner 可以创建定时任务)。
支持 7 种 action:
| Action | 功能 | Gateway RPC |
|---|---|---|
status | 查看调度器状态 | cron.status |
list | 列出所有任务 | cron.list |
add | 创建新任务 | cron.add |
update | 修改任务 | cron.update |
remove | 删除任务 | cron.remove |
run | 立即执行任务 | cron.run |
runs | 查看执行历史 | cron.runs |
wake | 唤醒助手 | wake |
11.2 上下文消息注入
typescript
// cron-tool.ts
if (input.contextMessages) {
const MAX_CONTEXT_MESSAGES = 20;
// 从最近的聊天记录中截取
// 附加到提醒文本中
// 让 AI 在回复提醒时能参考上下文
}当 AI 设置提醒时,可以附带当前对话的上下文。这样到期触发时,AI 能回忆起"提醒是在什么场景下设置的"。
11.3 Delivery 推断
typescript
// cron-tool.ts — 自动推断投递配置
function inferDelivery(input, sessionContext) {
// 1. 如果用户明确指定了 delivery → 使用指定的
// 2. 如果在 Telegram forum topic 中设置 → 自动路由到该 topic
// 3. 如果在 threaded session 中设置 → 自动路由到该线程
// 4. 默认 → { mode: "announce", channel: "last" }
}十二、主会话心跳投递策略
12.1 何时向主会话发送摘要?
Isolated 模式的 Cron Job 执行完毕后,系统需要决定是否向主会话发送一条摘要消息。
typescript
// heartbeat-policy.ts
function shouldEnqueueCronMainSummary(params) {
// 不发送的情况:
// 1. 没有摘要文本
// 2. 投递已请求且成功 → 用户已在目标频道看到了
// 3. 投递已尝试(即使状态不确定)→ 避免重复
// 4. 摘要内容是心跳 OK token → 内部标记,不该泄露
// 5. suppressMainSummary=true → 投递目标错误导致的失败
// 发送的情况:
// 1. 未请求投递 → 结果只在主会话可见
// 2. 投递失败且 deliveryAttempted=false → 用户没看到
}十三、Gateway 重启恢复
13.1 错过的任务
如果 Gateway 重启,某些 Job 可能在停机期间到期。runMissedJobs() 在启动时检查并执行这些任务:
typescript
export async function runMissedJobs(state, opts?) {
// 1. 加载 store
// 2. 找出所有到期但未执行的 Job(collectRunnableJobs)
// 特殊处理:一次性 Job 已执行过的不再补跑
// 3. 标记 runningAtMs → persist
// 4. 逐个执行
// 5. 应用结果 → persist
}13.2 nextRunAtMs 保守策略
在重启恢复时,系统不会重新计算已有的 nextRunAtMs(除非为 undefined)。这防止了:
- 每日任务跳过一天(因为重新计算指向明天)
- 间隔任务跳过中间的周期
typescript
export function recomputeNextRuns(state): boolean {
return walkSchedulableJobs(state, ({ job, nowMs: now }) => {
const nextRun = job.state.nextRunAtMs;
// 只有缺失或已过期才重算
const isDueOrMissing = !isFiniteTimestamp(nextRun) || now >= nextRun;
if (isDueOrMissing) {
recomputeJobNextRunAtMs({ state, job, nowMs: now });
}
});
}十四、配置参考
json5
{
"cron": {
"enabled": true,
"storePath": "~/.openclaw/cron/jobs.json",
"maxConcurrentRuns": 1,
"sessionRetention": "24h", // 或 false 禁用清理
"retry": {
"maxAttempts": 3, // 一次性任务最大重试次数
"backoffMs": [30000, 60000, 300000],
"retryOn": ["rate_limit", "network", "timeout", "server_error"]
},
"failureAlert": {
"enabled": false,
"after": 2, // 连续失败 N 次后告警
"cooldownMs": 3600000, // 告警冷却期(1 小时)
"channel": "last",
"mode": "announce" // 或 "webhook"
}
}
}十五、与其他模块的交互
┌─── Agent 系统 ───┐
│ │
│ Pi Agent Core │──→ cron-tool.ts ──→ 7 种 action
│ │
│ 工具注册 │──→ createCronTool() ──→ ToolDefinition
└───────────────────┘
│
▼
┌─── Gateway ───────┐
│ │
│ RPC 处理 │──→ cron.add/list/update/remove/run/status
│ │
│ 心跳系统 │──→ requestHeartbeatNow() / runHeartbeatOnce()
│ │
│ 系统事件 │──→ enqueueSystemEvent() → 注入会话
└────────────────────┘
│
▼
┌─── 会话系统 ──────┐
│ │
│ 会话存储 │──→ 创建/管理 cron run 会话
│ (sessions.json) │
│ │──→ sweepCronRunSessions() 清理
│ 会话转录 │──→ 归档/删除过期的 JSONL
└────────────────────┘
│
▼
┌─── Channel 系统 ──┐
│ │
│ 消息投递 │──→ announce: 发送到聊天频道
│ │──→ webhook: HTTP POST
│ 失败通知 │──→ sendCronFailureAlert()
└────────────────────┘定时任务系统/13-infographic-module-interactions-1775150825291.png)
十六、数据流全景图
AI 说:"每天早上 9 点提醒我查看邮件"
│
▼
cron-tool.ts: action="add"
│
├── normalizeCronJobCreate() → 归一化输入
│ schedule: { kind: "cron", expr: "0 9 * * *", tz: "Asia/Shanghai" }
│ payload: { kind: "systemEvent", text: "该检查邮件了" }
│ sessionTarget: "main"
│ wakeMode: "now"
│
▼
Gateway RPC: cron.add → ops.ts: add()
│
├── locked:
│ ├── ensureLoaded() → 加载 jobs.json
│ ├── createJob() → UUID + computeJobNextRunAtMs()
│ ├── push to store.jobs
│ └── persist() → 原子写入 jobs.json
│
├── emit: { action: "added", jobId: "..." }
│
└── armTimer() → setTimeout(onTimer, delay)
──── 次日早上 9 点 ────
setTimeout 触发 → onTimer()
│
├── findDueJobs() → 找到 "每天 9 点" 这个 Job
│
├── executeJobCoreWithTimeout()
│ │
│ └── executeJobCore()
│ │
│ ├── sessionTarget = "main"
│ ├── enqueueSystemEvent("该检查邮件了")
│ └── runHeartbeatOnce() → 同步等待心跳
│ │
│ └── AI 在最后活跃频道回复 "好的,提醒你该检查邮件了!"
│
├── applyJobResult()
│ ├── status = "ok"
│ ├── consecutiveErrors = 0
│ └── nextRunAtMs = 明天 9 点
│
├── persist() → 更新 jobs.json
│
└── armTimer() → setTimeout 到明天 9 点十七、关键设计决策与权衡
17.1 为什么用 JSON 文件而非数据库?
优势:
- 人类可读可编辑(JSON5)
- 零依赖(不需要 SQLite)
- 易于备份和迁移(单文件复制)
- 个人助手场景下任务数量少(通常 < 100 个)
权衡:
- 每次修改需要全量读写(通过序列化缓存优化)
- 并发安全依赖串行化锁(Promise 链)
17.2 为什么分 main/isolated 两种模式?
| main | isolated | |
|---|---|---|
| 适用场景 | 简单提醒 | 复杂任务 |
| 执行方式 | 注入主会话 | 独立会话 |
| 上下文 | 共享主会话上下文 | 干净的上下文 |
| 工具访问 | 完全权限 | 受限 |
| 投递 | 心跳驱动 | 主动投递 |
| 开销 | 极低 | 创建新会话 |
main 模式适合"提醒我做 X",isolated 模式适合"帮我做 X 然后告诉我结果"。
17.3 错峰执行的意义
当用户设置多个整点 Job 时(如"每小时检查一次 API"、"每小时汇总一次日志"),全部在 :00 触发会:
- 并发请求 → API 限流
- Agent 排队 → 延迟增大
- 投递拥塞 → 消息乱序
5 分钟的错峰窗口(基于 JobID 哈希的确定性偏移)在保持"大约每小时一次"的同时,显著降低了竞争。
十八、总结
OpenClaw 的定时任务系统是一个精密且容错的调度引擎,有以下亮点:
- 三种调度类型:一次性(at)、固定间隔(every)、Cron 表达式(cron),覆盖所有定时场景
- 双执行模式:main(轻量提醒)和 isolated(复杂任务),按需选择
- 完善的容错:指数退避、瞬态错误自动重试、卡住检测、连续失败告警、Schedule 错误自动禁用
- 多种投递方式:announce(聊天频道)、webhook(HTTP POST)、失败目标路由
- 错峰执行:基于 JobID 哈希的确定性偏移,防止整点拥塞
- 重启恢复:错过的任务自动补跑,nextRunAtMs 保守策略
- 会话清理:自动清理过期的 cron run 会话和转录文件
- 极强的兼容性:Flat-params 恢复、历史格式迁移、多种 AI 模型输出适配
- 安全设计:ownerOnly 权限、AbortController 超时、序列化锁
整个设计遵循"可靠且低运维"的理念:用户只需要告诉 AI "每天 9 点提醒我",其余的调度、执行、投递、容错、清理全部自动化。
定时任务系统/14-infographic-design-highlights-1775150826102.png)