Skip to content

实战项目四:多代理协作工作流

在前面三个实战项目中,我们分别探索了代码助手、研究助理和安全执行平台。现在,让我们挑战系列教程的终极项目——构建一个完整的多代理协作系统,模拟真实的软件开发团队。

项目愿景

想象一个AI驱动的软件开发团队:产品经理分析需求、架构师设计方案、开发者实现代码、审查者把关质量、测试员验证功能。每个角色各司其职,又紧密协作。这就是我们要构建的系统。

需求输入 → PM分析 → 架构设计 → 开发实现 → 代码审查 → 测试验证 → 交付

AI软件开发团队工作流管线

架构设计

团队角色定义

typescript
import {
  createDeepAgent,
  SubAgent,
  CompiledSubAgent,
  StateBackend,
  StoreBackend,
  FilesystemBackend,
  LocalShellBackend,
  CompositeBackend,
} from "@langchain/langgraph-agents";
import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { MemorySaver } from "@langchain/langgraph";

interface TeamMember {
  name: string;
  role: string;
  responsibilities: string[];
}

const teamStructure: TeamMember[] = [
  {
    name: "product_manager",
    role: "产品经理",
    responsibilities: ["需求分析", "任务拆解", "优先级排序", "验收标准制定"],
  },
  {
    name: "architect",
    role: "架构师",
    responsibilities: ["技术选型", "系统设计", "接口定义", "风险评估"],
  },
  {
    name: "developer",
    role: "开发者",
    responsibilities: ["代码实现", "单元测试", "文档编写", "Bug修复"],
  },
  {
    name: "reviewer",
    role: "代码审查者",
    responsibilities: ["代码审查", "最佳实践检查", "安全漏洞检测", "性能分析"],
  },
  {
    name: "tester",
    role: "测试工程师",
    responsibilities: ["测试用例设计", "集成测试", "回归测试", "测试报告"],
  },
];

五大团队角色与职责矩阵

工作流状态定义

typescript
interface WorkflowState {
  projectName: string;
  requirement: string;
  currentPhase: WorkflowPhase;
  artifacts: WorkflowArtifacts;
  history: PhaseResult[];
  status: "in_progress" | "blocked" | "completed" | "failed";
}

type WorkflowPhase =
  | "requirement_analysis"
  | "architecture_design"
  | "development"
  | "code_review"
  | "testing"
  | "delivery";

interface WorkflowArtifacts {
  requirementDoc?: RequirementDocument;
  designDoc?: DesignDocument;
  codeFiles?: CodeFile[];
  reviewReport?: ReviewReport;
  testReport?: TestReport;
}

interface RequirementDocument {
  title: string;
  description: string;
  userStories: UserStory[];
  acceptanceCriteria: string[];
  priority: "high" | "medium" | "low";
  estimatedEffort: string;
}

interface UserStory {
  id: string;
  asA: string;
  iWant: string;
  soThat: string;
  acceptanceCriteria: string[];
}

interface DesignDocument {
  overview: string;
  techStack: string[];
  architecture: string;
  components: ComponentDesign[];
  interfaces: InterfaceDefinition[];
  dataModels: DataModel[];
  risks: Risk[];
}

interface ComponentDesign {
  name: string;
  responsibility: string;
  dependencies: string[];
  interfaces: string[];
}

interface InterfaceDefinition {
  name: string;
  method: string;
  endpoint?: string;
  parameters: ParameterDef[];
  response: string;
}

interface ParameterDef {
  name: string;
  type: string;
  required: boolean;
  description: string;
}

interface DataModel {
  name: string;
  fields: FieldDef[];
  relationships: string[];
}

interface FieldDef {
  name: string;
  type: string;
  constraints: string[];
}

interface Risk {
  description: string;
  impact: "high" | "medium" | "low";
  mitigation: string;
}

interface CodeFile {
  path: string;
  content: string;
  language: string;
  tests?: string;
}

interface ReviewReport {
  status: "approved" | "changes_requested" | "rejected";
  summary: string;
  issues: ReviewIssue[];
  suggestions: string[];
  securityConcerns: string[];
}

interface ReviewIssue {
  severity: "critical" | "major" | "minor" | "suggestion";
  file: string;
  line?: number;
  description: string;
  suggestion: string;
}

interface TestReport {
  status: "passed" | "failed" | "partial";
  summary: string;
  testCases: TestCase[];
  coverage: number;
  performanceMetrics?: PerformanceMetric[];
}

interface TestCase {
  name: string;
  status: "passed" | "failed" | "skipped";
  duration: number;
  error?: string;
}

interface PerformanceMetric {
  name: string;
  value: number;
  unit: string;
  threshold?: number;
}

interface PhaseResult {
  phase: WorkflowPhase;
  agent: string;
  startTime: Date;
  endTime: Date;
  status: "success" | "failed" | "blocked";
  output: string;
  artifacts: Partial<WorkflowArtifacts>;
}

团队成员实现

SubAgent代理解剖图

1. 产品经理代理

typescript
const requirementAnalysisTool = tool(
  async ({ requirement, context }) => {
    return JSON.stringify({
      status: "analyzed",
      message: "需求分析完成,已生成需求文档",
    });
  },
  {
    name: "analyze_requirement",
    description: "分析用户需求,生成结构化的需求文档",
    schema: z.object({
      requirement: z.string().describe("原始需求描述"),
      context: z.string().optional().describe("项目上下文信息"),
    }),
  }
);

const prioritizeTool = tool(
  async ({ items, criteria }) => {
    return JSON.stringify({
      status: "prioritized",
      order: items,
    });
  },
  {
    name: "prioritize_tasks",
    description: "根据业务价值和技术依赖对任务进行优先级排序",
    schema: z.object({
      items: z.array(z.string()).describe("待排序的任务列表"),
      criteria: z.string().describe("排序标准"),
    }),
  }
);

const productManagerAgent: SubAgent = {
  name: "product_manager",
  description: `产品经理代理,负责:
    - 分析和理解用户需求
    - 编写用户故事和验收标准
    - 任务拆解和优先级排序
    - 制定项目里程碑`,
  systemPrompt: `你是一位经验丰富的产品经理。你的职责是:

1. **需求分析**:
   - 深入理解用户需求的本质
   - 识别关键功能和非功能需求
   - 发现潜在的需求冲突或遗漏

2. **用户故事编写**:
   - 使用标准格式:作为[角色],我想要[功能],以便[价值]
   - 每个故事都要有明确的验收标准
   - 保持故事的独立性和可测试性

3. **任务拆解**:
   - 将大需求拆解为可执行的小任务
   - 识别任务间的依赖关系
   - 估算工作量

4. **输出格式**:
   始终以结构化的JSON格式输出需求文档,包含:
   - title: 项目标题
   - description: 项目描述
   - userStories: 用户故事数组
   - acceptanceCriteria: 整体验收标准
   - priority: 优先级
   - estimatedEffort: 预估工作量`,
  tools: [requirementAnalysisTool, prioritizeTool],
};

2. 架构师代理

typescript
const designSystemTool = tool(
  async ({ requirements, constraints }) => {
    return JSON.stringify({
      status: "designed",
      message: "系统架构设计完成",
    });
  },
  {
    name: "design_system",
    description: "根据需求设计系统架构",
    schema: z.object({
      requirements: z.string().describe("需求文档"),
      constraints: z.string().optional().describe("技术约束"),
    }),
  }
);

const defineInterfaceTool = tool(
  async ({ componentName, interfaces }) => {
    return JSON.stringify({
      status: "defined",
      interfaces: interfaces,
    });
  },
  {
    name: "define_interface",
    description: "定义组件接口",
    schema: z.object({
      componentName: z.string().describe("组件名称"),
      interfaces: z.array(z.string()).describe("接口定义"),
    }),
  }
);

const assessRiskTool = tool(
  async ({ designDoc }) => {
    return JSON.stringify({
      status: "assessed",
      risks: [],
    });
  },
  {
    name: "assess_risks",
    description: "评估设计方案的技术风险",
    schema: z.object({
      designDoc: z.string().describe("设计文档"),
    }),
  }
);

const architectAgent: SubAgent = {
  name: "architect",
  description: `架构师代理,负责:
    - 技术选型和架构设计
    - 组件划分和接口定义
    - 数据模型设计
    - 技术风险评估`,
  systemPrompt: `你是一位资深软件架构师。你的职责是:

1. **技术选型**:
   - 根据项目需求选择合适的技术栈
   - 考虑团队技术能力和学习曲线
   - 评估技术的成熟度和社区支持

2. **架构设计**:
   - 设计清晰的系统架构
   - 定义组件边界和职责
   - 确保高内聚低耦合

3. **接口定义**:
   - 定义清晰的API接口
   - 使用RESTful或GraphQL规范
   - 考虑版本控制和向后兼容

4. **数据模型**:
   - 设计合理的数据结构
   - 定义实体关系
   - 考虑数据一致性和完整性

5. **输出格式**:
   始终以结构化的JSON格式输出设计文档,包含:
   - overview: 设计概述
   - techStack: 技术栈
   - architecture: 架构描述
   - components: 组件设计
   - interfaces: 接口定义
   - dataModels: 数据模型
   - risks: 风险评估`,
  tools: [designSystemTool, defineInterfaceTool, assessRiskTool],
};

3. 开发者代理

typescript
const writeCodeTool = tool(
  async ({ specification, language }) => {
    return JSON.stringify({
      status: "written",
      message: "代码编写完成",
    });
  },
  {
    name: "write_code",
    description: "根据规范编写代码",
    schema: z.object({
      specification: z.string().describe("代码规范"),
      language: z.string().describe("编程语言"),
    }),
  }
);

const writeTestTool = tool(
  async ({ codeFile, testFramework }) => {
    return JSON.stringify({
      status: "written",
      message: "测试代码编写完成",
    });
  },
  {
    name: "write_tests",
    description: "为代码编写单元测试",
    schema: z.object({
      codeFile: z.string().describe("源代码文件"),
      testFramework: z.string().describe("测试框架"),
    }),
  }
);

const developerAgent: SubAgent = {
  name: "developer",
  description: `开发者代理,负责:
    - 根据设计文档实现代码
    - 编写单元测试
    - 处理代码审查反馈
    - 修复Bug`,
  systemPrompt: `你是一位全栈开发工程师。你的职责是:

1. **代码实现**:
   - 严格按照设计文档实现功能
   - 遵循编码规范和最佳实践
   - 编写清晰的代码注释
   - 处理边界情况和错误

2. **单元测试**:
   - 为每个功能编写单元测试
   - 覆盖正常路径和边界情况
   - 使用Mock隔离依赖

3. **代码质量**:
   - 保持代码简洁易读
   - 避免重复代码
   - 使用有意义的命名

4. **输出格式**:
   - 每个文件都要包含完整的代码
   - 使用标准的项目结构
   - 包含必要的配置文件

示例输出:
\`\`\`typescript
// src/services/user.service.ts
export class UserService {
  async createUser(data: CreateUserDto): Promise<User> {
    // 实现代码
  }
}
\`\`\``,
  tools: [writeCodeTool, writeTestTool],
  backend: (rt) =>
    new CompositeBackend(new StateBackend(rt), {
      "/workspace/": new FilesystemBackend(rt, {
        workingDirectory: "/tmp/dev-workspace",
      }),
    }),
};

4. 代码审查者代理

typescript
const reviewCodeTool = tool(
  async ({ code, standards }) => {
    return JSON.stringify({
      status: "reviewed",
      issues: [],
    });
  },
  {
    name: "review_code",
    description: "审查代码质量",
    schema: z.object({
      code: z.string().describe("待审查的代码"),
      standards: z.string().optional().describe("审查标准"),
    }),
  }
);

const checkSecurityTool = tool(
  async ({ code }) => {
    return JSON.stringify({
      status: "checked",
      vulnerabilities: [],
    });
  },
  {
    name: "check_security",
    description: "检查代码安全漏洞",
    schema: z.object({
      code: z.string().describe("待检查的代码"),
    }),
  }
);

const analyzePerformanceTool = tool(
  async ({ code }) => {
    return JSON.stringify({
      status: "analyzed",
      suggestions: [],
    });
  },
  {
    name: "analyze_performance",
    description: "分析代码性能问题",
    schema: z.object({
      code: z.string().describe("待分析的代码"),
    }),
  }
);

const reviewerAgent: SubAgent = {
  name: "reviewer",
  description: `代码审查者代理,负责:
    - 代码质量审查
    - 安全漏洞检测
    - 性能问题分析
    - 最佳实践建议`,
  systemPrompt: `你是一位严谨的代码审查专家。你的职责是:

1. **代码质量审查**:
   - 检查代码是否符合设计规范
   - 验证逻辑正确性
   - 评估代码可读性和可维护性
   - 检查错误处理是否完善

2. **安全审查**:
   - 检查常见安全漏洞(SQL注入、XSS等)
   - 验证输入验证和输出编码
   - 检查敏感数据处理
   - 评估认证和授权逻辑

3. **性能审查**:
   - 识别潜在的性能瓶颈
   - 检查N+1查询问题
   - 评估内存使用
   - 检查异步操作处理

4. **输出格式**:
   始终以结构化的JSON格式输出审查报告,包含:
   - status: "approved" | "changes_requested" | "rejected"
   - summary: 审查总结
   - issues: 问题列表(包含severity, file, line, description, suggestion)
   - suggestions: 改进建议
   - securityConcerns: 安全问题`,
  tools: [reviewCodeTool, checkSecurityTool, analyzePerformanceTool],
};

5. 测试工程师代理

typescript
const runTestsTool = tool(
  async ({ testFiles, config }) => {
    return JSON.stringify({
      status: "completed",
      results: [],
    });
  },
  {
    name: "run_tests",
    description: "执行测试用例",
    schema: z.object({
      testFiles: z.array(z.string()).describe("测试文件列表"),
      config: z.string().optional().describe("测试配置"),
    }),
  }
);

const generateTestCasesTool = tool(
  async ({ requirements, code }) => {
    return JSON.stringify({
      status: "generated",
      testCases: [],
    });
  },
  {
    name: "generate_test_cases",
    description: "根据需求生成测试用例",
    schema: z.object({
      requirements: z.string().describe("需求文档"),
      code: z.string().describe("实现代码"),
    }),
  }
);

const measureCoverageTool = tool(
  async ({ testResults }) => {
    return JSON.stringify({
      coverage: 0,
      uncoveredLines: [],
    });
  },
  {
    name: "measure_coverage",
    description: "测量测试覆盖率",
    schema: z.object({
      testResults: z.string().describe("测试结果"),
    }),
  }
);

const testerAgent: SubAgent = {
  name: "tester",
  description: `测试工程师代理,负责:
    - 设计测试用例
    - 执行集成测试
    - 回归测试
    - 生成测试报告`,
  systemPrompt: `你是一位专业的测试工程师。你的职责是:

1. **测试用例设计**:
   - 根据需求设计测试用例
   - 覆盖正常流程和异常流程
   - 设计边界条件测试
   - 包含性能测试场景

2. **测试执行**:
   - 执行单元测试和集成测试
   - 验证功能是否符合需求
   - 检查回归问题
   - 进行探索性测试

3. **测试报告**:
   - 记录测试结果
   - 分析失败原因
   - 计算测试覆盖率
   - 提供质量评估

4. **输出格式**:
   始终以结构化的JSON格式输出测试报告,包含:
   - status: "passed" | "failed" | "partial"
   - summary: 测试总结
   - testCases: 测试用例结果
   - coverage: 覆盖率
   - performanceMetrics: 性能指标`,
  tools: [runTestsTool, generateTestCasesTool, measureCoverageTool],
  backend: (rt) =>
    new CompositeBackend(new StateBackend(rt), {
      "/test-results/": new StoreBackend(rt, {
        namespace: ["test", "results"],
      }),
    }),
};

工作流协调器

主代理定义

typescript
const workflowCoordinatorTool = tool(
  async ({ phase, input }) => {
    return JSON.stringify({
      status: "coordinated",
      nextPhase: phase,
    });
  },
  {
    name: "coordinate_workflow",
    description: "协调工作流阶段转换",
    schema: z.object({
      phase: z.enum([
        "requirement_analysis",
        "architecture_design",
        "development",
        "code_review",
        "testing",
        "delivery",
      ]),
      input: z.string().describe("阶段输入"),
    }),
  }
);

const escalateIssueTool = tool(
  async ({ issue, severity }) => {
    return JSON.stringify({
      status: "escalated",
      action: "human_review",
    });
  },
  {
    name: "escalate_issue",
    description: "升级问题到人工处理",
    schema: z.object({
      issue: z.string().describe("问题描述"),
      severity: z.enum(["critical", "high", "medium", "low"]),
    }),
  }
);

const generateReportTool = tool(
  async ({ workflowState }) => {
    return JSON.stringify({
      status: "generated",
      report: {},
    });
  },
  {
    name: "generate_report",
    description: "生成工作流报告",
    schema: z.object({
      workflowState: z.string().describe("工作流状态"),
    }),
  }
);

const createTeamCoordinator = () => {
  const checkpointer = new MemorySaver();

  return createDeepAgent({
    model: new ChatOpenAI({
      modelName: "gpt-4",
      temperature: 0,
    }),
    systemPrompt: `你是软件开发团队的项目协调员。你负责:

1. **工作流管理**:
   - 协调各个开发阶段的执行顺序
   - 确保每个阶段的输出满足下一阶段的输入要求
   - 处理阶段间的依赖关系

2. **团队协调**:
   - 将任务分配给合适的团队成员
   - 收集和整合各成员的输出
   - 解决团队间的冲突

3. **质量把控**:
   - 确保每个阶段的交付物符合标准
   - 在关键节点进行审批
   - 处理异常情况

4. **工作流程**:
   需求输入 → 产品经理分析 → 架构师设计 → 开发者实现 → 审查者审查 → 测试员测试 → 交付

5. **决策规则**:
   - 如果代码审查不通过,返回开发阶段
   - 如果测试不通过,根据问题性质决定返回开发或审查阶段
   - 关键决策需要人工确认

始终输出清晰的状态更新和下一步行动计划。`,

    tools: [workflowCoordinatorTool, escalateIssueTool, generateReportTool],

    subagents: [
      productManagerAgent,
      architectAgent,
      developerAgent,
      reviewerAgent,
      testerAgent,
    ],

    backend: (rt) =>
      new CompositeBackend(new StateBackend(rt), {
        "/project/": new FilesystemBackend(rt, {
          workingDirectory: "/tmp/project-workspace",
        }),
        "/memories/": new StoreBackend(rt, {
          namespace: ["workflow", "history"],
        }),
      }),

    interruptOn: [
      {
        toolName: "task",
        condition: (toolCall) => {
          const args = toolCall.args as { name: string };
          return args.name === "developer";
        },
      },
      {
        toolName: "escalate_issue",
      },
    ],

    checkpointer,
  });
};

工作流协调器中枢架构

工作流执行引擎

阶段执行器

阶段执行器模式与工作流引擎

typescript
interface PhaseExecutor {
  execute(input: PhaseInput): Promise<PhaseOutput>;
}

interface PhaseInput {
  workflowState: WorkflowState;
  previousOutput?: PhaseResult;
}

interface PhaseOutput {
  status: "success" | "failed" | "blocked";
  artifacts: Partial<WorkflowArtifacts>;
  message: string;
  nextPhase?: WorkflowPhase;
}

class RequirementPhaseExecutor implements PhaseExecutor {
  constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}

  async execute(input: PhaseInput): Promise<PhaseOutput> {
    const result = await this.coordinator.invoke({
      messages: [
        {
          role: "user",
          content: `请让产品经理分析以下需求,并生成需求文档:

项目名称:${input.workflowState.projectName}
原始需求:${input.workflowState.requirement}

要求:
1. 生成结构化的需求文档
2. 包含用户故事和验收标准
3. 进行任务拆解和优先级排序`,
        },
      ],
    });

    const requirementDoc = this.parseRequirementDoc(result);

    return {
      status: "success",
      artifacts: { requirementDoc },
      message: "需求分析完成",
      nextPhase: "architecture_design",
    };
  }

  private parseRequirementDoc(result: any): RequirementDocument {
    return {
      title: "",
      description: "",
      userStories: [],
      acceptanceCriteria: [],
      priority: "medium",
      estimatedEffort: "",
    };
  }
}

class DesignPhaseExecutor implements PhaseExecutor {
  constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}

  async execute(input: PhaseInput): Promise<PhaseOutput> {
    const requirementDoc = input.workflowState.artifacts.requirementDoc;

    const result = await this.coordinator.invoke({
      messages: [
        {
          role: "user",
          content: `请让架构师根据以下需求设计系统架构:

需求文档:
${JSON.stringify(requirementDoc, null, 2)}

要求:
1. 选择合适的技术栈
2. 设计系统架构和组件
3. 定义接口和数据模型
4. 评估技术风险`,
        },
      ],
    });

    const designDoc = this.parseDesignDoc(result);

    return {
      status: "success",
      artifacts: { designDoc },
      message: "架构设计完成",
      nextPhase: "development",
    };
  }

  private parseDesignDoc(result: any): DesignDocument {
    return {
      overview: "",
      techStack: [],
      architecture: "",
      components: [],
      interfaces: [],
      dataModels: [],
      risks: [],
    };
  }
}

class DevelopmentPhaseExecutor implements PhaseExecutor {
  constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}

  async execute(input: PhaseInput): Promise<PhaseOutput> {
    const { requirementDoc, designDoc } = input.workflowState.artifacts;

    const result = await this.coordinator.invoke({
      messages: [
        {
          role: "user",
          content: `请让开发者根据设计文档实现代码:

需求文档:
${JSON.stringify(requirementDoc, null, 2)}

设计文档:
${JSON.stringify(designDoc, null, 2)}

要求:
1. 严格按照设计文档实现
2. 编写单元测试
3. 遵循编码规范`,
        },
      ],
    });

    const codeFiles = this.parseCodeFiles(result);

    return {
      status: "success",
      artifacts: { codeFiles },
      message: "代码开发完成",
      nextPhase: "code_review",
    };
  }

  private parseCodeFiles(result: any): CodeFile[] {
    return [];
  }
}

class ReviewPhaseExecutor implements PhaseExecutor {
  constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}

  async execute(input: PhaseInput): Promise<PhaseOutput> {
    const { codeFiles, designDoc } = input.workflowState.artifacts;

    const result = await this.coordinator.invoke({
      messages: [
        {
          role: "user",
          content: `请让代码审查者审查以下代码:

代码文件:
${JSON.stringify(codeFiles, null, 2)}

设计文档:
${JSON.stringify(designDoc, null, 2)}

审查要点:
1. 代码质量和规范
2. 安全漏洞检测
3. 性能问题分析
4. 是否符合设计规范`,
        },
      ],
    });

    const reviewReport = this.parseReviewReport(result);

    if (reviewReport.status === "rejected") {
      return {
        status: "blocked",
        artifacts: { reviewReport },
        message: "代码审查未通过,需要修改",
        nextPhase: "development",
      };
    }

    return {
      status: "success",
      artifacts: { reviewReport },
      message:
        reviewReport.status === "approved" ? "代码审查通过" : "代码需要修改",
      nextPhase:
        reviewReport.status === "approved" ? "testing" : "development",
    };
  }

  private parseReviewReport(result: any): ReviewReport {
    return {
      status: "approved",
      summary: "",
      issues: [],
      suggestions: [],
      securityConcerns: [],
    };
  }
}

class TestingPhaseExecutor implements PhaseExecutor {
  constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}

  async execute(input: PhaseInput): Promise<PhaseOutput> {
    const { codeFiles, requirementDoc } = input.workflowState.artifacts;

    const result = await this.coordinator.invoke({
      messages: [
        {
          role: "user",
          content: `请让测试工程师执行测试:

代码文件:
${JSON.stringify(codeFiles, null, 2)}

需求文档:
${JSON.stringify(requirementDoc, null, 2)}

测试要求:
1. 设计测试用例
2. 执行功能测试
3. 测量代码覆盖率
4. 生成测试报告`,
        },
      ],
    });

    const testReport = this.parseTestReport(result);

    if (testReport.status === "failed") {
      return {
        status: "blocked",
        artifacts: { testReport },
        message: "测试未通过",
        nextPhase: "development",
      };
    }

    return {
      status: "success",
      artifacts: { testReport },
      message: "测试通过",
      nextPhase: "delivery",
    };
  }

  private parseTestReport(result: any): TestReport {
    return {
      status: "passed",
      summary: "",
      testCases: [],
      coverage: 0,
    };
  }
}

工作流引擎

typescript
class WorkflowEngine {
  private executors: Map<WorkflowPhase, PhaseExecutor>;
  private coordinator: ReturnType<typeof createTeamCoordinator>;

  constructor() {
    this.coordinator = createTeamCoordinator();
    this.executors = new Map([
      ["requirement_analysis", new RequirementPhaseExecutor(this.coordinator)],
      ["architecture_design", new DesignPhaseExecutor(this.coordinator)],
      ["development", new DevelopmentPhaseExecutor(this.coordinator)],
      ["code_review", new ReviewPhaseExecutor(this.coordinator)],
      ["testing", new TestingPhaseExecutor(this.coordinator)],
    ]);
  }

  async run(
    projectName: string,
    requirement: string
  ): Promise<WorkflowState> {
    const state: WorkflowState = {
      projectName,
      requirement,
      currentPhase: "requirement_analysis",
      artifacts: {},
      history: [],
      status: "in_progress",
    };

    const phaseOrder: WorkflowPhase[] = [
      "requirement_analysis",
      "architecture_design",
      "development",
      "code_review",
      "testing",
      "delivery",
    ];

    let currentIndex = 0;
    let maxIterations = 10;
    let iterations = 0;

    while (
      state.currentPhase !== "delivery" &&
      iterations < maxIterations
    ) {
      iterations++;
      const executor = this.executors.get(state.currentPhase);

      if (!executor) {
        throw new Error(`No executor for phase: ${state.currentPhase}`);
      }

      console.log(`\n${"=".repeat(60)}`);
      console.log(`执行阶段: ${state.currentPhase}`);
      console.log(`${"=".repeat(60)}\n`);

      const startTime = new Date();
      const output = await executor.execute({
        workflowState: state,
        previousOutput: state.history[state.history.length - 1],
      });
      const endTime = new Date();

      state.artifacts = { ...state.artifacts, ...output.artifacts };
      state.history.push({
        phase: state.currentPhase,
        agent: this.getAgentForPhase(state.currentPhase),
        startTime,
        endTime,
        status: output.status,
        output: output.message,
        artifacts: output.artifacts,
      });

      if (output.status === "failed") {
        state.status = "failed";
        break;
      }

      if (output.nextPhase) {
        state.currentPhase = output.nextPhase;
      } else {
        currentIndex++;
        if (currentIndex < phaseOrder.length) {
          state.currentPhase = phaseOrder[currentIndex];
        }
      }

      console.log(`阶段结果: ${output.status}`);
      console.log(`消息: ${output.message}`);
      console.log(`下一阶段: ${state.currentPhase}`);
    }

    if (state.currentPhase === "delivery") {
      state.status = "completed";
    }

    return state;
  }

  private getAgentForPhase(phase: WorkflowPhase): string {
    const mapping: Record<WorkflowPhase, string> = {
      requirement_analysis: "product_manager",
      architecture_design: "architect",
      development: "developer",
      code_review: "reviewer",
      testing: "tester",
      delivery: "coordinator",
    };
    return mapping[phase];
  }

  async *streamRun(
    projectName: string,
    requirement: string
  ): AsyncGenerator<WorkflowStreamEvent> {
    const state: WorkflowState = {
      projectName,
      requirement,
      currentPhase: "requirement_analysis",
      artifacts: {},
      history: [],
      status: "in_progress",
    };

    yield {
      type: "workflow_started",
      data: { projectName, requirement },
    };

    const phaseOrder: WorkflowPhase[] = [
      "requirement_analysis",
      "architecture_design",
      "development",
      "code_review",
      "testing",
      "delivery",
    ];

    while (state.currentPhase !== "delivery") {
      yield {
        type: "phase_started",
        data: {
          phase: state.currentPhase,
          agent: this.getAgentForPhase(state.currentPhase),
        },
      };

      const executor = this.executors.get(state.currentPhase);
      if (!executor) break;

      const output = await executor.execute({ workflowState: state });

      yield {
        type: "phase_completed",
        data: {
          phase: state.currentPhase,
          status: output.status,
          message: output.message,
          artifacts: output.artifacts,
        },
      };

      state.artifacts = { ...state.artifacts, ...output.artifacts };

      if (output.status === "failed") {
        yield { type: "workflow_failed", data: { reason: output.message } };
        return;
      }

      if (output.nextPhase) {
        state.currentPhase = output.nextPhase;
      }
    }

    yield {
      type: "workflow_completed",
      data: { state },
    };
  }
}

interface WorkflowStreamEvent {
  type:
    | "workflow_started"
    | "phase_started"
    | "phase_completed"
    | "workflow_completed"
    | "workflow_failed";
  data: any;
}

流式事件驱动执行模型

完整使用示例

启动工作流

typescript
async function main() {
  const engine = new WorkflowEngine();

  const projectName = "用户管理系统";
  const requirement = `
    构建一个用户管理系统,需要支持:
    1. 用户注册和登录(邮箱/手机号)
    2. 用户信息管理(查看、编辑个人资料)
    3. 密码重置功能
    4. 角色权限管理(管理员、普通用户)
    5. 操作日志记录
    
    技术要求:
    - 后端使用 Node.js + TypeScript
    - 数据库使用 PostgreSQL
    - 提供 RESTful API
    - 需要完善的错误处理和输入验证
  `;

  console.log("🚀 启动软件开发工作流\n");
  console.log(`项目: ${projectName}`);
  console.log(`需求: ${requirement}\n`);

  const finalState = await engine.run(projectName, requirement);

  console.log("\n" + "=".repeat(60));
  console.log("📊 工作流执行报告");
  console.log("=".repeat(60));
  console.log(`状态: ${finalState.status}`);
  console.log(`阶段历史:`);

  for (const phase of finalState.history) {
    console.log(`  - ${phase.phase}: ${phase.status} (${phase.agent})`);
    console.log(`    ${phase.output}`);
  }
}

main().catch(console.error);

流式执行示例

typescript
async function streamExample() {
  const engine = new WorkflowEngine();

  const projectName = "博客系统";
  const requirement = `
    构建一个简单的博客系统:
    1. 文章的创建、编辑、删除
    2. 文章分类和标签
    3. 评论功能
    4. 用户认证
  `;

  console.log("🚀 启动流式工作流\n");

  for await (const event of engine.streamRun(projectName, requirement)) {
    switch (event.type) {
      case "workflow_started":
        console.log(`📁 项目启动: ${event.data.projectName}`);
        break;

      case "phase_started":
        console.log(`\n⏳ 阶段开始: ${event.data.phase}`);
        console.log(`   负责人: ${event.data.agent}`);
        break;

      case "phase_completed":
        console.log(`✅ 阶段完成: ${event.data.phase}`);
        console.log(`   状态: ${event.data.status}`);
        console.log(`   ${event.data.message}`);
        break;

      case "workflow_completed":
        console.log("\n🎉 工作流完成!");
        break;

      case "workflow_failed":
        console.log(`\n❌ 工作流失败: ${event.data.reason}`);
        break;
    }
  }
}

前端集成

React 组件

React Dashboard组件架构与数据流

typescript
import React, { useState } from "react";
import { useStream } from "@langchain/langgraph-sdk/react";

interface TeamMemberStatus {
  name: string;
  role: string;
  status: "idle" | "working" | "completed" | "blocked";
  currentTask?: string;
  output?: string;
}

function WorkflowDashboard() {
  const [requirement, setRequirement] = useState("");
  const [projectName, setProjectName] = useState("");

  const { messages, subagentStreams, isLoading, submit } = useStream<{
    messages: any[];
    workflow_state?: WorkflowState;
  }>({
    assistantId: "team-coordinator",
    filterSubagentMessages: true,
  });

  const teamMembers: TeamMemberStatus[] = [
    { name: "product_manager", role: "产品经理", status: "idle" },
    { name: "architect", role: "架构师", status: "idle" },
    { name: "developer", role: "开发者", status: "idle" },
    { name: "reviewer", role: "审查者", status: "idle" },
    { name: "tester", role: "测试员", status: "idle" },
  ].map((member) => {
    const stream = subagentStreams.find((s) => s.name === member.name);
    if (stream) {
      return {
        ...member,
        status: stream.status === "running" ? "working" : "completed",
        currentTask: stream.currentToolCall?.name,
        output: stream.messages[stream.messages.length - 1]?.content,
      };
    }
    return member;
  });

  const handleSubmit = () => {
    submit({
      messages: [
        {
          role: "user",
          content: `项目名称: ${projectName}\n\n需求描述:\n${requirement}`,
        },
      ],
    });
  };

  return (
    <div className="workflow-dashboard">
      <header className="dashboard-header">
        <h1>🏢 AI 软件开发团队</h1>
        <p>多代理协作工作流演示</p>
      </header>

      <div className="input-section">
        <div className="form-group">
          <label>项目名称</label>
          <input
            type="text"
            value={projectName}
            onChange={(e) => setProjectName(e.target.value)}
            placeholder="输入项目名称..."
          />
        </div>
        <div className="form-group">
          <label>需求描述</label>
          <textarea
            value={requirement}
            onChange={(e) => setRequirement(e.target.value)}
            placeholder="描述你的需求..."
            rows={6}
          />
        </div>
        <button
          onClick={handleSubmit}
          disabled={isLoading || !requirement || !projectName}
        >
          {isLoading ? "处理中..." : "🚀 启动工作流"}
        </button>
      </div>

      <div className="team-section">
        <h2>👥 团队状态</h2>
        <div className="team-grid">
          {teamMembers.map((member) => (
            <TeamMemberCard key={member.name} member={member} />
          ))}
        </div>
      </div>

      <div className="workflow-section">
        <h2>📋 工作流进度</h2>
        <WorkflowTimeline
          phases={[
            "requirement_analysis",
            "architecture_design",
            "development",
            "code_review",
            "testing",
            "delivery",
          ]}
          currentPhase={
            messages[messages.length - 1]?.workflow_state?.currentPhase
          }
          history={messages[messages.length - 1]?.workflow_state?.history || []}
        />
      </div>

      <div className="artifacts-section">
        <h2>📦 交付物</h2>
        <ArtifactViewer
          artifacts={
            messages[messages.length - 1]?.workflow_state?.artifacts || {}
          }
        />
      </div>
    </div>
  );
}

function TeamMemberCard({ member }: { member: TeamMemberStatus }) {
  const statusIcons: Record<TeamMemberStatus["status"], string> = {
    idle: "⚪",
    working: "🔵",
    completed: "🟢",
    blocked: "🔴",
  };

  const roleIcons: Record<string, string> = {
    产品经理: "📋",
    架构师: "🏗️",
    开发者: "💻",
    审查者: "🔍",
    测试员: "🧪",
  };

  return (
    <div className={`team-member-card ${member.status}`}>
      <div className="member-header">
        <span className="role-icon">{roleIcons[member.role]}</span>
        <span className="status-icon">{statusIcons[member.status]}</span>
      </div>
      <h3>{member.role}</h3>
      <p className="member-name">{member.name}</p>
      {member.currentTask && (
        <p className="current-task">当前任务: {member.currentTask}</p>
      )}
      {member.output && (
        <div className="member-output">
          <pre>{member.output.substring(0, 200)}...</pre>
        </div>
      )}
    </div>
  );
}

function WorkflowTimeline({
  phases,
  currentPhase,
  history,
}: {
  phases: WorkflowPhase[];
  currentPhase?: WorkflowPhase;
  history: PhaseResult[];
}) {
  const phaseNames: Record<WorkflowPhase, string> = {
    requirement_analysis: "需求分析",
    architecture_design: "架构设计",
    development: "开发实现",
    code_review: "代码审查",
    testing: "测试验证",
    delivery: "交付",
  };

  const getPhaseStatus = (phase: WorkflowPhase) => {
    if (phase === currentPhase) return "current";
    const result = history.find((h) => h.phase === phase);
    if (result) {
      return result.status === "success" ? "completed" : "failed";
    }
    return "pending";
  };

  return (
    <div className="workflow-timeline">
      {phases.map((phase, index) => (
        <React.Fragment key={phase}>
          <div className={`timeline-node ${getPhaseStatus(phase)}`}>
            <div className="node-circle">
              {getPhaseStatus(phase) === "completed"
                ? "✓"
                : getPhaseStatus(phase) === "current"
                  ? "●"
                  : index + 1}
            </div>
            <div className="node-label">{phaseNames[phase]}</div>
          </div>
          {index < phases.length - 1 && (
            <div
              className={`timeline-connector ${
                getPhaseStatus(phases[index + 1]) !== "pending"
                  ? "active"
                  : ""
              }`}
            />
          )}
        </React.Fragment>
      ))}
    </div>
  );
}

function ArtifactViewer({ artifacts }: { artifacts: WorkflowArtifacts }) {
  const [activeTab, setActiveTab] = useState<string>("requirement");

  const tabs = [
    { id: "requirement", label: "需求文档", data: artifacts.requirementDoc },
    { id: "design", label: "设计文档", data: artifacts.designDoc },
    { id: "code", label: "代码文件", data: artifacts.codeFiles },
    { id: "review", label: "审查报告", data: artifacts.reviewReport },
    { id: "test", label: "测试报告", data: artifacts.testReport },
  ];

  return (
    <div className="artifact-viewer">
      <div className="artifact-tabs">
        {tabs.map((tab) => (
          <button
            key={tab.id}
            className={`tab ${activeTab === tab.id ? "active" : ""} ${
              tab.data ? "has-data" : ""
            }`}
            onClick={() => setActiveTab(tab.id)}
          >
            {tab.label}
            {tab.data && <span className="badge">✓</span>}
          </button>
        ))}
      </div>
      <div className="artifact-content">
        {tabs.find((t) => t.id === activeTab)?.data ? (
          <pre>
            {JSON.stringify(
              tabs.find((t) => t.id === activeTab)?.data,
              null,
              2
            )}
          </pre>
        ) : (
          <p className="empty-state">暂无数据</p>
        )}
      </div>
    </div>
  );
}

export default WorkflowDashboard;

样式定义

css
/* workflow-dashboard.css */
.workflow-dashboard {
  max-width: 1400px;
  margin: 0 auto;
  padding: 2rem;
  font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
}

.dashboard-header {
  text-align: center;
  margin-bottom: 2rem;
}

.dashboard-header h1 {
  font-size: 2.5rem;
  margin-bottom: 0.5rem;
}

.input-section {
  background: #f8f9fa;
  padding: 2rem;
  border-radius: 12px;
  margin-bottom: 2rem;
}

.form-group {
  margin-bottom: 1rem;
}

.form-group label {
  display: block;
  font-weight: 600;
  margin-bottom: 0.5rem;
}

.form-group input,
.form-group textarea {
  width: 100%;
  padding: 0.75rem;
  border: 1px solid #ddd;
  border-radius: 8px;
  font-size: 1rem;
}

.input-section button {
  background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
  color: white;
  border: none;
  padding: 1rem 2rem;
  border-radius: 8px;
  font-size: 1.1rem;
  cursor: pointer;
  width: 100%;
}

.input-section button:disabled {
  opacity: 0.6;
  cursor: not-allowed;
}

.team-section {
  margin-bottom: 2rem;
}

.team-grid {
  display: grid;
  grid-template-columns: repeat(5, 1fr);
  gap: 1rem;
}

.team-member-card {
  background: white;
  border-radius: 12px;
  padding: 1.5rem;
  box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
  transition: transform 0.2s, box-shadow 0.2s;
}

.team-member-card.working {
  border: 2px solid #667eea;
  animation: pulse 2s infinite;
}

.team-member-card.completed {
  border: 2px solid #10b981;
}

.team-member-card.blocked {
  border: 2px solid #ef4444;
}

@keyframes pulse {
  0%,
  100% {
    box-shadow: 0 0 0 0 rgba(102, 126, 234, 0.4);
  }
  50% {
    box-shadow: 0 0 0 10px rgba(102, 126, 234, 0);
  }
}

.member-header {
  display: flex;
  justify-content: space-between;
  font-size: 1.5rem;
  margin-bottom: 0.5rem;
}

.workflow-timeline {
  display: flex;
  align-items: center;
  justify-content: space-between;
  padding: 2rem 0;
}

.timeline-node {
  display: flex;
  flex-direction: column;
  align-items: center;
  flex: 0 0 auto;
}

.node-circle {
  width: 40px;
  height: 40px;
  border-radius: 50%;
  background: #e5e7eb;
  display: flex;
  align-items: center;
  justify-content: center;
  font-weight: 600;
  margin-bottom: 0.5rem;
}

.timeline-node.current .node-circle {
  background: #667eea;
  color: white;
  animation: pulse 2s infinite;
}

.timeline-node.completed .node-circle {
  background: #10b981;
  color: white;
}

.timeline-node.failed .node-circle {
  background: #ef4444;
  color: white;
}

.timeline-connector {
  flex: 1;
  height: 3px;
  background: #e5e7eb;
  margin: 0 0.5rem;
}

.timeline-connector.active {
  background: #10b981;
}

.artifact-viewer {
  background: white;
  border-radius: 12px;
  box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
  overflow: hidden;
}

.artifact-tabs {
  display: flex;
  border-bottom: 1px solid #e5e7eb;
}

.artifact-tabs .tab {
  flex: 1;
  padding: 1rem;
  border: none;
  background: none;
  cursor: pointer;
  font-size: 0.9rem;
  transition: background 0.2s;
}

.artifact-tabs .tab:hover {
  background: #f8f9fa;
}

.artifact-tabs .tab.active {
  border-bottom: 2px solid #667eea;
  color: #667eea;
}

.artifact-tabs .tab .badge {
  margin-left: 0.5rem;
  color: #10b981;
}

.artifact-content {
  padding: 1.5rem;
  max-height: 400px;
  overflow-y: auto;
}

.artifact-content pre {
  background: #f8f9fa;
  padding: 1rem;
  border-radius: 8px;
  font-size: 0.85rem;
  white-space: pre-wrap;
}

.empty-state {
  text-align: center;
  color: #9ca3af;
  padding: 2rem;
}

高级特性

串行vs并行执行与工作流持久化

并行任务执行

typescript
class ParallelWorkflowEngine extends WorkflowEngine {
  async runParallelPhase(
    state: WorkflowState,
    agents: string[],
    task: string
  ): Promise<PhaseOutput[]> {
    const promises = agents.map((agentName) =>
      this.coordinator.invoke({
        messages: [
          {
            role: "user",
            content: `请让 ${agentName} 执行任务: ${task}`,
          },
        ],
      })
    );

    const results = await Promise.all(promises);
    return results.map((result) => ({
      status: "success" as const,
      artifacts: {},
      message: "并行任务完成",
    }));
  }

  async runWithParallelReview(
    projectName: string,
    requirement: string
  ): Promise<WorkflowState> {
    const state = await this.run(projectName, requirement);

    if (state.currentPhase === "code_review") {
      const [securityReview, performanceReview, codeQualityReview] =
        await this.runParallelPhase(
          state,
          ["security_reviewer", "performance_reviewer", "quality_reviewer"],
          "审查代码"
        );

      state.artifacts.reviewReport = {
        status: "approved",
        summary: "多维度审查完成",
        issues: [],
        suggestions: [],
        securityConcerns: [],
      };
    }

    return state;
  }
}

工作流持久化

typescript
import { PostgresSaver } from "@langchain/langgraph/checkpoint/postgres";

class PersistentWorkflowEngine extends WorkflowEngine {
  private checkpointer: PostgresSaver;

  constructor() {
    super();
    this.checkpointer = PostgresSaver.fromConnString(
      process.env.DATABASE_URL!
    );
  }

  async saveWorkflowState(
    threadId: string,
    state: WorkflowState
  ): Promise<void> {
    await this.checkpointer.put(
      { configurable: { thread_id: threadId } },
      {
        values: state,
        next: [],
        tasks: [],
        metadata: {},
      },
      { source: "update", step: 0, writes: {} },
      {}
    );
  }

  async loadWorkflowState(threadId: string): Promise<WorkflowState | null> {
    const checkpoint = await this.checkpointer.get({
      configurable: { thread_id: threadId },
    });
    return checkpoint?.values as WorkflowState | null;
  }

  async resumeWorkflow(threadId: string): Promise<WorkflowState> {
    const state = await this.loadWorkflowState(threadId);
    if (!state) {
      throw new Error(`No workflow found for thread: ${threadId}`);
    }

    console.log(`恢复工作流,当前阶段: ${state.currentPhase}`);
    return this.continueRun(state);
  }

  private async continueRun(state: WorkflowState): Promise<WorkflowState> {
    while (state.currentPhase !== "delivery" && state.status === "in_progress") {
      const executor = this.executors.get(state.currentPhase);
      if (!executor) break;

      const output = await executor.execute({ workflowState: state });
      state.artifacts = { ...state.artifacts, ...output.artifacts };

      if (output.nextPhase) {
        state.currentPhase = output.nextPhase;
      }

      await this.saveWorkflowState(state.projectName, state);
    }

    return state;
  }
}

系列总结

恭喜你完成了 DeepAgents 系列教程的全部 22 篇文章!让我们回顾一下学习历程:

知识图谱

DeepAgents 知识体系
├── 基础入门 (文章 1-3)
│   ├── 核心概念和 API
│   ├── 快速开始
│   └── 自定义配置
├── 后端系统 (文章 4-6)
│   ├── 虚拟文件系统
│   ├── 五种后端类型
│   └── CompositeBackend
├── 高级功能 (文章 7-10)
│   ├── 子代理系统
│   ├── 人机协作
│   ├── 长期记忆
│   └── 技能系统
├── 安全执行 (文章 11-12)
│   ├── 沙箱概览
│   └── 沙箱实战
├── 流式处理 (文章 13-14)
│   ├── 流式输出
│   └── 前端集成
├── CLI 工具 (文章 15-16)
│   ├── CLI 入门
│   └── 高级用法
├── 进阶配置 (文章 17-18)
│   ├── 中间件系统
│   └── 自定义模型
└── 项目实战 (文章 19-22)
    ├── 智能代码助手
    ├── 研究助理系统
    ├── 安全执行平台
    └── 多代理协作 ← 你在这里

DeepAgents 22篇系列教程知识地图

核心能力总结

能力关键技术应用场景
任务规划TodoListMiddleware复杂任务分解
上下文管理CompositeBackend多源数据整合
子代理协作SubAgent/CompiledSubAgent专业化分工
长期记忆StoreBackend知识积累
安全执行Sandbox代码运行
人机协作interrupt_on关键决策
流式输出streamMode/useStream实时交互

下一步建议

  1. 动手实践: 选择一个实战项目,从零开始构建
  2. 深入源码: 阅读 DeepAgents 源码,理解内部实现
  3. 社区参与: 加入社区,分享经验,贡献代码
  4. 持续学习: 关注 DeepAgents 更新,学习新特性

感谢你的学习,期待看到你构建的 AI 代理应用!🚀

读文档、看源码、写代码,理解 AI Agent 本质 🤖