Skip to content

OpenClaw 源码解读(十)定时任务系统

本文基于 OpenClaw 2026.3.2 源码,深入解读定时任务系统(Cron System)的架构设计与实现细节。


一、模块概览

定时任务系统赋予 AI 助手时间感知能力:设置提醒、定期检查、定时报告……它让 AI 可以在未来的某个时间点主动执行任务,而不需要用户在线。

用户可以这样使用:

  • "每天早上 9 点提醒我查看邮件"
  • "30 分钟后提醒我开会"
  • "每周一分析一下我的 GitHub issues 并发到 Slack"

1.1 核心源码分布

目录/文件行数(约)职责
src/cron/service/state.ts~156服务状态定义(依赖注入接口、状态类型)
src/cron/service/timer.ts~1143调度引擎核心(定时器、Job 执行、结果处理、失败告警)
src/cron/service/jobs.ts~835Job 生命周期(创建、修改、调度计算、卡住检测)
src/cron/service/ops.ts~330+CRUD 操作(add、update、remove、list、run、status)
src/cron/service/store.ts~60+运行时 Store 加载/持久化桥接
src/cron/service/locked.ts~30+序列化锁(防止并发操作竞态)
src/cron/schedule.ts~117调度计算(croner 引擎、三种调度类型)
src/cron/normalize.ts~504输入归一化(兼容历史格式、默认值推导)
src/cron/store.ts~113JSON 文件持久化(原子写、备份、去重)
src/cron/types.ts~200+类型定义(Job、Schedule、Payload、Delivery)
src/cron/delivery.ts~301投递计划(频道路由、Webhook、失败通知)
src/cron/stagger.ts~47错峰执行(哈希偏移量,防止多 Job 同时触发)
src/cron/session-reaper.ts~161会话清理器(过期 cron run 会话自动清理)
src/cron/parse.ts~60+时间解析(ISO 8601、自然语言时间)
src/cron/webhook-url.ts~20+Webhook URL 校验与归一化
src/cron/heartbeat-policy.ts~50+心跳投递策略(何时向主会话发送摘要)
src/agents/tools/cron-tool.ts~526Agent 工具入口(7 种 action)

1.2 系统全景架构

┌──────────────────────────────────────────────────────────────────────┐
│                        定时任务系统全景架构                            │
│                                                                       │
│  ┌─── AI Agent ────────────────────────────────────────────────────┐ │
│  │                                                                  │ │
│  │  cron-tool.ts                                                    │ │
│  │  ├── add     "每天9点提醒我查看邮件"                              │ │
│  │  ├── list    列出所有定时任务                                     │ │
│  │  ├── update  修改任务                                            │ │
│  │  ├── remove  删除任务                                            │ │
│  │  ├── run     立即执行                                            │ │
│  │  ├── runs    查看执行历史                                        │ │
│  │  └── wake    唤醒助手                                            │ │
│  └──────────────┬───────────────────────────────────────────────────┘ │
│                 │  Gateway RPC                                        │
│                 ▼                                                     │
│  ┌─── Cron Service ────────────────────────────────────────────────┐ │
│  │                                                                  │ │
│  │  ┌── 状态 ──────────────────────────────────────────────────┐   │ │
│  │  │  store: CronStoreFile │ timer: setTimeout               │   │ │
│  │  │  running: boolean     │ op: Promise<>                   │   │ │
│  │  └──────────────────────────────────────────────────────────┘   │ │
│  │                                                                  │ │
│  │  ┌── 调度引擎 ──────────────────────────────────────────────┐   │ │
│  │  │  armTimer() → setTimeout(onTimer, delay)                  │   │ │
│  │  │       │                                                    │   │ │
│  │  │       ▼                                                    │   │ │
│  │  │  onTimer() → findDueJobs() → runDueJob() × concurrency   │   │ │
│  │  │       │                                                    │   │ │
│  │  │       ├── sessionTarget="main"                             │   │ │
│  │  │       │   enqueueSystemEvent() → requestHeartbeatNow()    │   │ │
│  │  │       │                                                    │   │ │
│  │  │       └── sessionTarget="isolated"                         │   │ │
│  │  │           runIsolatedAgentJob() → 独立 Agent 会话          │   │ │
│  │  │               │                                            │   │ │
│  │  │               └── Delivery                                 │   │ │
│  │  │                   ├── announce → 发消息到聊天频道            │   │ │
│  │  │                   ├── webhook → HTTP POST 到外部 URL        │   │ │
│  │  │                   └── none → 仅记录结果                    │   │ │
│  │  └──────────────────────────────────────────────────────────┘   │ │
│  └──────────────────────────────────────────────────────────────────┘ │
│                 │                                                     │
│                 ▼                                                     │
│  ┌─── 持久化 ──────────────────────────────────────────────────────┐ │
│  │  ~/.openclaw/cron/jobs.json                                      │ │
│  │  (JSON5 格式,原子写入 + .bak 备份)                               │ │
│  └──────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────┘

![定时任务系统全景架构](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/01-infographic-cron-system-overview-1775150811675.png)


二、Job 数据模型

2.1 CronJob 核心结构

typescript
type CronJob = {
    id: string;                    // UUID
    name: string;                  // 任务名称(如 "Daily email check")
    description?: string;          // 可选描述
    agentId?: string;              // 关联的 Agent(多 Agent 场景)
    sessionKey?: string;           // 会话命名空间(用于频道/线程路由)
    enabled: boolean;              // 是否启用
    deleteAfterRun?: boolean;      // 一次性任务执行后自动删除
    createdAtMs: number;           // 创建时间戳
    updatedAtMs: number;           // 最后更新时间戳

    schedule: CronSchedule;        // 调度配置(三种类型)
    sessionTarget: "main" | "isolated";  // 执行模式
    wakeMode?: "now" | "next-heartbeat"; // 唤醒方式
    payload: CronPayload;          // 负载(要执行什么)
    delivery?: CronDelivery;       // 投递配置(结果发到哪)
    failureAlert?: CronFailureAlert | false; // 失败告警配置

    state: {                       // 运行时状态(可变)
        nextRunAtMs?: number;      // 下次执行时间
        runningAtMs?: number;      // 当前正在执行的开始时间
        lastRunAtMs?: number;      // 最后执行时间
        lastRunStatus?: CronRunStatus;
        lastStatus?: CronRunStatus;
        lastDurationMs?: number;
        lastError?: string;
        lastDelivered?: boolean;
        lastDeliveryStatus?: CronDeliveryStatus;
        lastDeliveryError?: string;
        consecutiveErrors?: number; // 连续失败计数
        lastFailureAlertAtMs?: number;
        scheduleErrorCount?: number;
    };
};

2.2 三种调度类型

类型含义配置示例
at一次性定时{ kind: "at", at: "2025-03-01T09:00:00Z" }
every固定间隔循环{ kind: "every", everyMs: 3600000 } (每小时)
cronCron 表达式{ kind: "cron", expr: "0 9 * * *", tz: "Asia/Shanghai" }
typescript
// schedule.ts — 统一的下次执行时间计算
function computeNextRunAtMs(schedule: CronSchedule, nowMs: number): number | undefined {
    if (schedule.kind === "at") {
        // 解析 ISO 8601 字符串 → 毫秒时间戳
        // 如果目标时间 > now → 返回
        // 否则 → undefined(已过期)
    }
    
    if (schedule.kind === "every") {
        // anchor + N × everyMs > nowMs 的最小值
        // anchor 默认 = 任务创建时间
    }
    
    if (schedule.kind === "cron") {
        // 使用 croner 库解析 cron 表达式
        // 支持时区(Intl.DateTimeFormat 自动检测)
        // 处理 croner 年份回退 bug 的 workaround
    }
}

![CronJob 数据模型与三种调度类型](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/02-infographic-job-model-schedule-1775150812663.png)

2.3 两种负载类型

类型执行模式典型用途
systemEvent注入主会话 → 心跳触发提醒、通知("该开会了")
agentTurn独立 Agent 会话复杂任务("分析 GitHub issues 并发到 Slack")
typescript
type CronPayload =
    | { kind: "systemEvent"; text: string }
    | {
          kind: "agentTurn";
          message: string;       // Agent 收到的指令
          model?: string;        // 可指定模型
          thinking?: string;     // 思考模式
          timeoutSeconds?: number;
          lightContext?: boolean; // 轻量上下文
          allowUnsafeExternalContent?: boolean;
      };

2.4 两种执行模式

main(主会话模式)

任务到期 → enqueueSystemEvent("该开会了") → requestHeartbeatNow()
    → 主会话在下一个心跳中处理这条系统事件
    → AI 在当前活跃频道回复

isolated(隔离会话模式)

任务到期 → runIsolatedAgentJob({ message: "分析 GitHub issues" })
    → 创建独立会话(cron:{jobId}:run:{uuid})
    → Agent 在隔离环境中执行任务
    → 结果通过 delivery 配置投递到指定频道

![main 与 isolated 两种执行模式对比](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/03-infographic-execution-modes-1775150813482.png)


三、调度引擎

3.1 Timer 循环

调度引擎基于 setTimeout单定时器设计:

armTimer()


计算 nextWakeAtMs(所有 Job 中最近的 nextRunAtMs)

    ├── 无到期任务 → 不设置定时器

    ├── delay = max(nextWake - now, MIN_REFIRE_GAP_MS=2s)
    ├── clampedDelay = min(delay, MAX_TIMER_DELAY_MS=60s)
    │                                   ↑ 每分钟至少醒一次


setTimeout(onTimer, clampedDelay)


onTimer()
    ├── running=true?
    │   └── 是 → armRunningRecheckTimer(60s) → return
    │       (Job 还在跑,60s 后再检查)

    ├── locked: ensureLoaded(forceReload) + findDueJobs()

    ├── dueJobs.length === 0?
    │   └── 是 → recomputeNextRunsForMaintenance() → armTimer()

    ├── 标记所有 dueJobs 的 runningAtMs → persist()

    ├── 并发执行 dueJobs(concurrency 控制)
    │   workers = Array.from({ length: concurrency }, async () => { ... })

    ├── locked: applyOutcomeToStoredJob() × N → persist()

    └── finally:
        ├── sweepCronRunSessions()  (会话清理器)
        ├── running = false
        └── armTimer()  (重新设置下一个定时器)

![Timer 调度循环流程](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/04-infographic-timer-loop-1775150817094.png)

3.2 关键设计:为什么最多每分钟醒一次?

typescript
const MAX_TIMER_DELAY_MS = 60_000;
const clampedDelay = Math.min(flooredDelay, MAX_TIMER_DELAY_MS);

原因有三:

  1. 时钟漂移补偿:进程可能被暂停(如笔记本合盖),醒来后需要立即检查过期任务
  2. 壁钟跳变:NTP 校准可能让时间突然前进
  3. 卡住恢复:如果 Job 的 runningAtMs 标记卡住,定期检查能及时发现

3.3 并发控制

typescript
function resolveRunConcurrency(state): number {
    return Math.max(1, Math.floor(state.deps.cronConfig?.maxConcurrentRuns ?? 1));
}

// 工作池模式
const workers = Array.from({ length: concurrency }, async () => {
    for (;;) {
        const index = cursor++;
        if (index >= dueJobs.length) return;
        results[index] = await runDueJob(dueJobs[index]);
    }
});
await Promise.all(workers);

默认并发数为 1(串行执行),可通过 cron.maxConcurrentRuns 配置。

3.4 死锁保护

typescript
const STUCK_RUN_MS = 2 * 60 * 60 * 1000; // 2 小时

// normalizeJobTickState() 中
if (typeof runningAt === "number" && nowMs - runningAt > STUCK_RUN_MS) {
    job.state.runningAtMs = undefined; // 清除卡住标记
}

如果某个 Job 的 runningAtMs 超过 2 小时没有清除,说明执行器可能已经崩溃,自动清除标记以恢复调度。

3.5 防重复触发

typescript
const MIN_REFIRE_GAP_MS = 2_000; // 最小 2 秒间隔

// armTimer() 中
const flooredDelay = delay === 0 ? MIN_REFIRE_GAP_MS : delay;

防止 computeJobNextRunAtMs 返回一个在同一秒内的时间,导致 setTimeout(0) 无限循环。

![调度引擎安全机制](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/05-infographic-scheduler-safety-1775150817943.png)


四、Job 执行详解

4.1 Main 会话模式执行

typescript
// service/timer.ts — executeJobCore()
if (job.sessionTarget === "main") {
    const text = resolveJobPayloadTextForMain(job);
    
    // 注入系统事件到主会话
    state.deps.enqueueSystemEvent(text, {
        agentId: job.agentId,
        sessionKey: job.sessionKey,
        contextKey: `cron:${job.id}`,
    });
    
    if (job.wakeMode === "now" && state.deps.runHeartbeatOnce) {
        // 同步等待心跳执行(busy-wait with retry)
        // 处理 "requests-in-flight" 状态
        // 最多等待 2 分钟
        // 超时后降级到 requestHeartbeatNow(异步)
    } else {
        // 异步唤醒:requestHeartbeatNow()
    }
}

心跳 Busy-Wait 机制:当 wakeMode=now 时,系统会同步等待心跳完成。如果 Agent 正在处理其他请求("requests-in-flight"),会每 250ms 重试一次,最多等 2 分钟。这确保了"现在提醒我"的实时性。

4.2 Isolated 会话模式执行

typescript
if (job.payload.kind === "agentTurn") {
    const res = await state.deps.runIsolatedAgentJob({
        job,
        message: job.payload.message,
        abortSignal,
    });
    
    // 投递结果摘要
    if (shouldEnqueueCronMainSummary({ ... })) {
        state.deps.enqueueSystemEvent(`Cron: ${summaryText}`, {
            agentId: job.agentId,
            sessionKey: job.sessionKey,
        });
    }
}

Isolated 模式创建一个独立的 Agent 会话,与主会话完全隔离。这允许 Cron Job 使用工具(浏览器、代码执行等)完成复杂任务。

4.3 Job 超时控制

typescript
async function executeJobCoreWithTimeout(state, job) {
    const jobTimeoutMs = resolveCronJobTimeoutMs(job);
    
    const runAbortController = new AbortController();
    return await Promise.race([
        executeJobCore(state, job, runAbortController.signal),
        new Promise((_, reject) => {
            setTimeout(() => {
                runAbortController.abort("cron: job execution timed out");
                reject(new Error("cron: job execution timed out"));
            }, jobTimeoutMs);
        }),
    ]);
}

通过 AbortController + Promise.race 实现优雅超时:

  • 超时后发送 abort 信号
  • Agent 执行器收到信号后清理资源
  • 比直接 kill 更安全

![Job 执行流程与超时控制](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/06-infographic-job-execution-1775150818899.png)


五、错误处理与容错

5.1 指数退避

typescript
const DEFAULT_BACKOFF_SCHEDULE_MS = [
    30_000,      // 第 1 次失败 →  30 秒后重试
    60_000,      // 第 2 次失败 →   1 分钟后重试
    5 * 60_000,  // 第 3 次失败 →   5 分钟后重试
    15 * 60_000, // 第 4 次失败 →  15 分钟后重试
    60 * 60_000, // 第 5+ 次失败 → 60 分钟后重试
];

循环任务(every/cron)失败后,nextRunAtMs = max(正常下次时间, 当前时间 + 退避延迟)。

5.2 瞬态错误自动重试(一次性任务)

typescript
const TRANSIENT_PATTERNS = {
    rate_limit: /(rate[_ ]limit|too many requests|429|resource has been exhausted|cloudflare)/i,
    network: /(network|econnreset|econnrefused|fetch failed|socket)/i,
    timeout: /(timeout|etimedout)/i,
    server_error: /\b5\d{2}\b/,
};

一次性任务(at)失败时,系统检查是否属于瞬态错误:

  • 是 → 安排重试(最多 3 次,指数退避)
  • 否 → 标记为永久失败,禁用任务

5.3 连续失败告警

typescript
// applyJobResult() 中
if (result.status === "error") {
    job.state.consecutiveErrors += 1;
    
    const alertConfig = resolveFailureAlert(state, job);
    if (alertConfig && job.state.consecutiveErrors >= alertConfig.after) {
        // 检查冷却期(默认 1 小时)
        if (!inCooldown) {
            emitFailureAlert(state, {
                job,
                error: result.error,
                consecutiveErrors,
                channel: alertConfig.channel,
            });
        }
    }
}

连续失败达到阈值(默认 2 次)后,系统向配置的频道发送告警消息。告警有 1 小时冷却期,避免频繁骚扰。

5.4 Schedule 错误自动禁用

typescript
const MAX_SCHEDULE_ERRORS = 3;

function recordScheduleComputeError({ state, job, err }) {
    job.state.scheduleErrorCount += 1;
    
    if (errorCount >= MAX_SCHEDULE_ERRORS) {
        job.enabled = false;  // 自动禁用
        
        // 通知用户
        state.deps.enqueueSystemEvent(
            `⚠️ Cron job "${job.name}" has been auto-disabled after ${errorCount} consecutive schedule errors`,
            { agentId: job.agentId }
        );
    }
}

如果 cron 表达式解析连续失败 3 次(如无效的时区),任务自动禁用并通知用户。

![容错策略体系](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/07-infographic-error-handling-1775150819824.png)


六、投递系统

6.1 投递模式

模式说明适用场景
announce发送到聊天频道Telegram、Discord、Slack 等
webhookHTTP POST 到外部 URL集成外部系统
none不投递仅记录结果

6.2 投递计划解析

typescript
// delivery.ts
function resolveCronDeliveryPlan(job: CronJob): CronDeliveryPlan {
    // 优先使用 delivery 配置
    if (hasDelivery) {
        return {
            mode: delivery.mode ?? "announce",
            channel: delivery.channel ?? "last",  // "last" = 最后活跃频道
            to: delivery.to,                       // 目标群组/用户
            accountId: delivery.accountId,
        };
    }
    
    // 兼容旧版 payload 中的 deliver/channel/to 字段
    return {
        mode: legacyDeliver ? "announce" : "none",
        channel: legacyChannel ?? "last",
        to: legacyTo,
    };
}

6.3 失败目标路由(Failure Destination)

当任务执行失败时,可以将错误通知发送到与成功结果不同的目标:

typescript
type CronDelivery = {
    mode: CronDeliveryMode;
    channel?: CronMessageChannel;
    to?: string;
    accountId?: string;
    bestEffort?: boolean;
    failureDestination?: {        // 失败时的备选目标
        channel?: CronMessageChannel;
        to?: string;
        accountId?: string;
        mode?: "announce" | "webhook";
    };
};

![投递系统三种模式与路由](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/08-infographic-delivery-system-1775150820717.png)


七、错峰执行(Stagger)

7.1 问题场景

用户设置了多个 0 * * * *(每小时整点)的 Cron Job,如果全部在整点同时触发:

  • AI 并发处理能力有限
  • API 调用可能触发限流
  • 结果投递可能拥塞

7.2 解决方案:哈希偏移

typescript
// stagger.ts
const DEFAULT_TOP_OF_HOUR_STAGGER_MS = 5 * 60 * 1000; // 5 分钟

function isRecurringTopOfHourCronExpr(expr: string) {
    // "0 * * * *" → true
    // "0 0 9 * * *" → false(不是每小时)
}

// jobs.ts
function resolveStableCronOffsetMs(jobId: string, staggerMs: number) {
    // SHA-256(jobId) → 取前 4 字节 → 模 staggerMs
    // 每个 Job 得到固定且唯一的偏移量
    const digest = crypto.createHash("sha256").update(jobId).digest();
    return digest.readUInt32BE(0) % staggerMs;
}

对于每小时整点的 Cron Job,系统自动在 0-5 分钟内分散执行时间。偏移量基于 Job ID 的 SHA-256 哈希,确保:

  • 同一个 Job 每次偏移量相同(稳定性)
  • 不同 Job 偏移量均匀分布(公平性)

![错峰执行哈希偏移](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/09-infographic-stagger-hash-1775150821603.png)


八、持久化存储

8.1 存储格式

json
// ~/.openclaw/cron/jobs.json
{
    "version": 1,
    "jobs": [
        {
            "id": "a1b2c3d4-...",
            "name": "Daily email check",
            "enabled": true,
            "schedule": { "kind": "cron", "expr": "0 9 * * *", "tz": "Asia/Shanghai" },
            "sessionTarget": "main",
            "wakeMode": "now",
            "payload": { "kind": "systemEvent", "text": "该检查邮件了" },
            "state": { "nextRunAtMs": 1711936800000 }
        }
    ]
}

8.2 原子写入

typescript
// store.ts
async function saveCronStore(storePath: string, store: CronStoreFile) {
    const json = JSON.stringify(store, null, 2);
    
    // 1. 检查是否有变更(内存缓存比对)
    if (cached === json) return;
    
    // 2. 写入临时文件
    const tmp = `${storePath}.${process.pid}.${randomBytes(8).hex()}.tmp`;
    await writeFile(tmp, json);
    
    // 3. 备份现有文件
    await copyFile(storePath, `${storePath}.bak`);
    
    // 4. 原子重命名
    await renameWithRetry(tmp, storePath);
    
    // 5. 更新缓存
    serializedStoreCache.set(storePath, json);
}

renameWithRetry 处理了 Windows 平台的 EBUSY/EPERM 问题(Windows 不支持原子替换已存在的文件)。

8.3 序列化锁

typescript
// service/locked.ts
export async function locked<T>(state, fn: () => Promise<T>): Promise<T> {
    // 序列化所有对 store 的操作
    // 避免并发读写导致数据丢失
    state.op = state.op.then(fn, fn);
    return state.op;
}

使用 Promise 链实现串行化锁,所有对 Store 的读写操作都通过 locked() 包装,确保不会有两个操作同时修改同一个文件。

![持久化存储与序列化锁](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/10-infographic-persistence-1775150822476.png)


九、会话清理器(Session Reaper)

Isolated 模式的 Cron Job 每次执行都会创建一个新会话(cron:{jobId}:run:{uuid}),长期运行后会积累大量过期会话。

9.1 清理策略

onTimer() → finally:
    └── sweepCronRunSessions()

         ├── 节流检查:上次清理距今 < 5 分钟? → 跳过

         ├── 读取会话存储

         ├── 遍历所有会话 key
         │   ├── 匹配 cron run 格式? (cron:{id}:run:{uuid})
         │   └── updatedAt < cutoff (now - retentionMs)?
         │       └── 删除会话记录

         ├── 归档已删除会话的转录文件

         └── 清理过期的归档文件

默认保留期为 24 小时,可通过 cron.sessionRetention 配置(支持人类可读格式如 "48h""7d")。设为 false 可禁用清理。

![会话清理器流程](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/11-infographic-session-reaper-1775150823466.png)


十、输入归一化

10.1 兼容性处理

系统需要兼容多个历史版本的 API 格式:

历史格式当前格式转换
schedule.atMs: 1711936800000schedule.at: "2025-04-01T01:00:00.000Z"数字→ISO 字符串
schedule.cron: "0 9 * * *"schedule.expr: "0 9 * * *"字段重命名
payload.deliver: truedelivery: { mode: "announce" }从 payload 提取到独立 delivery
payload.provider: "telegram"delivery.channel: "telegram"provider → channel

10.2 Flat-Params 恢复

某些 AI 模型(如 Grok)不擅长构建嵌套 JSON,会把 Job 属性扁平化到顶层:

json
// AI 输出的扁平格式
{
    "action": "add",
    "name": "Daily check",
    "kind": "cron",
    "expr": "0 9 * * *",
    "message": "Check emails"
}

// 归一化后
{
    "action": "add",
    "name": "Daily check",
    "schedule": { "kind": "cron", "expr": "0 9 * * *" },
    "payload": { "kind": "agentTurn", "message": "Check emails" },
    "sessionTarget": "isolated",
    "delivery": { "mode": "announce" }
}

normalize.ts 中有大量的兼容性代码,确保无论 AI 输出什么格式,都能正确解析。

![输入归一化与格式转换](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/12-infographic-normalization-1775150824357.png)


十一、Agent 工具层

11.1 工具入口

createCronTool() 创建一个名为 "cron" 的 Agent 工具,标记为 ownerOnly: true(只有 owner 可以创建定时任务)。

支持 7 种 action:

Action功能Gateway RPC
status查看调度器状态cron.status
list列出所有任务cron.list
add创建新任务cron.add
update修改任务cron.update
remove删除任务cron.remove
run立即执行任务cron.run
runs查看执行历史cron.runs
wake唤醒助手wake

11.2 上下文消息注入

typescript
// cron-tool.ts
if (input.contextMessages) {
    const MAX_CONTEXT_MESSAGES = 20;
    // 从最近的聊天记录中截取
    // 附加到提醒文本中
    // 让 AI 在回复提醒时能参考上下文
}

当 AI 设置提醒时,可以附带当前对话的上下文。这样到期触发时,AI 能回忆起"提醒是在什么场景下设置的"。

11.3 Delivery 推断

typescript
// cron-tool.ts — 自动推断投递配置
function inferDelivery(input, sessionContext) {
    // 1. 如果用户明确指定了 delivery → 使用指定的
    // 2. 如果在 Telegram forum topic 中设置 → 自动路由到该 topic
    // 3. 如果在 threaded session 中设置 → 自动路由到该线程
    // 4. 默认 → { mode: "announce", channel: "last" }
}

十二、主会话心跳投递策略

12.1 何时向主会话发送摘要?

Isolated 模式的 Cron Job 执行完毕后,系统需要决定是否向主会话发送一条摘要消息。

typescript
// heartbeat-policy.ts
function shouldEnqueueCronMainSummary(params) {
    // 不发送的情况:
    // 1. 没有摘要文本
    // 2. 投递已请求且成功 → 用户已在目标频道看到了
    // 3. 投递已尝试(即使状态不确定)→ 避免重复
    // 4. 摘要内容是心跳 OK token → 内部标记,不该泄露
    // 5. suppressMainSummary=true → 投递目标错误导致的失败
    
    // 发送的情况:
    // 1. 未请求投递 → 结果只在主会话可见
    // 2. 投递失败且 deliveryAttempted=false → 用户没看到
}

十三、Gateway 重启恢复

13.1 错过的任务

如果 Gateway 重启,某些 Job 可能在停机期间到期。runMissedJobs() 在启动时检查并执行这些任务:

typescript
export async function runMissedJobs(state, opts?) {
    // 1. 加载 store
    // 2. 找出所有到期但未执行的 Job(collectRunnableJobs)
    //    特殊处理:一次性 Job 已执行过的不再补跑
    // 3. 标记 runningAtMs → persist
    // 4. 逐个执行
    // 5. 应用结果 → persist
}

13.2 nextRunAtMs 保守策略

在重启恢复时,系统不会重新计算已有的 nextRunAtMs(除非为 undefined)。这防止了:

  • 每日任务跳过一天(因为重新计算指向明天)
  • 间隔任务跳过中间的周期
typescript
export function recomputeNextRuns(state): boolean {
    return walkSchedulableJobs(state, ({ job, nowMs: now }) => {
        const nextRun = job.state.nextRunAtMs;
        // 只有缺失或已过期才重算
        const isDueOrMissing = !isFiniteTimestamp(nextRun) || now >= nextRun;
        if (isDueOrMissing) {
            recomputeJobNextRunAtMs({ state, job, nowMs: now });
        }
    });
}

十四、配置参考

json5
{
    "cron": {
        "enabled": true,
        "storePath": "~/.openclaw/cron/jobs.json",
        "maxConcurrentRuns": 1,
        "sessionRetention": "24h",     // 或 false 禁用清理
        "retry": {
            "maxAttempts": 3,           // 一次性任务最大重试次数
            "backoffMs": [30000, 60000, 300000],
            "retryOn": ["rate_limit", "network", "timeout", "server_error"]
        },
        "failureAlert": {
            "enabled": false,
            "after": 2,                 // 连续失败 N 次后告警
            "cooldownMs": 3600000,      // 告警冷却期(1 小时)
            "channel": "last",
            "mode": "announce"          // "webhook"
        }
    }
}

十五、与其他模块的交互

┌─── Agent 系统 ───┐
│                   │
│  Pi Agent Core    │──→ cron-tool.ts ──→ 7 种 action
│                   │
│  工具注册         │──→ createCronTool() ──→ ToolDefinition
└───────────────────┘


┌─── Gateway ───────┐
│                    │
│  RPC 处理          │──→ cron.add/list/update/remove/run/status
│                    │
│  心跳系统          │──→ requestHeartbeatNow() / runHeartbeatOnce()
│                    │
│  系统事件          │──→ enqueueSystemEvent() → 注入会话
└────────────────────┘


┌─── 会话系统 ──────┐
│                    │
│  会话存储          │──→ 创建/管理 cron run 会话
│  (sessions.json)   │
│                    │──→ sweepCronRunSessions() 清理
│  会话转录          │──→ 归档/删除过期的 JSONL
└────────────────────┘


┌─── Channel 系统 ──┐
│                    │
│  消息投递          │──→ announce: 发送到聊天频道
│                    │──→ webhook: HTTP POST
│  失败通知          │──→ sendCronFailureAlert()
└────────────────────┘

![模块交互关系](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/13-infographic-module-interactions-1775150825291.png)


十六、数据流全景图

AI 说:"每天早上 9 点提醒我查看邮件"


cron-tool.ts: action="add"

        ├── normalizeCronJobCreate() → 归一化输入
        │   schedule: { kind: "cron", expr: "0 9 * * *", tz: "Asia/Shanghai" }
        │   payload: { kind: "systemEvent", text: "该检查邮件了" }
        │   sessionTarget: "main"
        │   wakeMode: "now"


Gateway RPC: cron.add → ops.ts: add()

        ├── locked:
        │   ├── ensureLoaded() → 加载 jobs.json
        │   ├── createJob() → UUID + computeJobNextRunAtMs()
        │   ├── push to store.jobs
        │   └── persist() → 原子写入 jobs.json

        ├── emit: { action: "added", jobId: "..." }

        └── armTimer() → setTimeout(onTimer, delay)

──── 次日早上 9 点 ────

setTimeout 触发 → onTimer()

        ├── findDueJobs() → 找到 "每天 9 点" 这个 Job

        ├── executeJobCoreWithTimeout()
        │   │
        │   └── executeJobCore()
        │       │
        │       ├── sessionTarget = "main"
        │       ├── enqueueSystemEvent("该检查邮件了")
        │       └── runHeartbeatOnce() → 同步等待心跳
        │           │
        │           └── AI 在最后活跃频道回复 "好的,提醒你该检查邮件了!"

        ├── applyJobResult()
        │   ├── status = "ok"
        │   ├── consecutiveErrors = 0
        │   └── nextRunAtMs = 明天 9 点

        ├── persist() → 更新 jobs.json

        └── armTimer() → setTimeout 到明天 9 点

十七、关键设计决策与权衡

17.1 为什么用 JSON 文件而非数据库?

优势

  • 人类可读可编辑(JSON5)
  • 零依赖(不需要 SQLite)
  • 易于备份和迁移(单文件复制)
  • 个人助手场景下任务数量少(通常 < 100 个)

权衡

  • 每次修改需要全量读写(通过序列化缓存优化)
  • 并发安全依赖串行化锁(Promise 链)

17.2 为什么分 main/isolated 两种模式?

mainisolated
适用场景简单提醒复杂任务
执行方式注入主会话独立会话
上下文共享主会话上下文干净的上下文
工具访问完全权限受限
投递心跳驱动主动投递
开销极低创建新会话

main 模式适合"提醒我做 X",isolated 模式适合"帮我做 X 然后告诉我结果"。

17.3 错峰执行的意义

当用户设置多个整点 Job 时(如"每小时检查一次 API"、"每小时汇总一次日志"),全部在 :00 触发会:

  1. 并发请求 → API 限流
  2. Agent 排队 → 延迟增大
  3. 投递拥塞 → 消息乱序

5 分钟的错峰窗口(基于 JobID 哈希的确定性偏移)在保持"大约每小时一次"的同时,显著降低了竞争。


十八、总结

OpenClaw 的定时任务系统是一个精密且容错的调度引擎,有以下亮点:

  1. 三种调度类型:一次性(at)、固定间隔(every)、Cron 表达式(cron),覆盖所有定时场景
  2. 双执行模式:main(轻量提醒)和 isolated(复杂任务),按需选择
  3. 完善的容错:指数退避、瞬态错误自动重试、卡住检测、连续失败告警、Schedule 错误自动禁用
  4. 多种投递方式:announce(聊天频道)、webhook(HTTP POST)、失败目标路由
  5. 错峰执行:基于 JobID 哈希的确定性偏移,防止整点拥塞
  6. 重启恢复:错过的任务自动补跑,nextRunAtMs 保守策略
  7. 会话清理:自动清理过期的 cron run 会话和转录文件
  8. 极强的兼容性:Flat-params 恢复、历史格式迁移、多种 AI 模型输出适配
  9. 安全设计:ownerOnly 权限、AbortController 超时、序列化锁

整个设计遵循"可靠且低运维"的理念:用户只需要告诉 AI "每天 9 点提醒我",其余的调度、执行、投递、容错、清理全部自动化。

![定时任务系统 9 大设计亮点](https://qn.huat.xyz/blog/article-Illustration/OpenClaw 源码解读(十)定时任务系统/14-infographic-design-highlights-1775150826102.png)

读文档、看源码、写代码,理解 AI Agent 本质 🤖