主题
实战项目四:多代理协作工作流
在前面三个实战项目中,我们分别探索了代码助手、研究助理和安全执行平台。现在,让我们挑战系列教程的终极项目——构建一个完整的多代理协作系统,模拟真实的软件开发团队。
项目愿景
想象一个AI驱动的软件开发团队:产品经理分析需求、架构师设计方案、开发者实现代码、审查者把关质量、测试员验证功能。每个角色各司其职,又紧密协作。这就是我们要构建的系统。
需求输入 → PM分析 → 架构设计 → 开发实现 → 代码审查 → 测试验证 → 交付
架构设计
团队角色定义
typescript
import {
createDeepAgent,
SubAgent,
CompiledSubAgent,
StateBackend,
StoreBackend,
FilesystemBackend,
LocalShellBackend,
CompositeBackend,
} from "@langchain/langgraph-agents";
import { ChatOpenAI } from "@langchain/openai";
import { tool } from "@langchain/core/tools";
import { z } from "zod";
import { MemorySaver } from "@langchain/langgraph";
interface TeamMember {
name: string;
role: string;
responsibilities: string[];
}
const teamStructure: TeamMember[] = [
{
name: "product_manager",
role: "产品经理",
responsibilities: ["需求分析", "任务拆解", "优先级排序", "验收标准制定"],
},
{
name: "architect",
role: "架构师",
responsibilities: ["技术选型", "系统设计", "接口定义", "风险评估"],
},
{
name: "developer",
role: "开发者",
responsibilities: ["代码实现", "单元测试", "文档编写", "Bug修复"],
},
{
name: "reviewer",
role: "代码审查者",
responsibilities: ["代码审查", "最佳实践检查", "安全漏洞检测", "性能分析"],
},
{
name: "tester",
role: "测试工程师",
responsibilities: ["测试用例设计", "集成测试", "回归测试", "测试报告"],
},
];
工作流状态定义
typescript
interface WorkflowState {
projectName: string;
requirement: string;
currentPhase: WorkflowPhase;
artifacts: WorkflowArtifacts;
history: PhaseResult[];
status: "in_progress" | "blocked" | "completed" | "failed";
}
type WorkflowPhase =
| "requirement_analysis"
| "architecture_design"
| "development"
| "code_review"
| "testing"
| "delivery";
interface WorkflowArtifacts {
requirementDoc?: RequirementDocument;
designDoc?: DesignDocument;
codeFiles?: CodeFile[];
reviewReport?: ReviewReport;
testReport?: TestReport;
}
interface RequirementDocument {
title: string;
description: string;
userStories: UserStory[];
acceptanceCriteria: string[];
priority: "high" | "medium" | "low";
estimatedEffort: string;
}
interface UserStory {
id: string;
asA: string;
iWant: string;
soThat: string;
acceptanceCriteria: string[];
}
interface DesignDocument {
overview: string;
techStack: string[];
architecture: string;
components: ComponentDesign[];
interfaces: InterfaceDefinition[];
dataModels: DataModel[];
risks: Risk[];
}
interface ComponentDesign {
name: string;
responsibility: string;
dependencies: string[];
interfaces: string[];
}
interface InterfaceDefinition {
name: string;
method: string;
endpoint?: string;
parameters: ParameterDef[];
response: string;
}
interface ParameterDef {
name: string;
type: string;
required: boolean;
description: string;
}
interface DataModel {
name: string;
fields: FieldDef[];
relationships: string[];
}
interface FieldDef {
name: string;
type: string;
constraints: string[];
}
interface Risk {
description: string;
impact: "high" | "medium" | "low";
mitigation: string;
}
interface CodeFile {
path: string;
content: string;
language: string;
tests?: string;
}
interface ReviewReport {
status: "approved" | "changes_requested" | "rejected";
summary: string;
issues: ReviewIssue[];
suggestions: string[];
securityConcerns: string[];
}
interface ReviewIssue {
severity: "critical" | "major" | "minor" | "suggestion";
file: string;
line?: number;
description: string;
suggestion: string;
}
interface TestReport {
status: "passed" | "failed" | "partial";
summary: string;
testCases: TestCase[];
coverage: number;
performanceMetrics?: PerformanceMetric[];
}
interface TestCase {
name: string;
status: "passed" | "failed" | "skipped";
duration: number;
error?: string;
}
interface PerformanceMetric {
name: string;
value: number;
unit: string;
threshold?: number;
}
interface PhaseResult {
phase: WorkflowPhase;
agent: string;
startTime: Date;
endTime: Date;
status: "success" | "failed" | "blocked";
output: string;
artifacts: Partial<WorkflowArtifacts>;
}团队成员实现

1. 产品经理代理
typescript
const requirementAnalysisTool = tool(
async ({ requirement, context }) => {
return JSON.stringify({
status: "analyzed",
message: "需求分析完成,已生成需求文档",
});
},
{
name: "analyze_requirement",
description: "分析用户需求,生成结构化的需求文档",
schema: z.object({
requirement: z.string().describe("原始需求描述"),
context: z.string().optional().describe("项目上下文信息"),
}),
}
);
const prioritizeTool = tool(
async ({ items, criteria }) => {
return JSON.stringify({
status: "prioritized",
order: items,
});
},
{
name: "prioritize_tasks",
description: "根据业务价值和技术依赖对任务进行优先级排序",
schema: z.object({
items: z.array(z.string()).describe("待排序的任务列表"),
criteria: z.string().describe("排序标准"),
}),
}
);
const productManagerAgent: SubAgent = {
name: "product_manager",
description: `产品经理代理,负责:
- 分析和理解用户需求
- 编写用户故事和验收标准
- 任务拆解和优先级排序
- 制定项目里程碑`,
systemPrompt: `你是一位经验丰富的产品经理。你的职责是:
1. **需求分析**:
- 深入理解用户需求的本质
- 识别关键功能和非功能需求
- 发现潜在的需求冲突或遗漏
2. **用户故事编写**:
- 使用标准格式:作为[角色],我想要[功能],以便[价值]
- 每个故事都要有明确的验收标准
- 保持故事的独立性和可测试性
3. **任务拆解**:
- 将大需求拆解为可执行的小任务
- 识别任务间的依赖关系
- 估算工作量
4. **输出格式**:
始终以结构化的JSON格式输出需求文档,包含:
- title: 项目标题
- description: 项目描述
- userStories: 用户故事数组
- acceptanceCriteria: 整体验收标准
- priority: 优先级
- estimatedEffort: 预估工作量`,
tools: [requirementAnalysisTool, prioritizeTool],
};2. 架构师代理
typescript
const designSystemTool = tool(
async ({ requirements, constraints }) => {
return JSON.stringify({
status: "designed",
message: "系统架构设计完成",
});
},
{
name: "design_system",
description: "根据需求设计系统架构",
schema: z.object({
requirements: z.string().describe("需求文档"),
constraints: z.string().optional().describe("技术约束"),
}),
}
);
const defineInterfaceTool = tool(
async ({ componentName, interfaces }) => {
return JSON.stringify({
status: "defined",
interfaces: interfaces,
});
},
{
name: "define_interface",
description: "定义组件接口",
schema: z.object({
componentName: z.string().describe("组件名称"),
interfaces: z.array(z.string()).describe("接口定义"),
}),
}
);
const assessRiskTool = tool(
async ({ designDoc }) => {
return JSON.stringify({
status: "assessed",
risks: [],
});
},
{
name: "assess_risks",
description: "评估设计方案的技术风险",
schema: z.object({
designDoc: z.string().describe("设计文档"),
}),
}
);
const architectAgent: SubAgent = {
name: "architect",
description: `架构师代理,负责:
- 技术选型和架构设计
- 组件划分和接口定义
- 数据模型设计
- 技术风险评估`,
systemPrompt: `你是一位资深软件架构师。你的职责是:
1. **技术选型**:
- 根据项目需求选择合适的技术栈
- 考虑团队技术能力和学习曲线
- 评估技术的成熟度和社区支持
2. **架构设计**:
- 设计清晰的系统架构
- 定义组件边界和职责
- 确保高内聚低耦合
3. **接口定义**:
- 定义清晰的API接口
- 使用RESTful或GraphQL规范
- 考虑版本控制和向后兼容
4. **数据模型**:
- 设计合理的数据结构
- 定义实体关系
- 考虑数据一致性和完整性
5. **输出格式**:
始终以结构化的JSON格式输出设计文档,包含:
- overview: 设计概述
- techStack: 技术栈
- architecture: 架构描述
- components: 组件设计
- interfaces: 接口定义
- dataModels: 数据模型
- risks: 风险评估`,
tools: [designSystemTool, defineInterfaceTool, assessRiskTool],
};3. 开发者代理
typescript
const writeCodeTool = tool(
async ({ specification, language }) => {
return JSON.stringify({
status: "written",
message: "代码编写完成",
});
},
{
name: "write_code",
description: "根据规范编写代码",
schema: z.object({
specification: z.string().describe("代码规范"),
language: z.string().describe("编程语言"),
}),
}
);
const writeTestTool = tool(
async ({ codeFile, testFramework }) => {
return JSON.stringify({
status: "written",
message: "测试代码编写完成",
});
},
{
name: "write_tests",
description: "为代码编写单元测试",
schema: z.object({
codeFile: z.string().describe("源代码文件"),
testFramework: z.string().describe("测试框架"),
}),
}
);
const developerAgent: SubAgent = {
name: "developer",
description: `开发者代理,负责:
- 根据设计文档实现代码
- 编写单元测试
- 处理代码审查反馈
- 修复Bug`,
systemPrompt: `你是一位全栈开发工程师。你的职责是:
1. **代码实现**:
- 严格按照设计文档实现功能
- 遵循编码规范和最佳实践
- 编写清晰的代码注释
- 处理边界情况和错误
2. **单元测试**:
- 为每个功能编写单元测试
- 覆盖正常路径和边界情况
- 使用Mock隔离依赖
3. **代码质量**:
- 保持代码简洁易读
- 避免重复代码
- 使用有意义的命名
4. **输出格式**:
- 每个文件都要包含完整的代码
- 使用标准的项目结构
- 包含必要的配置文件
示例输出:
\`\`\`typescript
// src/services/user.service.ts
export class UserService {
async createUser(data: CreateUserDto): Promise<User> {
// 实现代码
}
}
\`\`\``,
tools: [writeCodeTool, writeTestTool],
backend: (rt) =>
new CompositeBackend(new StateBackend(rt), {
"/workspace/": new FilesystemBackend(rt, {
workingDirectory: "/tmp/dev-workspace",
}),
}),
};4. 代码审查者代理
typescript
const reviewCodeTool = tool(
async ({ code, standards }) => {
return JSON.stringify({
status: "reviewed",
issues: [],
});
},
{
name: "review_code",
description: "审查代码质量",
schema: z.object({
code: z.string().describe("待审查的代码"),
standards: z.string().optional().describe("审查标准"),
}),
}
);
const checkSecurityTool = tool(
async ({ code }) => {
return JSON.stringify({
status: "checked",
vulnerabilities: [],
});
},
{
name: "check_security",
description: "检查代码安全漏洞",
schema: z.object({
code: z.string().describe("待检查的代码"),
}),
}
);
const analyzePerformanceTool = tool(
async ({ code }) => {
return JSON.stringify({
status: "analyzed",
suggestions: [],
});
},
{
name: "analyze_performance",
description: "分析代码性能问题",
schema: z.object({
code: z.string().describe("待分析的代码"),
}),
}
);
const reviewerAgent: SubAgent = {
name: "reviewer",
description: `代码审查者代理,负责:
- 代码质量审查
- 安全漏洞检测
- 性能问题分析
- 最佳实践建议`,
systemPrompt: `你是一位严谨的代码审查专家。你的职责是:
1. **代码质量审查**:
- 检查代码是否符合设计规范
- 验证逻辑正确性
- 评估代码可读性和可维护性
- 检查错误处理是否完善
2. **安全审查**:
- 检查常见安全漏洞(SQL注入、XSS等)
- 验证输入验证和输出编码
- 检查敏感数据处理
- 评估认证和授权逻辑
3. **性能审查**:
- 识别潜在的性能瓶颈
- 检查N+1查询问题
- 评估内存使用
- 检查异步操作处理
4. **输出格式**:
始终以结构化的JSON格式输出审查报告,包含:
- status: "approved" | "changes_requested" | "rejected"
- summary: 审查总结
- issues: 问题列表(包含severity, file, line, description, suggestion)
- suggestions: 改进建议
- securityConcerns: 安全问题`,
tools: [reviewCodeTool, checkSecurityTool, analyzePerformanceTool],
};5. 测试工程师代理
typescript
const runTestsTool = tool(
async ({ testFiles, config }) => {
return JSON.stringify({
status: "completed",
results: [],
});
},
{
name: "run_tests",
description: "执行测试用例",
schema: z.object({
testFiles: z.array(z.string()).describe("测试文件列表"),
config: z.string().optional().describe("测试配置"),
}),
}
);
const generateTestCasesTool = tool(
async ({ requirements, code }) => {
return JSON.stringify({
status: "generated",
testCases: [],
});
},
{
name: "generate_test_cases",
description: "根据需求生成测试用例",
schema: z.object({
requirements: z.string().describe("需求文档"),
code: z.string().describe("实现代码"),
}),
}
);
const measureCoverageTool = tool(
async ({ testResults }) => {
return JSON.stringify({
coverage: 0,
uncoveredLines: [],
});
},
{
name: "measure_coverage",
description: "测量测试覆盖率",
schema: z.object({
testResults: z.string().describe("测试结果"),
}),
}
);
const testerAgent: SubAgent = {
name: "tester",
description: `测试工程师代理,负责:
- 设计测试用例
- 执行集成测试
- 回归测试
- 生成测试报告`,
systemPrompt: `你是一位专业的测试工程师。你的职责是:
1. **测试用例设计**:
- 根据需求设计测试用例
- 覆盖正常流程和异常流程
- 设计边界条件测试
- 包含性能测试场景
2. **测试执行**:
- 执行单元测试和集成测试
- 验证功能是否符合需求
- 检查回归问题
- 进行探索性测试
3. **测试报告**:
- 记录测试结果
- 分析失败原因
- 计算测试覆盖率
- 提供质量评估
4. **输出格式**:
始终以结构化的JSON格式输出测试报告,包含:
- status: "passed" | "failed" | "partial"
- summary: 测试总结
- testCases: 测试用例结果
- coverage: 覆盖率
- performanceMetrics: 性能指标`,
tools: [runTestsTool, generateTestCasesTool, measureCoverageTool],
backend: (rt) =>
new CompositeBackend(new StateBackend(rt), {
"/test-results/": new StoreBackend(rt, {
namespace: ["test", "results"],
}),
}),
};工作流协调器
主代理定义
typescript
const workflowCoordinatorTool = tool(
async ({ phase, input }) => {
return JSON.stringify({
status: "coordinated",
nextPhase: phase,
});
},
{
name: "coordinate_workflow",
description: "协调工作流阶段转换",
schema: z.object({
phase: z.enum([
"requirement_analysis",
"architecture_design",
"development",
"code_review",
"testing",
"delivery",
]),
input: z.string().describe("阶段输入"),
}),
}
);
const escalateIssueTool = tool(
async ({ issue, severity }) => {
return JSON.stringify({
status: "escalated",
action: "human_review",
});
},
{
name: "escalate_issue",
description: "升级问题到人工处理",
schema: z.object({
issue: z.string().describe("问题描述"),
severity: z.enum(["critical", "high", "medium", "low"]),
}),
}
);
const generateReportTool = tool(
async ({ workflowState }) => {
return JSON.stringify({
status: "generated",
report: {},
});
},
{
name: "generate_report",
description: "生成工作流报告",
schema: z.object({
workflowState: z.string().describe("工作流状态"),
}),
}
);
const createTeamCoordinator = () => {
const checkpointer = new MemorySaver();
return createDeepAgent({
model: new ChatOpenAI({
modelName: "gpt-4",
temperature: 0,
}),
systemPrompt: `你是软件开发团队的项目协调员。你负责:
1. **工作流管理**:
- 协调各个开发阶段的执行顺序
- 确保每个阶段的输出满足下一阶段的输入要求
- 处理阶段间的依赖关系
2. **团队协调**:
- 将任务分配给合适的团队成员
- 收集和整合各成员的输出
- 解决团队间的冲突
3. **质量把控**:
- 确保每个阶段的交付物符合标准
- 在关键节点进行审批
- 处理异常情况
4. **工作流程**:
需求输入 → 产品经理分析 → 架构师设计 → 开发者实现 → 审查者审查 → 测试员测试 → 交付
5. **决策规则**:
- 如果代码审查不通过,返回开发阶段
- 如果测试不通过,根据问题性质决定返回开发或审查阶段
- 关键决策需要人工确认
始终输出清晰的状态更新和下一步行动计划。`,
tools: [workflowCoordinatorTool, escalateIssueTool, generateReportTool],
subagents: [
productManagerAgent,
architectAgent,
developerAgent,
reviewerAgent,
testerAgent,
],
backend: (rt) =>
new CompositeBackend(new StateBackend(rt), {
"/project/": new FilesystemBackend(rt, {
workingDirectory: "/tmp/project-workspace",
}),
"/memories/": new StoreBackend(rt, {
namespace: ["workflow", "history"],
}),
}),
interruptOn: [
{
toolName: "task",
condition: (toolCall) => {
const args = toolCall.args as { name: string };
return args.name === "developer";
},
},
{
toolName: "escalate_issue",
},
],
checkpointer,
});
};
工作流执行引擎
阶段执行器

typescript
interface PhaseExecutor {
execute(input: PhaseInput): Promise<PhaseOutput>;
}
interface PhaseInput {
workflowState: WorkflowState;
previousOutput?: PhaseResult;
}
interface PhaseOutput {
status: "success" | "failed" | "blocked";
artifacts: Partial<WorkflowArtifacts>;
message: string;
nextPhase?: WorkflowPhase;
}
class RequirementPhaseExecutor implements PhaseExecutor {
constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}
async execute(input: PhaseInput): Promise<PhaseOutput> {
const result = await this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让产品经理分析以下需求,并生成需求文档:
项目名称:${input.workflowState.projectName}
原始需求:${input.workflowState.requirement}
要求:
1. 生成结构化的需求文档
2. 包含用户故事和验收标准
3. 进行任务拆解和优先级排序`,
},
],
});
const requirementDoc = this.parseRequirementDoc(result);
return {
status: "success",
artifacts: { requirementDoc },
message: "需求分析完成",
nextPhase: "architecture_design",
};
}
private parseRequirementDoc(result: any): RequirementDocument {
return {
title: "",
description: "",
userStories: [],
acceptanceCriteria: [],
priority: "medium",
estimatedEffort: "",
};
}
}
class DesignPhaseExecutor implements PhaseExecutor {
constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}
async execute(input: PhaseInput): Promise<PhaseOutput> {
const requirementDoc = input.workflowState.artifacts.requirementDoc;
const result = await this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让架构师根据以下需求设计系统架构:
需求文档:
${JSON.stringify(requirementDoc, null, 2)}
要求:
1. 选择合适的技术栈
2. 设计系统架构和组件
3. 定义接口和数据模型
4. 评估技术风险`,
},
],
});
const designDoc = this.parseDesignDoc(result);
return {
status: "success",
artifacts: { designDoc },
message: "架构设计完成",
nextPhase: "development",
};
}
private parseDesignDoc(result: any): DesignDocument {
return {
overview: "",
techStack: [],
architecture: "",
components: [],
interfaces: [],
dataModels: [],
risks: [],
};
}
}
class DevelopmentPhaseExecutor implements PhaseExecutor {
constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}
async execute(input: PhaseInput): Promise<PhaseOutput> {
const { requirementDoc, designDoc } = input.workflowState.artifacts;
const result = await this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让开发者根据设计文档实现代码:
需求文档:
${JSON.stringify(requirementDoc, null, 2)}
设计文档:
${JSON.stringify(designDoc, null, 2)}
要求:
1. 严格按照设计文档实现
2. 编写单元测试
3. 遵循编码规范`,
},
],
});
const codeFiles = this.parseCodeFiles(result);
return {
status: "success",
artifacts: { codeFiles },
message: "代码开发完成",
nextPhase: "code_review",
};
}
private parseCodeFiles(result: any): CodeFile[] {
return [];
}
}
class ReviewPhaseExecutor implements PhaseExecutor {
constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}
async execute(input: PhaseInput): Promise<PhaseOutput> {
const { codeFiles, designDoc } = input.workflowState.artifacts;
const result = await this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让代码审查者审查以下代码:
代码文件:
${JSON.stringify(codeFiles, null, 2)}
设计文档:
${JSON.stringify(designDoc, null, 2)}
审查要点:
1. 代码质量和规范
2. 安全漏洞检测
3. 性能问题分析
4. 是否符合设计规范`,
},
],
});
const reviewReport = this.parseReviewReport(result);
if (reviewReport.status === "rejected") {
return {
status: "blocked",
artifacts: { reviewReport },
message: "代码审查未通过,需要修改",
nextPhase: "development",
};
}
return {
status: "success",
artifacts: { reviewReport },
message:
reviewReport.status === "approved" ? "代码审查通过" : "代码需要修改",
nextPhase:
reviewReport.status === "approved" ? "testing" : "development",
};
}
private parseReviewReport(result: any): ReviewReport {
return {
status: "approved",
summary: "",
issues: [],
suggestions: [],
securityConcerns: [],
};
}
}
class TestingPhaseExecutor implements PhaseExecutor {
constructor(private coordinator: ReturnType<typeof createTeamCoordinator>) {}
async execute(input: PhaseInput): Promise<PhaseOutput> {
const { codeFiles, requirementDoc } = input.workflowState.artifacts;
const result = await this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让测试工程师执行测试:
代码文件:
${JSON.stringify(codeFiles, null, 2)}
需求文档:
${JSON.stringify(requirementDoc, null, 2)}
测试要求:
1. 设计测试用例
2. 执行功能测试
3. 测量代码覆盖率
4. 生成测试报告`,
},
],
});
const testReport = this.parseTestReport(result);
if (testReport.status === "failed") {
return {
status: "blocked",
artifacts: { testReport },
message: "测试未通过",
nextPhase: "development",
};
}
return {
status: "success",
artifacts: { testReport },
message: "测试通过",
nextPhase: "delivery",
};
}
private parseTestReport(result: any): TestReport {
return {
status: "passed",
summary: "",
testCases: [],
coverage: 0,
};
}
}工作流引擎
typescript
class WorkflowEngine {
private executors: Map<WorkflowPhase, PhaseExecutor>;
private coordinator: ReturnType<typeof createTeamCoordinator>;
constructor() {
this.coordinator = createTeamCoordinator();
this.executors = new Map([
["requirement_analysis", new RequirementPhaseExecutor(this.coordinator)],
["architecture_design", new DesignPhaseExecutor(this.coordinator)],
["development", new DevelopmentPhaseExecutor(this.coordinator)],
["code_review", new ReviewPhaseExecutor(this.coordinator)],
["testing", new TestingPhaseExecutor(this.coordinator)],
]);
}
async run(
projectName: string,
requirement: string
): Promise<WorkflowState> {
const state: WorkflowState = {
projectName,
requirement,
currentPhase: "requirement_analysis",
artifacts: {},
history: [],
status: "in_progress",
};
const phaseOrder: WorkflowPhase[] = [
"requirement_analysis",
"architecture_design",
"development",
"code_review",
"testing",
"delivery",
];
let currentIndex = 0;
let maxIterations = 10;
let iterations = 0;
while (
state.currentPhase !== "delivery" &&
iterations < maxIterations
) {
iterations++;
const executor = this.executors.get(state.currentPhase);
if (!executor) {
throw new Error(`No executor for phase: ${state.currentPhase}`);
}
console.log(`\n${"=".repeat(60)}`);
console.log(`执行阶段: ${state.currentPhase}`);
console.log(`${"=".repeat(60)}\n`);
const startTime = new Date();
const output = await executor.execute({
workflowState: state,
previousOutput: state.history[state.history.length - 1],
});
const endTime = new Date();
state.artifacts = { ...state.artifacts, ...output.artifacts };
state.history.push({
phase: state.currentPhase,
agent: this.getAgentForPhase(state.currentPhase),
startTime,
endTime,
status: output.status,
output: output.message,
artifacts: output.artifacts,
});
if (output.status === "failed") {
state.status = "failed";
break;
}
if (output.nextPhase) {
state.currentPhase = output.nextPhase;
} else {
currentIndex++;
if (currentIndex < phaseOrder.length) {
state.currentPhase = phaseOrder[currentIndex];
}
}
console.log(`阶段结果: ${output.status}`);
console.log(`消息: ${output.message}`);
console.log(`下一阶段: ${state.currentPhase}`);
}
if (state.currentPhase === "delivery") {
state.status = "completed";
}
return state;
}
private getAgentForPhase(phase: WorkflowPhase): string {
const mapping: Record<WorkflowPhase, string> = {
requirement_analysis: "product_manager",
architecture_design: "architect",
development: "developer",
code_review: "reviewer",
testing: "tester",
delivery: "coordinator",
};
return mapping[phase];
}
async *streamRun(
projectName: string,
requirement: string
): AsyncGenerator<WorkflowStreamEvent> {
const state: WorkflowState = {
projectName,
requirement,
currentPhase: "requirement_analysis",
artifacts: {},
history: [],
status: "in_progress",
};
yield {
type: "workflow_started",
data: { projectName, requirement },
};
const phaseOrder: WorkflowPhase[] = [
"requirement_analysis",
"architecture_design",
"development",
"code_review",
"testing",
"delivery",
];
while (state.currentPhase !== "delivery") {
yield {
type: "phase_started",
data: {
phase: state.currentPhase,
agent: this.getAgentForPhase(state.currentPhase),
},
};
const executor = this.executors.get(state.currentPhase);
if (!executor) break;
const output = await executor.execute({ workflowState: state });
yield {
type: "phase_completed",
data: {
phase: state.currentPhase,
status: output.status,
message: output.message,
artifacts: output.artifacts,
},
};
state.artifacts = { ...state.artifacts, ...output.artifacts };
if (output.status === "failed") {
yield { type: "workflow_failed", data: { reason: output.message } };
return;
}
if (output.nextPhase) {
state.currentPhase = output.nextPhase;
}
}
yield {
type: "workflow_completed",
data: { state },
};
}
}
interface WorkflowStreamEvent {
type:
| "workflow_started"
| "phase_started"
| "phase_completed"
| "workflow_completed"
| "workflow_failed";
data: any;
}
完整使用示例
启动工作流
typescript
async function main() {
const engine = new WorkflowEngine();
const projectName = "用户管理系统";
const requirement = `
构建一个用户管理系统,需要支持:
1. 用户注册和登录(邮箱/手机号)
2. 用户信息管理(查看、编辑个人资料)
3. 密码重置功能
4. 角色权限管理(管理员、普通用户)
5. 操作日志记录
技术要求:
- 后端使用 Node.js + TypeScript
- 数据库使用 PostgreSQL
- 提供 RESTful API
- 需要完善的错误处理和输入验证
`;
console.log("🚀 启动软件开发工作流\n");
console.log(`项目: ${projectName}`);
console.log(`需求: ${requirement}\n`);
const finalState = await engine.run(projectName, requirement);
console.log("\n" + "=".repeat(60));
console.log("📊 工作流执行报告");
console.log("=".repeat(60));
console.log(`状态: ${finalState.status}`);
console.log(`阶段历史:`);
for (const phase of finalState.history) {
console.log(` - ${phase.phase}: ${phase.status} (${phase.agent})`);
console.log(` ${phase.output}`);
}
}
main().catch(console.error);流式执行示例
typescript
async function streamExample() {
const engine = new WorkflowEngine();
const projectName = "博客系统";
const requirement = `
构建一个简单的博客系统:
1. 文章的创建、编辑、删除
2. 文章分类和标签
3. 评论功能
4. 用户认证
`;
console.log("🚀 启动流式工作流\n");
for await (const event of engine.streamRun(projectName, requirement)) {
switch (event.type) {
case "workflow_started":
console.log(`📁 项目启动: ${event.data.projectName}`);
break;
case "phase_started":
console.log(`\n⏳ 阶段开始: ${event.data.phase}`);
console.log(` 负责人: ${event.data.agent}`);
break;
case "phase_completed":
console.log(`✅ 阶段完成: ${event.data.phase}`);
console.log(` 状态: ${event.data.status}`);
console.log(` ${event.data.message}`);
break;
case "workflow_completed":
console.log("\n🎉 工作流完成!");
break;
case "workflow_failed":
console.log(`\n❌ 工作流失败: ${event.data.reason}`);
break;
}
}
}前端集成
React 组件

typescript
import React, { useState } from "react";
import { useStream } from "@langchain/langgraph-sdk/react";
interface TeamMemberStatus {
name: string;
role: string;
status: "idle" | "working" | "completed" | "blocked";
currentTask?: string;
output?: string;
}
function WorkflowDashboard() {
const [requirement, setRequirement] = useState("");
const [projectName, setProjectName] = useState("");
const { messages, subagentStreams, isLoading, submit } = useStream<{
messages: any[];
workflow_state?: WorkflowState;
}>({
assistantId: "team-coordinator",
filterSubagentMessages: true,
});
const teamMembers: TeamMemberStatus[] = [
{ name: "product_manager", role: "产品经理", status: "idle" },
{ name: "architect", role: "架构师", status: "idle" },
{ name: "developer", role: "开发者", status: "idle" },
{ name: "reviewer", role: "审查者", status: "idle" },
{ name: "tester", role: "测试员", status: "idle" },
].map((member) => {
const stream = subagentStreams.find((s) => s.name === member.name);
if (stream) {
return {
...member,
status: stream.status === "running" ? "working" : "completed",
currentTask: stream.currentToolCall?.name,
output: stream.messages[stream.messages.length - 1]?.content,
};
}
return member;
});
const handleSubmit = () => {
submit({
messages: [
{
role: "user",
content: `项目名称: ${projectName}\n\n需求描述:\n${requirement}`,
},
],
});
};
return (
<div className="workflow-dashboard">
<header className="dashboard-header">
<h1>🏢 AI 软件开发团队</h1>
<p>多代理协作工作流演示</p>
</header>
<div className="input-section">
<div className="form-group">
<label>项目名称</label>
<input
type="text"
value={projectName}
onChange={(e) => setProjectName(e.target.value)}
placeholder="输入项目名称..."
/>
</div>
<div className="form-group">
<label>需求描述</label>
<textarea
value={requirement}
onChange={(e) => setRequirement(e.target.value)}
placeholder="描述你的需求..."
rows={6}
/>
</div>
<button
onClick={handleSubmit}
disabled={isLoading || !requirement || !projectName}
>
{isLoading ? "处理中..." : "🚀 启动工作流"}
</button>
</div>
<div className="team-section">
<h2>👥 团队状态</h2>
<div className="team-grid">
{teamMembers.map((member) => (
<TeamMemberCard key={member.name} member={member} />
))}
</div>
</div>
<div className="workflow-section">
<h2>📋 工作流进度</h2>
<WorkflowTimeline
phases={[
"requirement_analysis",
"architecture_design",
"development",
"code_review",
"testing",
"delivery",
]}
currentPhase={
messages[messages.length - 1]?.workflow_state?.currentPhase
}
history={messages[messages.length - 1]?.workflow_state?.history || []}
/>
</div>
<div className="artifacts-section">
<h2>📦 交付物</h2>
<ArtifactViewer
artifacts={
messages[messages.length - 1]?.workflow_state?.artifacts || {}
}
/>
</div>
</div>
);
}
function TeamMemberCard({ member }: { member: TeamMemberStatus }) {
const statusIcons: Record<TeamMemberStatus["status"], string> = {
idle: "⚪",
working: "🔵",
completed: "🟢",
blocked: "🔴",
};
const roleIcons: Record<string, string> = {
产品经理: "📋",
架构师: "🏗️",
开发者: "💻",
审查者: "🔍",
测试员: "🧪",
};
return (
<div className={`team-member-card ${member.status}`}>
<div className="member-header">
<span className="role-icon">{roleIcons[member.role]}</span>
<span className="status-icon">{statusIcons[member.status]}</span>
</div>
<h3>{member.role}</h3>
<p className="member-name">{member.name}</p>
{member.currentTask && (
<p className="current-task">当前任务: {member.currentTask}</p>
)}
{member.output && (
<div className="member-output">
<pre>{member.output.substring(0, 200)}...</pre>
</div>
)}
</div>
);
}
function WorkflowTimeline({
phases,
currentPhase,
history,
}: {
phases: WorkflowPhase[];
currentPhase?: WorkflowPhase;
history: PhaseResult[];
}) {
const phaseNames: Record<WorkflowPhase, string> = {
requirement_analysis: "需求分析",
architecture_design: "架构设计",
development: "开发实现",
code_review: "代码审查",
testing: "测试验证",
delivery: "交付",
};
const getPhaseStatus = (phase: WorkflowPhase) => {
if (phase === currentPhase) return "current";
const result = history.find((h) => h.phase === phase);
if (result) {
return result.status === "success" ? "completed" : "failed";
}
return "pending";
};
return (
<div className="workflow-timeline">
{phases.map((phase, index) => (
<React.Fragment key={phase}>
<div className={`timeline-node ${getPhaseStatus(phase)}`}>
<div className="node-circle">
{getPhaseStatus(phase) === "completed"
? "✓"
: getPhaseStatus(phase) === "current"
? "●"
: index + 1}
</div>
<div className="node-label">{phaseNames[phase]}</div>
</div>
{index < phases.length - 1 && (
<div
className={`timeline-connector ${
getPhaseStatus(phases[index + 1]) !== "pending"
? "active"
: ""
}`}
/>
)}
</React.Fragment>
))}
</div>
);
}
function ArtifactViewer({ artifacts }: { artifacts: WorkflowArtifacts }) {
const [activeTab, setActiveTab] = useState<string>("requirement");
const tabs = [
{ id: "requirement", label: "需求文档", data: artifacts.requirementDoc },
{ id: "design", label: "设计文档", data: artifacts.designDoc },
{ id: "code", label: "代码文件", data: artifacts.codeFiles },
{ id: "review", label: "审查报告", data: artifacts.reviewReport },
{ id: "test", label: "测试报告", data: artifacts.testReport },
];
return (
<div className="artifact-viewer">
<div className="artifact-tabs">
{tabs.map((tab) => (
<button
key={tab.id}
className={`tab ${activeTab === tab.id ? "active" : ""} ${
tab.data ? "has-data" : ""
}`}
onClick={() => setActiveTab(tab.id)}
>
{tab.label}
{tab.data && <span className="badge">✓</span>}
</button>
))}
</div>
<div className="artifact-content">
{tabs.find((t) => t.id === activeTab)?.data ? (
<pre>
{JSON.stringify(
tabs.find((t) => t.id === activeTab)?.data,
null,
2
)}
</pre>
) : (
<p className="empty-state">暂无数据</p>
)}
</div>
</div>
);
}
export default WorkflowDashboard;样式定义
css
/* workflow-dashboard.css */
.workflow-dashboard {
max-width: 1400px;
margin: 0 auto;
padding: 2rem;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
}
.dashboard-header {
text-align: center;
margin-bottom: 2rem;
}
.dashboard-header h1 {
font-size: 2.5rem;
margin-bottom: 0.5rem;
}
.input-section {
background: #f8f9fa;
padding: 2rem;
border-radius: 12px;
margin-bottom: 2rem;
}
.form-group {
margin-bottom: 1rem;
}
.form-group label {
display: block;
font-weight: 600;
margin-bottom: 0.5rem;
}
.form-group input,
.form-group textarea {
width: 100%;
padding: 0.75rem;
border: 1px solid #ddd;
border-radius: 8px;
font-size: 1rem;
}
.input-section button {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
border: none;
padding: 1rem 2rem;
border-radius: 8px;
font-size: 1.1rem;
cursor: pointer;
width: 100%;
}
.input-section button:disabled {
opacity: 0.6;
cursor: not-allowed;
}
.team-section {
margin-bottom: 2rem;
}
.team-grid {
display: grid;
grid-template-columns: repeat(5, 1fr);
gap: 1rem;
}
.team-member-card {
background: white;
border-radius: 12px;
padding: 1.5rem;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
transition: transform 0.2s, box-shadow 0.2s;
}
.team-member-card.working {
border: 2px solid #667eea;
animation: pulse 2s infinite;
}
.team-member-card.completed {
border: 2px solid #10b981;
}
.team-member-card.blocked {
border: 2px solid #ef4444;
}
@keyframes pulse {
0%,
100% {
box-shadow: 0 0 0 0 rgba(102, 126, 234, 0.4);
}
50% {
box-shadow: 0 0 0 10px rgba(102, 126, 234, 0);
}
}
.member-header {
display: flex;
justify-content: space-between;
font-size: 1.5rem;
margin-bottom: 0.5rem;
}
.workflow-timeline {
display: flex;
align-items: center;
justify-content: space-between;
padding: 2rem 0;
}
.timeline-node {
display: flex;
flex-direction: column;
align-items: center;
flex: 0 0 auto;
}
.node-circle {
width: 40px;
height: 40px;
border-radius: 50%;
background: #e5e7eb;
display: flex;
align-items: center;
justify-content: center;
font-weight: 600;
margin-bottom: 0.5rem;
}
.timeline-node.current .node-circle {
background: #667eea;
color: white;
animation: pulse 2s infinite;
}
.timeline-node.completed .node-circle {
background: #10b981;
color: white;
}
.timeline-node.failed .node-circle {
background: #ef4444;
color: white;
}
.timeline-connector {
flex: 1;
height: 3px;
background: #e5e7eb;
margin: 0 0.5rem;
}
.timeline-connector.active {
background: #10b981;
}
.artifact-viewer {
background: white;
border-radius: 12px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
.artifact-tabs {
display: flex;
border-bottom: 1px solid #e5e7eb;
}
.artifact-tabs .tab {
flex: 1;
padding: 1rem;
border: none;
background: none;
cursor: pointer;
font-size: 0.9rem;
transition: background 0.2s;
}
.artifact-tabs .tab:hover {
background: #f8f9fa;
}
.artifact-tabs .tab.active {
border-bottom: 2px solid #667eea;
color: #667eea;
}
.artifact-tabs .tab .badge {
margin-left: 0.5rem;
color: #10b981;
}
.artifact-content {
padding: 1.5rem;
max-height: 400px;
overflow-y: auto;
}
.artifact-content pre {
background: #f8f9fa;
padding: 1rem;
border-radius: 8px;
font-size: 0.85rem;
white-space: pre-wrap;
}
.empty-state {
text-align: center;
color: #9ca3af;
padding: 2rem;
}高级特性

并行任务执行
typescript
class ParallelWorkflowEngine extends WorkflowEngine {
async runParallelPhase(
state: WorkflowState,
agents: string[],
task: string
): Promise<PhaseOutput[]> {
const promises = agents.map((agentName) =>
this.coordinator.invoke({
messages: [
{
role: "user",
content: `请让 ${agentName} 执行任务: ${task}`,
},
],
})
);
const results = await Promise.all(promises);
return results.map((result) => ({
status: "success" as const,
artifacts: {},
message: "并行任务完成",
}));
}
async runWithParallelReview(
projectName: string,
requirement: string
): Promise<WorkflowState> {
const state = await this.run(projectName, requirement);
if (state.currentPhase === "code_review") {
const [securityReview, performanceReview, codeQualityReview] =
await this.runParallelPhase(
state,
["security_reviewer", "performance_reviewer", "quality_reviewer"],
"审查代码"
);
state.artifacts.reviewReport = {
status: "approved",
summary: "多维度审查完成",
issues: [],
suggestions: [],
securityConcerns: [],
};
}
return state;
}
}工作流持久化
typescript
import { PostgresSaver } from "@langchain/langgraph/checkpoint/postgres";
class PersistentWorkflowEngine extends WorkflowEngine {
private checkpointer: PostgresSaver;
constructor() {
super();
this.checkpointer = PostgresSaver.fromConnString(
process.env.DATABASE_URL!
);
}
async saveWorkflowState(
threadId: string,
state: WorkflowState
): Promise<void> {
await this.checkpointer.put(
{ configurable: { thread_id: threadId } },
{
values: state,
next: [],
tasks: [],
metadata: {},
},
{ source: "update", step: 0, writes: {} },
{}
);
}
async loadWorkflowState(threadId: string): Promise<WorkflowState | null> {
const checkpoint = await this.checkpointer.get({
configurable: { thread_id: threadId },
});
return checkpoint?.values as WorkflowState | null;
}
async resumeWorkflow(threadId: string): Promise<WorkflowState> {
const state = await this.loadWorkflowState(threadId);
if (!state) {
throw new Error(`No workflow found for thread: ${threadId}`);
}
console.log(`恢复工作流,当前阶段: ${state.currentPhase}`);
return this.continueRun(state);
}
private async continueRun(state: WorkflowState): Promise<WorkflowState> {
while (state.currentPhase !== "delivery" && state.status === "in_progress") {
const executor = this.executors.get(state.currentPhase);
if (!executor) break;
const output = await executor.execute({ workflowState: state });
state.artifacts = { ...state.artifacts, ...output.artifacts };
if (output.nextPhase) {
state.currentPhase = output.nextPhase;
}
await this.saveWorkflowState(state.projectName, state);
}
return state;
}
}系列总结
恭喜你完成了 DeepAgents 系列教程的全部 22 篇文章!让我们回顾一下学习历程:
知识图谱
DeepAgents 知识体系
├── 基础入门 (文章 1-3)
│ ├── 核心概念和 API
│ ├── 快速开始
│ └── 自定义配置
├── 后端系统 (文章 4-6)
│ ├── 虚拟文件系统
│ ├── 五种后端类型
│ └── CompositeBackend
├── 高级功能 (文章 7-10)
│ ├── 子代理系统
│ ├── 人机协作
│ ├── 长期记忆
│ └── 技能系统
├── 安全执行 (文章 11-12)
│ ├── 沙箱概览
│ └── 沙箱实战
├── 流式处理 (文章 13-14)
│ ├── 流式输出
│ └── 前端集成
├── CLI 工具 (文章 15-16)
│ ├── CLI 入门
│ └── 高级用法
├── 进阶配置 (文章 17-18)
│ ├── 中间件系统
│ └── 自定义模型
└── 项目实战 (文章 19-22)
├── 智能代码助手
├── 研究助理系统
├── 安全执行平台
└── 多代理协作 ← 你在这里
核心能力总结
| 能力 | 关键技术 | 应用场景 |
|---|---|---|
| 任务规划 | TodoListMiddleware | 复杂任务分解 |
| 上下文管理 | CompositeBackend | 多源数据整合 |
| 子代理协作 | SubAgent/CompiledSubAgent | 专业化分工 |
| 长期记忆 | StoreBackend | 知识积累 |
| 安全执行 | Sandbox | 代码运行 |
| 人机协作 | interrupt_on | 关键决策 |
| 流式输出 | streamMode/useStream | 实时交互 |
下一步建议
- 动手实践: 选择一个实战项目,从零开始构建
- 深入源码: 阅读 DeepAgents 源码,理解内部实现
- 社区参与: 加入社区,分享经验,贡献代码
- 持续学习: 关注 DeepAgents 更新,学习新特性
感谢你的学习,期待看到你构建的 AI 代理应用!🚀