第四章:工作流Agent - 让Agent按流程办事
本章你将学到
前面几章我们学了 LLMAgent,它很强大,但有一个特点:它让 LLM 来决定下一步做什么。这就好比你请了一个很聪明的助手,你告诉他目标,他自己决定怎么干。
但有时候,我们不想让 LLM 自由发挥。我们希望严格按照我们设定的流程来执行,一步一步地走。就像工厂的流水线,每个工位做什么、顺序是什么,都是你提前定好的。
这就是工作流 Agent(Workflow Agent)要干的事。
读完这一章,你会掌握:
- 工作流 Agent 和普通 LLMAgent 的核心区别
- 三种工作流 Agent:Sequential(顺序)、Parallel(并行)、Loop(循环)
- OutputKey 和 State 传递:Agent 之间如何传数据(重点中的重点)
- State 的前缀机制:
temp:、app:、user:分别是啥意思 - 如何组合嵌套多种工作流
- 如何把自定义 Agent 放进工作流里
工作流Agent vs 普通Agent
先来聊聊它们的本质区别,这个理解了,后面的内容就非常自然。
LLMAgent:LLM 做主
用户输入 → LLM 思考 → 决定调什么工具 → 看结果 → 再思考 → 输出回复
LLMAgent 的核心是:LLM 说了算。你给它工具和指令,它自己决定用哪个工具、用什么顺序、要不要循环。这很灵活,但也意味着你没法完全控制执行流程。
工作流Agent:你做主
你定义流程 → Agent 按流程一步步执行 → 你完全掌控顺序
工作流 Agent 的核心是:你说了算。你明确地告诉系统:先执行 A,再执行 B,最后执行 C。不管 LLM 怎么想,流程就是这个流程。
打个比方:
- LLMAgent 像是给了实习生一个任务,让他自由发挥
- 工作流Agent 像是给了流水线工人一份 SOP,每一步都写得清清楚楚
什么时候用哪个?看场景:
| 场景 | 选择 | 原因 |
|---|---|---|
| 聊天机器人 | LLMAgent | 需要灵活应对各种问题 |
| 代码审查流水线 | SequentialAgent | 写代码 → 审查 → 重构,顺序固定 |
| 多数据源搜索 | ParallelAgent | 几个搜索互不相干,并行跑更快 |
| 文章反复润色 | LoopAgent | 同一个任务重复做,越做越好 |
好,概念明白了,我们来看具体怎么用。
SequentialAgent:流水线
SequentialAgent 是最直观的工作流 Agent,就像工厂流水线一样:Agent 按照你排列的顺序,一个接一个地执行。
基本用法
先看一个最简单的例子:
package main
import (
"context"
"fmt"
"google.golang.org/genai"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
"google.golang.org/adk/runner"
"google.golang.org/adk/session"
)
func main() {
ctx := context.Background()
// 假设你已经有了一个 model(LLM 实例)
// model := ... 具体创建方式见前面章节
// 第一步:代码写手
codeWriterAgent, _ := llmagent.New(llmagent.Config{
Name: "CodeWriterAgent",
Model: model,
Instruction: `你是Python代码生成器。根据用户需求写代码。只输出代码块。`,
OutputKey: "generated_code", // 关键!输出存到 state 里
})
// 第二步:代码审查员
codeReviewerAgent, _ := llmagent.New(llmagent.Config{
Name: "CodeReviewerAgent",
Model: model,
Instruction: `你是代码审查专家。审查以下代码:
'''python
{generated_code}
'''
给出改进建议。`,
OutputKey: "temp:review_comments", // temp: 前缀表示临时数据
})
// 第三步:代码重构师
codeRefactorerAgent, _ := llmagent.New(llmagent.Config{
Name: "CodeRefactorerAgent",
Model: model,
Instruction: `根据审查意见重构代码:
原始代码:{generated_code}
审查意见:{temp:review_comments}
输出重构后的代码。`,
OutputKey: "refactored_code",
})
// 把三个 Agent 串成流水线
pipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "CodePipeline",
SubAgents: []agent.Agent{
codeWriterAgent, // 第一个执行
codeReviewerAgent, // 第二个执行
codeRefactorerAgent, // 第三个执行
},
},
})
// 创建 session 和 runner
sessionService := session.InMemoryService()
r, _ := runner.New(runner.Config{
AppName: "code_pipeline_app",
Agent: pipeline,
SessionService: sessionService,
})
// 创建会话
createResp, _ := sessionService.Create(ctx, &session.CreateRequest{
AppName: "code_pipeline_app",
UserID: "developer",
})
sessionID := createResp.Session.ID()
// 运行流水线
userMsg := genai.NewContentFromText("写一个冒泡排序算法", genai.RoleUser)
for event, err := range r.Run(ctx, "developer", sessionID, userMsg, agent.RunConfig{}) {
if err != nil {
fmt.Printf("出错了: %v\n", err)
continue
}
if event != nil && event.Content != nil {
for _, part := range event.Content.Parts {
if part.Text != "" {
fmt.Printf("[%s] %s\n", event.Author, part.Text)
}
}
}
}
}
执行顺序是这样的:
用户: "写一个冒泡排序算法"
↓
CodeWriterAgent: 写出冒泡排序代码 → 存到 state["generated_code"]
↓
CodeReviewerAgent: 读取 {generated_code},审查代码 → 存到 state["temp:review_comments"]
↓
CodeRefactorerAgent: 读取 {generated_code} 和 {temp:review_comments},输出重构后的代码
↓
完成!
看到没?Agent 之间是通过 state(状态)来传递数据的。这就引出了本章最重要的概念。
OutputKey 和 State 传递
这是工作流 Agent 里最核心的概念,搞懂了它,你才能真正串联起多个 Agent。
OutputKey 是什么?
OutputKey 是 llmagent.Config 的一个字段。当一个 Agent 产生输出时,如果设置了 OutputKey,ADK 会自动把这个输出存到 Session State 里。
agent, _ := llmagent.New(llmagent.Config{
Name: "WriterAgent",
Model: model,
Instruction: "写一首关于Go语言的诗",
OutputKey: "poem", // Agent 的输出会自动存到 state["poem"]
})
当这个 Agent 运行完毕,假设它输出了"Go语言如清风...",那么 Session State 里就会有:
state["poem"] = "Go语言如清风..."
在 Instruction 中引用 State
其他 Agent 可以在 Instruction 里用 {key_name} 的语法来引用 State 中的值:
reviewerAgent, _ := llmagent.New(llmagent.Config{
Name: "ReviewerAgent",
Model: model,
Instruction: `请评价以下诗歌的质量:
{poem}
从意境、韵律、创意三个维度打分。`,
})
ADK 在运行时会自动把 {poem} 替换成 State 里存的值。所以实际发给 LLM 的指令会变成:
请评价以下诗歌的质量:
Go语言如清风...
从意境、韵律、创意三个维度打分。
这就是 Agent 之间传递数据的秘密。
State 的前缀机制
State 的 key 可以带前缀,不同前缀决定了数据的作用域和生命周期:
| 前缀 | 格式 | 作用域 | 生命周期 | 典型用途 |
|---|---|---|---|---|
| 无前缀 | "my_key" |
当前会话 | 会话期间持久化 | 需要在多次调用中复用的数据 |
temp: |
"temp:my_key" |
当前调用 | 仅本次调用期间 | 中间结果,用完就扔 |
app: |
"app:my_key" |
整个应用 | 跨用户、跨会话共享 | 全局配置、共享知识 |
user: |
"user:my_key" |
当前用户 | 同一用户的所有会话共享 | 用户偏好、历史记录 |
来看几个例子,帮你理解这些前缀:
// 1. 无前缀 - 普通的会话级数据
// 这个数据会在当前会话中一直存在
writerAgent, _ := llmagent.New(llmagent.Config{
Name: "WriterAgent",
Model: model,
Instruction: "写一篇短文",
OutputKey: "article", // state["article"] = "..."
})
// 2. temp: 前缀 - 临时数据,用完就扔
// 比如审查意见,最终用户不需要看到,只是中间步骤
reviewerAgent, _ := llmagent.New(llmagent.Config{
Name: "ReviewerAgent",
Model: model,
Instruction: `审查这篇文章:{article}`,
OutputKey: "temp:review_notes", // 临时数据,调用结束后不保留
})
// 3. app: 前缀 - 应用级共享数据
// 所有用户都能读到,比如公告、全局设定
// 通常在初始化 session 时设置,或通过工具设置
// state["app:writing_style"] = "正式、专业"
// 4. user: 前缀 - 用户级共享数据
// 同一用户的不同会话都能读到
// state["user:preferred_language"] = "中文"
temp: 前缀的妙用
temp: 前缀在流水线里特别好用。想象一下代码审查流水线:
// 审查意见只是中间步骤,最终用户关心的是重构后的代码
// 用 temp: 前缀,这些中间数据不会污染长期的 session state
codeReviewerAgent, _ := llmagent.New(llmagent.Config{
Name: "CodeReviewerAgent",
Model: model,
Instruction: `审查代码:{generated_code}`,
OutputKey: "temp:review_comments", // 中间产物,不需要长期保存
})
// 重构师用完审查意见就完事了
codeRefactorerAgent, _ := llmagent.New(llmagent.Config{
Name: "CodeRefactorerAgent",
Model: model,
Instruction: `根据意见重构代码:
代码:{generated_code}
意见:{temp:review_comments}`,
OutputKey: "refactored_code", // 最终成果,需要保留
})
可选变量:加个问号
如果某个 State 变量可能不存在,你可以在变量名后面加 ?,让它变成可选的:
agent, _ := llmagent.New(llmagent.Config{
Name: "SmartAgent",
Model: model,
Instruction: `你是一个写作助手。
用户偏好的语言:{user:preferred_language?}
之前的草稿:{previous_draft?}
请根据用户需求写作。`,
})
如果 user:preferred_language 或 previous_draft 在 State 里不存在,不会报错,会被替换为空字符串。没有 ? 的话,找不到值就会直接报错。
引用 Artifact
除了引用 State,你还能在 Instruction 里引用 Artifact(文件):
agent, _ := llmagent.New(llmagent.Config{
Name: "DocReviewer",
Model: model,
Instruction: `审查以下文档内容:
{artifact.design_doc}
给出改进建议。`,
})
{artifact.design_doc} 会被替换为名为 design_doc 的 Artifact 的文本内容。
State 传递的完整流程
让我们画一个完整的数据流动图,把整件事串起来:
Session State
┌─────────────────────────┐
│ │
用户输入 ──→ Agent1 ──→ OutputKey:"step1_result" ──→│ step1_result = "xxx" │
│ │
Agent2 ←── Instruction 中的 ←──│ {step1_result} │
│ {step1_result} 被替换 │ │
│ │ │
↓ │ │
Agent2 ──→ OutputKey:"temp:notes" ──→│ temp:notes = "yyy" │
│ │
Agent3 ←── {step1_result} ←──│ {step1_result} │
│ {temp:notes} 被替换 │ {temp:notes} │
↓ │ │
Agent3 ──→ OutputKey:"final" ──→│ final = "zzz" │
│ │
└─────────────────────────┘
调用结束后 temp:notes 被清理
step1_result 和 final 保留
核心就三句话: 1. OutputKey 把 Agent 的输出存到 State 2. {key_name} 把 State 的值注入到 Instruction 3. 前缀决定数据的作用域和生命周期
ParallelAgent:并行执行
有时候你有几个任务互相不依赖,完全可以同时跑。比如同时搜索多个数据源、同时分析不同维度的数据。这时候就用 ParallelAgent。
基本用法
package main
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/parallelagent"
)
func main() {
// 三个分析 Agent,互不依赖
technicalAnalyst, _ := llmagent.New(llmagent.Config{
Name: "TechnicalAnalyst",
Model: model,
Instruction: `你是技术分析师。分析以下产品的技术可行性:{product_idea}`,
OutputKey: "technical_analysis",
})
marketAnalyst, _ := llmagent.New(llmagent.Config{
Name: "MarketAnalyst",
Model: model,
Instruction: `你是市场分析师。分析以下产品的市场前景:{product_idea}`,
OutputKey: "market_analysis",
})
financialAnalyst, _ := llmagent.New(llmagent.Config{
Name: "FinancialAnalyst",
Model: model,
Instruction: `你是财务分析师。评估以下产品的成本收益:{product_idea}`,
OutputKey: "financial_analysis",
})
// 并行执行三个分析
parallelAnalysis, _ := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{
Name: "ParallelAnalysis",
SubAgents: []agent.Agent{
technicalAnalyst,
marketAnalyst,
financialAnalyst,
},
},
})
// parallelAnalysis 会同时启动三个 Agent
// 三个 Agent 各自独立运行,互不干扰
// 全部完成后,state 里会有三个分析结果
}
ParallelAgent 的关键特性
1. 隔离执行
每个子 Agent 运行在自己的分支(branch)里,互相看不到对方的对话历史。这是刻意设计的,因为并行执行的 Agent 不应该互相影响。
ParallelAgent
├─ Branch: "ParallelAnalysis.TechnicalAnalyst" (独立的上下文)
├─ Branch: "ParallelAnalysis.MarketAnalyst" (独立的上下文)
└─ Branch: "ParallelAnalysis.FinancialAnalyst" (独立的上下文)
2. 真正的并发
不是"看起来并行",是 Go 协程级别的真并发。内部用 errgroup 来管理并发,所有子 Agent 同时启动。
3. 错误处理
如果任何一个子 Agent 出错,错误会被传递出来。其他还在运行的 Agent 会收到取消信号。
// 如果你有一个可能出错的 Agent
for event, err := range r.Run(ctx, userID, sessionID, userMsg, agent.RunConfig{}) {
if err != nil {
// ParallelAgent 中某个子 Agent 出错了
fmt.Printf("并行任务出错: %v\n", err)
break
}
// 处理正常的事件
}
4. 不允许自定义 Run
ParallelAgent 不允许你传入自定义的 Run 函数。并行逻辑由框架负责,你只需要定义子 Agent。
// 这样会报错!
parallelAgent, err := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{
Name: "MyParallel",
Run: myCustomRunFunc, // 不允许!
SubAgents: []agent.Agent{agent1, agent2},
},
})
// err: "ParallelAgent doesn't allow custom Run implementations"
适用场景
- 同时搜索多个数据库或 API
- 对同一个问题做多角度分析
- 同时生成多个候选方案
- 任何"把一个任务拆成互不依赖的子任务"的场景
LoopAgent:重复执行
LoopAgent 让子 Agent 反复执行,适用于迭代改进的场景。就像你写文章,先写个初稿,然后改一遍,再改一遍,越改越好。
基本用法
package main
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/loopagent"
)
func main() {
// 翻译优化 Agent
translationPolisher, _ := llmagent.New(llmagent.Config{
Name: "TranslationPolisher",
Model: model,
Instruction: `你是一个翻译润色专家。
请优化以下中文翻译,让它更地道、更自然:
{current_translation?}
如果这是第一次润色,直接翻译用户提供的英文原文。
每次润色后,输出改进后的译文。`,
OutputKey: "current_translation",
})
// 循环执行 3 次,每次都基于上一次的结果继续优化
polishLoop, _ := loopagent.New(loopagent.Config{
MaxIterations: 3,
AgentConfig: agent.Config{
Name: "TranslationPolishLoop",
SubAgents: []agent.Agent{translationPolisher},
},
})
// 第一次:翻译原文 → state["current_translation"] = 第一版
// 第二次:读取 {current_translation}(第一版),优化 → 更新 state
// 第三次:读取 {current_translation}(第二版),再次优化 → 更新 state
}
MaxIterations 的含义
loopagent.Config{
MaxIterations: 3, // 子 Agent 组整体执行 3 次
// ...
}
MaxIterations: 1等价于 SequentialAgent(执行一次)MaxIterations: 3子 Agent 序列执行 3 遍MaxIterations: 0无限循环,直到某个子 Agent 触发 Escalate
事实上,从源码可以看到,SequentialAgent 底层就是一个 MaxIterations: 1 的 LoopAgent:
// 源码中 sequentialagent.New 的实现
func New(cfg Config) (agent.Agent, error) {
sequentialAgent, err := loopagent.New(loopagent.Config{
AgentConfig: cfg.AgentConfig,
MaxIterations: 1, // 只执行一次
})
// ...
}
多个子 Agent 的循环
LoopAgent 的子 Agent 列表会在每次迭代中按顺序执行:
loopAgent, _ := loopagent.New(loopagent.Config{
MaxIterations: 2,
AgentConfig: agent.Config{
Name: "ReviewLoop",
SubAgents: []agent.Agent{drafterAgent, reviewerAgent},
},
})
// 执行顺序:
// 第 1 次迭代:drafterAgent → reviewerAgent
// 第 2 次迭代:drafterAgent → reviewerAgent
// 总共执行 4 次 Agent 调用
用 Escalate 提前退出循环
如果你想在满足某个条件时提前终止循环,可以通过工具里的 Escalate 来实现:
// 定义一个可以终止循环的工具
type CheckQualityArgs struct {
Text string `json:"text"`
}
func checkQuality(ctx tool.Context, args CheckQualityArgs) (map[string]string, error) {
// 假设经过某种检查,质量达标了
if qualityIsGood(args.Text) {
ctx.Actions().Escalate = true // 设置 Escalate,退出循环
return map[string]string{"status": "质量达标,停止迭代"}, nil
}
return map[string]string{"status": "还需要继续改进"}, nil
}
当一个子 Agent 的事件里 Escalate 为 true 时,LoopAgent 会立刻停止循环,不再执行下一轮迭代。
LoopAgent 不允许自定义 Run
和 ParallelAgent 一样,LoopAgent 也不允许传入自定义 Run 函数:
// 这样会报错
loopAgent, err := loopagent.New(loopagent.Config{
MaxIterations: 3,
AgentConfig: agent.Config{
Name: "MyLoop",
Run: myCustomFunc, // 不允许!
},
})
// err: "LoopAgent doesn't allow custom Run implementations"
三种工作流对比
一张表帮你理清什么时候用什么:
| 工作流 | 执行方式 | 适用场景 | 例子 |
|---|---|---|---|
| SequentialAgent | A -> B -> C 按顺序执行 | 有依赖关系的流水线 | 写代码 -> 审查 -> 重构 |
| ParallelAgent | A || B || C 同时执行 | 独立的并行任务 | 同时搜索多个数据源 |
| LoopAgent | A -> A -> A... 重复执行 | 迭代改进 | 反复润色翻译 |
再来看看它们的配置有什么异同:
// SequentialAgent - 最简单,只需要子 Agent 列表
sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "MySequential",
SubAgents: []agent.Agent{a, b, c},
},
})
// ParallelAgent - 也很简单,子 Agent 会同时运行
parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{
Name: "MyParallel",
SubAgents: []agent.Agent{a, b, c},
},
})
// LoopAgent - 多了一个 MaxIterations 参数
loopagent.New(loopagent.Config{
MaxIterations: 5, // 最多重复 5 次
AgentConfig: agent.Config{
Name: "MyLoop",
SubAgents: []agent.Agent{a, b},
},
})
三种配置结构几乎一样,唯一的区别是 LoopAgent 多了个 MaxIterations。这个一致性是 ADK 设计得好的地方 -- 学会一个,其他的就自然会了。
组合使用:嵌套工作流
真正强大的地方在于,这些工作流是可以嵌套的。工作流的子 Agent 可以是另一个工作流。
一个实际的例子:产品分析报告
假设你要生成一份产品分析报告,流程是这样的:
Sequential(整体流程)
|-- Agent1:收集产品信息
|-- Parallel(并行分析)
| |-- Agent2a:技术分析
| |-- Agent2b:市场分析
| +-- Agent2c:财务分析
+-- Agent3:汇总所有分析,生成最终报告
先收集信息,然后三个分析同时跑,最后汇总。代码这样写:
package main
import (
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/parallelagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
)
func main() {
// 第一步:信息收集
infoGatherer, _ := llmagent.New(llmagent.Config{
Name: "InfoGatherer",
Model: model,
Instruction: `你是产品调研员。根据用户描述的产品创意,
整理出产品的核心功能、目标用户、竞品信息。
输出结构化的产品概要。`,
OutputKey: "product_summary",
})
// 第二步的三个并行分析 Agent
techAnalyst, _ := llmagent.New(llmagent.Config{
Name: "TechAnalyst",
Model: model,
Instruction: `你是技术分析师。基于以下产品概要,
分析技术实现方案、技术风险和所需资源:
{product_summary}`,
OutputKey: "tech_report",
})
marketAnalyst, _ := llmagent.New(llmagent.Config{
Name: "MarketAnalyst",
Model: model,
Instruction: `你是市场分析师。基于以下产品概要,
分析目标市场规模、用户画像和竞争格局:
{product_summary}`,
OutputKey: "market_report",
})
financeAnalyst, _ := llmagent.New(llmagent.Config{
Name: "FinanceAnalyst",
Model: model,
Instruction: `你是财务分析师。基于以下产品概要,
估算开发成本、运营成本和预期收益:
{product_summary}`,
OutputKey: "finance_report",
})
// 把三个分析 Agent 组成并行工作流
parallelAnalysis, _ := parallelagent.New(parallelagent.Config{
AgentConfig: agent.Config{
Name: "ParallelAnalysis",
SubAgents: []agent.Agent{
techAnalyst,
marketAnalyst,
financeAnalyst,
},
},
})
// 第三步:汇总报告
reportWriter, _ := llmagent.New(llmagent.Config{
Name: "ReportWriter",
Model: model,
Instruction: `你是报告撰写专家。基于以下三份分析报告,
撰写一份完整的产品可行性分析报告:
技术分析:
{tech_report}
市场分析:
{market_report}
财务分析:
{finance_report}
报告格式:摘要 → 详细分析 → 结论与建议`,
OutputKey: "final_report",
})
// 把所有步骤串成一条流水线
fullPipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "ProductAnalysisPipeline",
SubAgents: []agent.Agent{
infoGatherer, // 第 1 步:收集信息(顺序)
parallelAnalysis, // 第 2 步:三个分析同时跑(并行)
reportWriter, // 第 3 步:汇总报告(顺序)
},
},
})
// fullPipeline 就是你的完整工作流了
// 用 runner 运行它就行
}
更复杂的嵌套
你甚至可以把 Loop 也嵌进来:
Sequential(整体流程)
|-- Agent1:写初稿
|-- Loop(迭代改进,循环 3 次)
| |-- Agent2:审查并打分
| +-- Agent3:根据审查意见修改
+-- Agent4:最终润色
// 审查 Agent
reviewer, _ := llmagent.New(llmagent.Config{
Name: "Reviewer",
Model: model,
Instruction: `审查以下文章并给出改进意见:
{current_draft}
打分 1-10,并列出需要改进的地方。`,
OutputKey: "temp:review_feedback",
})
// 修改 Agent
editor, _ := llmagent.New(llmagent.Config{
Name: "Editor",
Model: model,
Instruction: `根据审查意见修改文章:
当前版本:{current_draft}
审查意见:{temp:review_feedback}
输出修改后的版本。`,
OutputKey: "current_draft", // 覆盖之前的版本,形成迭代
})
// 把审查和修改组成循环
reviewLoop, _ := loopagent.New(loopagent.Config{
MaxIterations: 3,
AgentConfig: agent.Config{
Name: "ReviewLoop",
SubAgents: []agent.Agent{reviewer, editor},
},
})
// 嵌入到整体流水线
pipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "WritingPipeline",
SubAgents: []agent.Agent{
drafterAgent, // 写初稿 → state["current_draft"]
reviewLoop, // 循环审查修改 3 次
polisherAgent, // 最终润色
},
},
})
这里有个巧妙的设计:editor 的 OutputKey 和 drafterAgent 的 OutputKey 都是 "current_draft"。这样每次循环,editor 都会覆盖 current_draft 的值,而 reviewer 下一轮读到的就是更新后的版本。形成了一个自然的迭代链。
自定义Agent在工作流中
前面几章我们学过用 agent.New 创建自定义 Agent。自定义 Agent 可以完美地嵌入到工作流中。
基本用法
package main
import (
"fmt"
"iter"
"google.golang.org/genai"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
"google.golang.org/adk/model"
"google.golang.org/adk/session"
)
// 自定义数据预处理 Agent
func dataPreprocessor(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
// 从用户输入中提取信息
userContent := ctx.UserContent()
var inputText string
if userContent != nil {
for _, part := range userContent.Parts {
if part.Text != "" {
inputText = part.Text
}
}
}
// 做一些预处理
processedData := fmt.Sprintf("预处理后的数据: [%s] (已清洗、已标准化)", inputText)
// 把处理后的数据存到 state
if err := ctx.Session().State().Set("processed_input", processedData); err != nil {
yield(nil, fmt.Errorf("保存预处理结果失败: %w", err))
return
}
// 产生一个事件
event := session.NewEvent(ctx.InvocationID())
event.LLMResponse = model.LLMResponse{
Content: genai.NewContentFromText("数据预处理完成", genai.RoleModel),
}
event.Actions = session.EventActions{
StateDelta: map[string]any{
"processed_input": processedData,
},
}
yield(event, nil)
}
}
func main() {
// 自定义 Agent
preprocessor, _ := agent.New(agent.Config{
Name: "DataPreprocessor",
Run: dataPreprocessor,
})
// LLM Agent 使用预处理后的数据
analyzer, _ := llmagent.New(llmagent.Config{
Name: "DataAnalyzer",
Model: model,
Instruction: `你是数据分析师。分析以下数据:
{processed_input}
输出分析报告。`,
OutputKey: "analysis_result",
})
// 组合到流水线中
pipeline, _ := sequentialagent.New(sequentialagent.Config{
AgentConfig: agent.Config{
Name: "AnalysisPipeline",
SubAgents: []agent.Agent{
preprocessor, // 自定义 Agent 做预处理
analyzer, // LLM Agent 做分析
},
},
})
// 自定义 Agent 和 LLM Agent 可以自由混搭
}
自定义 Agent 写入 State 的注意点
自定义 Agent 没有 OutputKey 字段(那是 LLMAgent 的功能),所以你需要手动管理 State:
func myCustomRun(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
// 方法 1:直接写 session state
ctx.Session().State().Set("my_key", "my_value")
// 方法 2:通过 event 的 StateDelta(推荐)
// StateDelta 会被框架自动合并到 session state
event := session.NewEvent(ctx.InvocationID())
event.LLMResponse = model.LLMResponse{
Content: genai.NewContentFromText("处理完成", genai.RoleModel),
}
event.Actions = session.EventActions{
StateDelta: map[string]any{
"my_key": "my_value",
"temp:my_temp": "临时数据",
},
}
yield(event, nil)
}
}
推荐用 StateDelta 的方式,因为它和 ADK 的事件系统集成得更好,框架会自动处理 State 的更新和持久化。
自定义 Agent 读取 State
func myCustomRun(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
return func(yield func(*session.Event, error) bool) {
// 读取前面 Agent 写入的 state 值
val, err := ctx.Session().State().Get("generated_code")
if err != nil {
yield(nil, fmt.Errorf("读取 state 失败: %w", err))
return
}
code, ok := val.(string)
if !ok {
yield(nil, fmt.Errorf("state 值类型错误"))
return
}
// 用读到的数据做你的自定义逻辑
fmt.Printf("拿到了代码: %s\n", code)
// ... 继续处理
}
}
小结
这一章我们学了三种工作流 Agent,来回顾一下核心要点:
三种工作流
- SequentialAgent:按顺序执行子 Agent,适合有依赖关系的流水线
- ParallelAgent:并行执行子 Agent,适合互不依赖的并行任务
- LoopAgent:循环执行子 Agent,适合迭代改进的场景
数据传递机制
OutputKey:让 LLMAgent 的输出自动存到 Session State{key_name}:在 Instruction 里引用 State 中的值{key_name?}:可选引用,找不到值不报错
State 前缀
- 无前缀:普通会话级数据
temp::临时数据,仅当前调用有效app::应用级共享,跨用户跨会话user::用户级共享,同一用户的所有会话
组合嵌套
- 工作流可以互相嵌套,组合出复杂的执行流程
- 自定义 Agent 可以无缝接入任何工作流
底层关系
- SequentialAgent 底层就是
MaxIterations: 1的 LoopAgent - 三种工作流的配置结构高度一致,学习成本很低
动手练习:博客文章生产线
来实操一下!创建一个"博客文章生产线",实现以下流程:
Sequential(博客生产流水线)
|-- ResearchAgent:研究主题,整理要点
|-- WriterAgent:根据研究结果写初稿
|-- Loop(审查改进循环,2 次迭代)
| |-- ReviewerAgent:审查文章质量
| +-- EditorAgent:根据审查意见修改
+-- PolisherAgent:最终润色,输出成品
骨架代码
下面是一个骨架,把 // TODO 的部分补全就行:
package main
import (
"context"
"fmt"
"google.golang.org/genai"
"google.golang.org/adk/agent"
"google.golang.org/adk/agent/llmagent"
"google.golang.org/adk/agent/workflowagents/loopagent"
"google.golang.org/adk/agent/workflowagents/sequentialagent"
"google.golang.org/adk/runner"
"google.golang.org/adk/session"
)
func main() {
ctx := context.Background()
// TODO: 创建你的 model
// model := ...
// 第一步:研究员
researcher, _ := llmagent.New(llmagent.Config{
Name: "Researcher",
Model: model,
Instruction: `你是一个主题研究员。根据用户给出的博客主题,
研究并整理出 5-8 个关键要点,包括:
- 核心概念解释
- 实际应用案例
- 常见误区
输出结构化的研究笔记。`,
OutputKey: "research_notes",
})
// 第二步:写手
// TODO: 创建 WriterAgent
// - 读取 {research_notes}
// - 写初稿
// - OutputKey: "current_draft"
// 第三步的子 Agent:审查员
// TODO: 创建 ReviewerAgent
// - 读取 {current_draft}
// - 审查并给出意见
// - OutputKey: "temp:review_feedback"(临时数据)
// 第三步的子 Agent:编辑
// TODO: 创建 EditorAgent
// - 读取 {current_draft} 和 {temp:review_feedback}
// - 输出修改后的版本
// - OutputKey: "current_draft"(覆盖原来的草稿)
// 第三步:审查-修改循环
// TODO: 用 loopagent.New 创建循环,MaxIterations: 2
// 子 Agent:ReviewerAgent 和 EditorAgent
// 第四步:润色师
// TODO: 创建 PolisherAgent
// - 读取 {current_draft}
// - 最终润色
// - OutputKey: "final_article"
// TODO: 用 sequentialagent.New 把所有步骤串起来
// 运行流水线
sessionService := session.InMemoryService()
r, _ := runner.New(runner.Config{
AppName: "blog_pipeline",
Agent: pipeline, // 你创建的流水线
SessionService: sessionService,
})
createResp, _ := sessionService.Create(ctx, &session.CreateRequest{
AppName: "blog_pipeline",
UserID: "blogger",
})
userMsg := genai.NewContentFromText(
"写一篇关于 Go 语言 Channel 的博客文章",
genai.RoleUser,
)
for event, err := range r.Run(ctx, "blogger", createResp.Session.ID(), userMsg, agent.RunConfig{}) {
if err != nil {
fmt.Printf("错误: %v\n", err)
continue
}
if event != nil && event.Content != nil {
for _, part := range event.Content.Parts {
if part.Text != "" {
fmt.Printf("\n--- [%s] ---\n%s\n", event.Author, part.Text)
}
}
}
}
}
提示
- Writer 的 Instruction 里要用
{research_notes}引用研究笔记 - Editor 的
OutputKey要设成"current_draft",这样每次循环都会更新草稿 - Reviewer 的
OutputKey用"temp:review_feedback",因为审查意见是中间产物 - 注意 Agent 的名字不能重复,在整个 Agent 树里必须唯一
进阶挑战
如果觉得太简单,试试这些:
- 在循环中加入一个自定义 Agent,用来检查文章字数,如果超过 2000 字就设置
Escalate提前退出循环 - 把润色步骤改成 ParallelAgent,同时做"语法检查"和"风格优化",然后再加一个汇总 Agent
- 给整个流水线加上
BeforeAgentCallbacks,在执行前打印一条日志
祝你写得愉快。下一章我们会学习更高级的 Agent 协作模式 -- Agent 之间的动态转移。