第四章:工作流Agent - 让Agent按流程办事

本章你将学到

前面几章我们学了 LLMAgent,它很强大,但有一个特点:它让 LLM 来决定下一步做什么。这就好比你请了一个很聪明的助手,你告诉他目标,他自己决定怎么干。

但有时候,我们不想让 LLM 自由发挥。我们希望严格按照我们设定的流程来执行,一步一步地走。就像工厂的流水线,每个工位做什么、顺序是什么,都是你提前定好的。

这就是工作流 Agent(Workflow Agent)要干的事。

读完这一章,你会掌握:

  • 工作流 Agent 和普通 LLMAgent 的核心区别
  • 三种工作流 Agent:Sequential(顺序)、Parallel(并行)、Loop(循环)
  • OutputKey 和 State 传递:Agent 之间如何传数据(重点中的重点)
  • State 的前缀机制:temp:app:user: 分别是啥意思
  • 如何组合嵌套多种工作流
  • 如何把自定义 Agent 放进工作流里

工作流Agent vs 普通Agent

先来聊聊它们的本质区别,这个理解了,后面的内容就非常自然。

LLMAgent:LLM 做主

用户输入 → LLM 思考 → 决定调什么工具 → 看结果 → 再思考 → 输出回复

LLMAgent 的核心是:LLM 说了算。你给它工具和指令,它自己决定用哪个工具、用什么顺序、要不要循环。这很灵活,但也意味着你没法完全控制执行流程。

工作流Agent:你做主

你定义流程 → Agent 按流程一步步执行 → 你完全掌控顺序

工作流 Agent 的核心是:你说了算。你明确地告诉系统:先执行 A,再执行 B,最后执行 C。不管 LLM 怎么想,流程就是这个流程。

打个比方:

  • LLMAgent 像是给了实习生一个任务,让他自由发挥
  • 工作流Agent 像是给了流水线工人一份 SOP,每一步都写得清清楚楚

什么时候用哪个?看场景:

场景 选择 原因
聊天机器人 LLMAgent 需要灵活应对各种问题
代码审查流水线 SequentialAgent 写代码 → 审查 → 重构,顺序固定
多数据源搜索 ParallelAgent 几个搜索互不相干,并行跑更快
文章反复润色 LoopAgent 同一个任务重复做,越做越好

好,概念明白了,我们来看具体怎么用。


SequentialAgent:流水线

SequentialAgent 是最直观的工作流 Agent,就像工厂流水线一样:Agent 按照你排列的顺序,一个接一个地执行。

基本用法

先看一个最简单的例子:

package main

import (
    "context"
    "fmt"

    "google.golang.org/genai"

    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
    "google.golang.org/adk/runner"
    "google.golang.org/adk/session"
)

func main() {
    ctx := context.Background()

    // 假设你已经有了一个 model(LLM 实例)
    // model := ... 具体创建方式见前面章节

    // 第一步:代码写手
    codeWriterAgent, _ := llmagent.New(llmagent.Config{
        Name:  "CodeWriterAgent",
        Model: model,
        Instruction: `你是Python代码生成器。根据用户需求写代码。只输出代码块。`,
        OutputKey: "generated_code", // 关键!输出存到 state 里
    })

    // 第二步:代码审查员
    codeReviewerAgent, _ := llmagent.New(llmagent.Config{
        Name:  "CodeReviewerAgent",
        Model: model,
        Instruction: `你是代码审查专家。审查以下代码:
'''python
{generated_code}
'''
给出改进建议。`,
        OutputKey: "temp:review_comments", // temp: 前缀表示临时数据
    })

    // 第三步:代码重构师
    codeRefactorerAgent, _ := llmagent.New(llmagent.Config{
        Name:  "CodeRefactorerAgent",
        Model: model,
        Instruction: `根据审查意见重构代码:
原始代码:{generated_code}
审查意见:{temp:review_comments}
输出重构后的代码。`,
        OutputKey: "refactored_code",
    })

    // 把三个 Agent 串成流水线
    pipeline, _ := sequentialagent.New(sequentialagent.Config{
        AgentConfig: agent.Config{
            Name: "CodePipeline",
            SubAgents: []agent.Agent{
                codeWriterAgent,     // 第一个执行
                codeReviewerAgent,   // 第二个执行
                codeRefactorerAgent, // 第三个执行
            },
        },
    })

    // 创建 session 和 runner
    sessionService := session.InMemoryService()
    r, _ := runner.New(runner.Config{
        AppName:        "code_pipeline_app",
        Agent:          pipeline,
        SessionService: sessionService,
    })

    // 创建会话
    createResp, _ := sessionService.Create(ctx, &session.CreateRequest{
        AppName: "code_pipeline_app",
        UserID:  "developer",
    })
    sessionID := createResp.Session.ID()

    // 运行流水线
    userMsg := genai.NewContentFromText("写一个冒泡排序算法", genai.RoleUser)
    for event, err := range r.Run(ctx, "developer", sessionID, userMsg, agent.RunConfig{}) {
        if err != nil {
            fmt.Printf("出错了: %v\n", err)
            continue
        }
        if event != nil && event.Content != nil {
            for _, part := range event.Content.Parts {
                if part.Text != "" {
                    fmt.Printf("[%s] %s\n", event.Author, part.Text)
                }
            }
        }
    }
}

执行顺序是这样的:

用户: "写一个冒泡排序算法"
  ↓
CodeWriterAgent: 写出冒泡排序代码 → 存到 state["generated_code"]
  ↓
CodeReviewerAgent: 读取 {generated_code},审查代码 → 存到 state["temp:review_comments"]
  ↓
CodeRefactorerAgent: 读取 {generated_code} 和 {temp:review_comments},输出重构后的代码
  ↓
完成!

看到没?Agent 之间是通过 state(状态)来传递数据的。这就引出了本章最重要的概念。


OutputKey 和 State 传递

这是工作流 Agent 里最核心的概念,搞懂了它,你才能真正串联起多个 Agent。

OutputKey 是什么?

OutputKeyllmagent.Config 的一个字段。当一个 Agent 产生输出时,如果设置了 OutputKey,ADK 会自动把这个输出存到 Session State 里。

agent, _ := llmagent.New(llmagent.Config{
    Name:      "WriterAgent",
    Model:     model,
    Instruction: "写一首关于Go语言的诗",
    OutputKey: "poem",  // Agent 的输出会自动存到 state["poem"]
})

当这个 Agent 运行完毕,假设它输出了"Go语言如清风...",那么 Session State 里就会有:

state["poem"] = "Go语言如清风..."

在 Instruction 中引用 State

其他 Agent 可以在 Instruction 里用 {key_name} 的语法来引用 State 中的值:

reviewerAgent, _ := llmagent.New(llmagent.Config{
    Name:  "ReviewerAgent",
    Model: model,
    Instruction: `请评价以下诗歌的质量:
{poem}
从意境、韵律、创意三个维度打分。`,
})

ADK 在运行时会自动把 {poem} 替换成 State 里存的值。所以实际发给 LLM 的指令会变成:

请评价以下诗歌的质量:
Go语言如清风...
从意境、韵律、创意三个维度打分。

这就是 Agent 之间传递数据的秘密。

State 的前缀机制

State 的 key 可以带前缀,不同前缀决定了数据的作用域生命周期

前缀 格式 作用域 生命周期 典型用途
无前缀 "my_key" 当前会话 会话期间持久化 需要在多次调用中复用的数据
temp: "temp:my_key" 当前调用 仅本次调用期间 中间结果,用完就扔
app: "app:my_key" 整个应用 跨用户、跨会话共享 全局配置、共享知识
user: "user:my_key" 当前用户 同一用户的所有会话共享 用户偏好、历史记录

来看几个例子,帮你理解这些前缀:

// 1. 无前缀 - 普通的会话级数据
// 这个数据会在当前会话中一直存在
writerAgent, _ := llmagent.New(llmagent.Config{
    Name:      "WriterAgent",
    Model:     model,
    Instruction: "写一篇短文",
    OutputKey: "article",  // state["article"] = "..."
})

// 2. temp: 前缀 - 临时数据,用完就扔
// 比如审查意见,最终用户不需要看到,只是中间步骤
reviewerAgent, _ := llmagent.New(llmagent.Config{
    Name:      "ReviewerAgent",
    Model:     model,
    Instruction: `审查这篇文章:{article}`,
    OutputKey: "temp:review_notes",  // 临时数据,调用结束后不保留
})

// 3. app: 前缀 - 应用级共享数据
// 所有用户都能读到,比如公告、全局设定
// 通常在初始化 session 时设置,或通过工具设置
// state["app:writing_style"] = "正式、专业"

// 4. user: 前缀 - 用户级共享数据
// 同一用户的不同会话都能读到
// state["user:preferred_language"] = "中文"

temp: 前缀的妙用

temp: 前缀在流水线里特别好用。想象一下代码审查流水线:

// 审查意见只是中间步骤,最终用户关心的是重构后的代码
// 用 temp: 前缀,这些中间数据不会污染长期的 session state
codeReviewerAgent, _ := llmagent.New(llmagent.Config{
    Name:  "CodeReviewerAgent",
    Model: model,
    Instruction: `审查代码:{generated_code}`,
    OutputKey: "temp:review_comments",  // 中间产物,不需要长期保存
})

// 重构师用完审查意见就完事了
codeRefactorerAgent, _ := llmagent.New(llmagent.Config{
    Name:  "CodeRefactorerAgent",
    Model: model,
    Instruction: `根据意见重构代码:
代码:{generated_code}
意见:{temp:review_comments}`,
    OutputKey: "refactored_code",  // 最终成果,需要保留
})

可选变量:加个问号

如果某个 State 变量可能不存在,你可以在变量名后面加 ?,让它变成可选的:

agent, _ := llmagent.New(llmagent.Config{
    Name:  "SmartAgent",
    Model: model,
    Instruction: `你是一个写作助手。
用户偏好的语言:{user:preferred_language?}
之前的草稿:{previous_draft?}
请根据用户需求写作。`,
})

如果 user:preferred_languageprevious_draft 在 State 里不存在,不会报错,会被替换为空字符串。没有 ? 的话,找不到值就会直接报错。

引用 Artifact

除了引用 State,你还能在 Instruction 里引用 Artifact(文件):

agent, _ := llmagent.New(llmagent.Config{
    Name:  "DocReviewer",
    Model: model,
    Instruction: `审查以下文档内容:
{artifact.design_doc}
给出改进建议。`,
})

{artifact.design_doc} 会被替换为名为 design_doc 的 Artifact 的文本内容。

State 传递的完整流程

让我们画一个完整的数据流动图,把整件事串起来:

                    Session State
                    ┌─────────────────────────┐
                    │                         │
用户输入 ──→ Agent1 ──→ OutputKey:"step1_result" ──→│ step1_result = "xxx" │
                    │                         │
            Agent2 ←── Instruction 中的      ←──│ {step1_result}       │
            │          {step1_result} 被替换     │                     │
            │                                   │                     │
            ↓                                   │                     │
            Agent2 ──→ OutputKey:"temp:notes" ──→│ temp:notes = "yyy"  │
                    │                         │
            Agent3 ←── {step1_result}         ←──│ {step1_result}     │
            │          {temp:notes} 被替换       │ {temp:notes}       │
            ↓                                   │                     │
            Agent3 ──→ OutputKey:"final"      ──→│ final = "zzz"      │
                    │                         │
                    └─────────────────────────┘
                    调用结束后 temp:notes 被清理
                    step1_result 和 final 保留

核心就三句话: 1. OutputKey 把 Agent 的输出存到 State 2. {key_name} 把 State 的值注入到 Instruction 3. 前缀决定数据的作用域和生命周期


ParallelAgent:并行执行

有时候你有几个任务互相不依赖,完全可以同时跑。比如同时搜索多个数据源、同时分析不同维度的数据。这时候就用 ParallelAgent。

基本用法

package main

import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/parallelagent"
)

func main() {
    // 三个分析 Agent,互不依赖
    technicalAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "TechnicalAnalyst",
        Model: model,
        Instruction: `你是技术分析师。分析以下产品的技术可行性:{product_idea}`,
        OutputKey: "technical_analysis",
    })

    marketAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "MarketAnalyst",
        Model: model,
        Instruction: `你是市场分析师。分析以下产品的市场前景:{product_idea}`,
        OutputKey: "market_analysis",
    })

    financialAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "FinancialAnalyst",
        Model: model,
        Instruction: `你是财务分析师。评估以下产品的成本收益:{product_idea}`,
        OutputKey: "financial_analysis",
    })

    // 并行执行三个分析
    parallelAnalysis, _ := parallelagent.New(parallelagent.Config{
        AgentConfig: agent.Config{
            Name: "ParallelAnalysis",
            SubAgents: []agent.Agent{
                technicalAnalyst,
                marketAnalyst,
                financialAnalyst,
            },
        },
    })

    // parallelAnalysis 会同时启动三个 Agent
    // 三个 Agent 各自独立运行,互不干扰
    // 全部完成后,state 里会有三个分析结果
}

ParallelAgent 的关键特性

1. 隔离执行

每个子 Agent 运行在自己的分支(branch)里,互相看不到对方的对话历史。这是刻意设计的,因为并行执行的 Agent 不应该互相影响。

ParallelAgent
  ├─ Branch: "ParallelAnalysis.TechnicalAnalyst"  (独立的上下文)
  ├─ Branch: "ParallelAnalysis.MarketAnalyst"      (独立的上下文)
  └─ Branch: "ParallelAnalysis.FinancialAnalyst"   (独立的上下文)

2. 真正的并发

不是"看起来并行",是 Go 协程级别的真并发。内部用 errgroup 来管理并发,所有子 Agent 同时启动。

3. 错误处理

如果任何一个子 Agent 出错,错误会被传递出来。其他还在运行的 Agent 会收到取消信号。

// 如果你有一个可能出错的 Agent
for event, err := range r.Run(ctx, userID, sessionID, userMsg, agent.RunConfig{}) {
    if err != nil {
        // ParallelAgent 中某个子 Agent 出错了
        fmt.Printf("并行任务出错: %v\n", err)
        break
    }
    // 处理正常的事件
}

4. 不允许自定义 Run

ParallelAgent 不允许你传入自定义的 Run 函数。并行逻辑由框架负责,你只需要定义子 Agent。

// 这样会报错!
parallelAgent, err := parallelagent.New(parallelagent.Config{
    AgentConfig: agent.Config{
        Name: "MyParallel",
        Run:  myCustomRunFunc,  // 不允许!
        SubAgents: []agent.Agent{agent1, agent2},
    },
})
// err: "ParallelAgent doesn't allow custom Run implementations"

适用场景

  • 同时搜索多个数据库或 API
  • 对同一个问题做多角度分析
  • 同时生成多个候选方案
  • 任何"把一个任务拆成互不依赖的子任务"的场景

LoopAgent:重复执行

LoopAgent 让子 Agent 反复执行,适用于迭代改进的场景。就像你写文章,先写个初稿,然后改一遍,再改一遍,越改越好。

基本用法

package main

import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/loopagent"
)

func main() {
    // 翻译优化 Agent
    translationPolisher, _ := llmagent.New(llmagent.Config{
        Name:  "TranslationPolisher",
        Model: model,
        Instruction: `你是一个翻译润色专家。
请优化以下中文翻译,让它更地道、更自然:
{current_translation?}

如果这是第一次润色,直接翻译用户提供的英文原文。
每次润色后,输出改进后的译文。`,
        OutputKey: "current_translation",
    })

    // 循环执行 3 次,每次都基于上一次的结果继续优化
    polishLoop, _ := loopagent.New(loopagent.Config{
        MaxIterations: 3,
        AgentConfig: agent.Config{
            Name:      "TranslationPolishLoop",
            SubAgents: []agent.Agent{translationPolisher},
        },
    })

    // 第一次:翻译原文 → state["current_translation"] = 第一版
    // 第二次:读取 {current_translation}(第一版),优化 → 更新 state
    // 第三次:读取 {current_translation}(第二版),再次优化 → 更新 state
}

MaxIterations 的含义

loopagent.Config{
    MaxIterations: 3,  // 子 Agent 组整体执行 3 次
    // ...
}
  • MaxIterations: 1 等价于 SequentialAgent(执行一次)
  • MaxIterations: 3 子 Agent 序列执行 3 遍
  • MaxIterations: 0 无限循环,直到某个子 Agent 触发 Escalate

事实上,从源码可以看到,SequentialAgent 底层就是一个 MaxIterations: 1 的 LoopAgent

// 源码中 sequentialagent.New 的实现
func New(cfg Config) (agent.Agent, error) {
    sequentialAgent, err := loopagent.New(loopagent.Config{
        AgentConfig:   cfg.AgentConfig,
        MaxIterations: 1,  // 只执行一次
    })
    // ...
}

多个子 Agent 的循环

LoopAgent 的子 Agent 列表会在每次迭代中按顺序执行:

loopAgent, _ := loopagent.New(loopagent.Config{
    MaxIterations: 2,
    AgentConfig: agent.Config{
        Name: "ReviewLoop",
        SubAgents: []agent.Agent{drafterAgent, reviewerAgent},
    },
})

// 执行顺序:
// 第 1 次迭代:drafterAgent → reviewerAgent
// 第 2 次迭代:drafterAgent → reviewerAgent
// 总共执行 4 次 Agent 调用

用 Escalate 提前退出循环

如果你想在满足某个条件时提前终止循环,可以通过工具里的 Escalate 来实现:

// 定义一个可以终止循环的工具
type CheckQualityArgs struct {
    Text string `json:"text"`
}

func checkQuality(ctx tool.Context, args CheckQualityArgs) (map[string]string, error) {
    // 假设经过某种检查,质量达标了
    if qualityIsGood(args.Text) {
        ctx.Actions().Escalate = true  // 设置 Escalate,退出循环
        return map[string]string{"status": "质量达标,停止迭代"}, nil
    }
    return map[string]string{"status": "还需要继续改进"}, nil
}

当一个子 Agent 的事件里 Escalatetrue 时,LoopAgent 会立刻停止循环,不再执行下一轮迭代。

LoopAgent 不允许自定义 Run

和 ParallelAgent 一样,LoopAgent 也不允许传入自定义 Run 函数:

// 这样会报错
loopAgent, err := loopagent.New(loopagent.Config{
    MaxIterations: 3,
    AgentConfig: agent.Config{
        Name: "MyLoop",
        Run:  myCustomFunc,  // 不允许!
    },
})
// err: "LoopAgent doesn't allow custom Run implementations"

三种工作流对比

一张表帮你理清什么时候用什么:

工作流 执行方式 适用场景 例子
SequentialAgent A -> B -> C 按顺序执行 有依赖关系的流水线 写代码 -> 审查 -> 重构
ParallelAgent A || B || C 同时执行 独立的并行任务 同时搜索多个数据源
LoopAgent A -> A -> A... 重复执行 迭代改进 反复润色翻译

再来看看它们的配置有什么异同:

// SequentialAgent - 最简单,只需要子 Agent 列表
sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{
        Name:      "MySequential",
        SubAgents: []agent.Agent{a, b, c},
    },
})

// ParallelAgent - 也很简单,子 Agent 会同时运行
parallelagent.New(parallelagent.Config{
    AgentConfig: agent.Config{
        Name:      "MyParallel",
        SubAgents: []agent.Agent{a, b, c},
    },
})

// LoopAgent - 多了一个 MaxIterations 参数
loopagent.New(loopagent.Config{
    MaxIterations: 5,  // 最多重复 5 次
    AgentConfig: agent.Config{
        Name:      "MyLoop",
        SubAgents: []agent.Agent{a, b},
    },
})

三种配置结构几乎一样,唯一的区别是 LoopAgent 多了个 MaxIterations。这个一致性是 ADK 设计得好的地方 -- 学会一个,其他的就自然会了。


组合使用:嵌套工作流

真正强大的地方在于,这些工作流是可以嵌套的。工作流的子 Agent 可以是另一个工作流。

一个实际的例子:产品分析报告

假设你要生成一份产品分析报告,流程是这样的:

Sequential(整体流程)
  |-- Agent1:收集产品信息
  |-- Parallel(并行分析)
  |     |-- Agent2a:技术分析
  |     |-- Agent2b:市场分析
  |     +-- Agent2c:财务分析
  +-- Agent3:汇总所有分析,生成最终报告

先收集信息,然后三个分析同时跑,最后汇总。代码这样写:

package main

import (
    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/parallelagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
)

func main() {
    // 第一步:信息收集
    infoGatherer, _ := llmagent.New(llmagent.Config{
        Name:  "InfoGatherer",
        Model: model,
        Instruction: `你是产品调研员。根据用户描述的产品创意,
整理出产品的核心功能、目标用户、竞品信息。
输出结构化的产品概要。`,
        OutputKey: "product_summary",
    })

    // 第二步的三个并行分析 Agent
    techAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "TechAnalyst",
        Model: model,
        Instruction: `你是技术分析师。基于以下产品概要,
分析技术实现方案、技术风险和所需资源:
{product_summary}`,
        OutputKey: "tech_report",
    })

    marketAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "MarketAnalyst",
        Model: model,
        Instruction: `你是市场分析师。基于以下产品概要,
分析目标市场规模、用户画像和竞争格局:
{product_summary}`,
        OutputKey: "market_report",
    })

    financeAnalyst, _ := llmagent.New(llmagent.Config{
        Name:  "FinanceAnalyst",
        Model: model,
        Instruction: `你是财务分析师。基于以下产品概要,
估算开发成本、运营成本和预期收益:
{product_summary}`,
        OutputKey: "finance_report",
    })

    // 把三个分析 Agent 组成并行工作流
    parallelAnalysis, _ := parallelagent.New(parallelagent.Config{
        AgentConfig: agent.Config{
            Name: "ParallelAnalysis",
            SubAgents: []agent.Agent{
                techAnalyst,
                marketAnalyst,
                financeAnalyst,
            },
        },
    })

    // 第三步:汇总报告
    reportWriter, _ := llmagent.New(llmagent.Config{
        Name:  "ReportWriter",
        Model: model,
        Instruction: `你是报告撰写专家。基于以下三份分析报告,
撰写一份完整的产品可行性分析报告:

技术分析:
{tech_report}

市场分析:
{market_report}

财务分析:
{finance_report}

报告格式:摘要 → 详细分析 → 结论与建议`,
        OutputKey: "final_report",
    })

    // 把所有步骤串成一条流水线
    fullPipeline, _ := sequentialagent.New(sequentialagent.Config{
        AgentConfig: agent.Config{
            Name: "ProductAnalysisPipeline",
            SubAgents: []agent.Agent{
                infoGatherer,      // 第 1 步:收集信息(顺序)
                parallelAnalysis,  // 第 2 步:三个分析同时跑(并行)
                reportWriter,      // 第 3 步:汇总报告(顺序)
            },
        },
    })

    // fullPipeline 就是你的完整工作流了
    // 用 runner 运行它就行
}

更复杂的嵌套

你甚至可以把 Loop 也嵌进来:

Sequential(整体流程)
  |-- Agent1:写初稿
  |-- Loop(迭代改进,循环 3 次)
  |     |-- Agent2:审查并打分
  |     +-- Agent3:根据审查意见修改
  +-- Agent4:最终润色
// 审查 Agent
reviewer, _ := llmagent.New(llmagent.Config{
    Name:  "Reviewer",
    Model: model,
    Instruction: `审查以下文章并给出改进意见:
{current_draft}
打分 1-10,并列出需要改进的地方。`,
    OutputKey: "temp:review_feedback",
})

// 修改 Agent
editor, _ := llmagent.New(llmagent.Config{
    Name:  "Editor",
    Model: model,
    Instruction: `根据审查意见修改文章:
当前版本:{current_draft}
审查意见:{temp:review_feedback}
输出修改后的版本。`,
    OutputKey: "current_draft",  // 覆盖之前的版本,形成迭代
})

// 把审查和修改组成循环
reviewLoop, _ := loopagent.New(loopagent.Config{
    MaxIterations: 3,
    AgentConfig: agent.Config{
        Name:      "ReviewLoop",
        SubAgents: []agent.Agent{reviewer, editor},
    },
})

// 嵌入到整体流水线
pipeline, _ := sequentialagent.New(sequentialagent.Config{
    AgentConfig: agent.Config{
        Name:      "WritingPipeline",
        SubAgents: []agent.Agent{
            drafterAgent,  // 写初稿 → state["current_draft"]
            reviewLoop,    // 循环审查修改 3 次
            polisherAgent, // 最终润色
        },
    },
})

这里有个巧妙的设计:editorOutputKeydrafterAgentOutputKey 都是 "current_draft"。这样每次循环,editor 都会覆盖 current_draft 的值,而 reviewer 下一轮读到的就是更新后的版本。形成了一个自然的迭代链。


自定义Agent在工作流中

前面几章我们学过用 agent.New 创建自定义 Agent。自定义 Agent 可以完美地嵌入到工作流中。

基本用法

package main

import (
    "fmt"
    "iter"

    "google.golang.org/genai"

    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
    "google.golang.org/adk/model"
    "google.golang.org/adk/session"
)

// 自定义数据预处理 Agent
func dataPreprocessor(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
    return func(yield func(*session.Event, error) bool) {
        // 从用户输入中提取信息
        userContent := ctx.UserContent()
        var inputText string
        if userContent != nil {
            for _, part := range userContent.Parts {
                if part.Text != "" {
                    inputText = part.Text
                }
            }
        }

        // 做一些预处理
        processedData := fmt.Sprintf("预处理后的数据: [%s] (已清洗、已标准化)", inputText)

        // 把处理后的数据存到 state
        if err := ctx.Session().State().Set("processed_input", processedData); err != nil {
            yield(nil, fmt.Errorf("保存预处理结果失败: %w", err))
            return
        }

        // 产生一个事件
        event := session.NewEvent(ctx.InvocationID())
        event.LLMResponse = model.LLMResponse{
            Content: genai.NewContentFromText("数据预处理完成", genai.RoleModel),
        }
        event.Actions = session.EventActions{
            StateDelta: map[string]any{
                "processed_input": processedData,
            },
        }

        yield(event, nil)
    }
}

func main() {
    // 自定义 Agent
    preprocessor, _ := agent.New(agent.Config{
        Name: "DataPreprocessor",
        Run:  dataPreprocessor,
    })

    // LLM Agent 使用预处理后的数据
    analyzer, _ := llmagent.New(llmagent.Config{
        Name:  "DataAnalyzer",
        Model: model,
        Instruction: `你是数据分析师。分析以下数据:
{processed_input}
输出分析报告。`,
        OutputKey: "analysis_result",
    })

    // 组合到流水线中
    pipeline, _ := sequentialagent.New(sequentialagent.Config{
        AgentConfig: agent.Config{
            Name: "AnalysisPipeline",
            SubAgents: []agent.Agent{
                preprocessor, // 自定义 Agent 做预处理
                analyzer,     // LLM Agent 做分析
            },
        },
    })

    // 自定义 Agent 和 LLM Agent 可以自由混搭
}

自定义 Agent 写入 State 的注意点

自定义 Agent 没有 OutputKey 字段(那是 LLMAgent 的功能),所以你需要手动管理 State:

func myCustomRun(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
    return func(yield func(*session.Event, error) bool) {
        // 方法 1:直接写 session state
        ctx.Session().State().Set("my_key", "my_value")

        // 方法 2:通过 event 的 StateDelta(推荐)
        // StateDelta 会被框架自动合并到 session state
        event := session.NewEvent(ctx.InvocationID())
        event.LLMResponse = model.LLMResponse{
            Content: genai.NewContentFromText("处理完成", genai.RoleModel),
        }
        event.Actions = session.EventActions{
            StateDelta: map[string]any{
                "my_key":       "my_value",
                "temp:my_temp": "临时数据",
            },
        }

        yield(event, nil)
    }
}

推荐用 StateDelta 的方式,因为它和 ADK 的事件系统集成得更好,框架会自动处理 State 的更新和持久化。

自定义 Agent 读取 State

func myCustomRun(ctx agent.InvocationContext) iter.Seq2[*session.Event, error] {
    return func(yield func(*session.Event, error) bool) {
        // 读取前面 Agent 写入的 state 值
        val, err := ctx.Session().State().Get("generated_code")
        if err != nil {
            yield(nil, fmt.Errorf("读取 state 失败: %w", err))
            return
        }

        code, ok := val.(string)
        if !ok {
            yield(nil, fmt.Errorf("state 值类型错误"))
            return
        }

        // 用读到的数据做你的自定义逻辑
        fmt.Printf("拿到了代码: %s\n", code)

        // ... 继续处理
    }
}

小结

这一章我们学了三种工作流 Agent,来回顾一下核心要点:

三种工作流

  • SequentialAgent:按顺序执行子 Agent,适合有依赖关系的流水线
  • ParallelAgent:并行执行子 Agent,适合互不依赖的并行任务
  • LoopAgent:循环执行子 Agent,适合迭代改进的场景

数据传递机制

  • OutputKey:让 LLMAgent 的输出自动存到 Session State
  • {key_name}:在 Instruction 里引用 State 中的值
  • {key_name?}:可选引用,找不到值不报错

State 前缀

  • 无前缀:普通会话级数据
  • temp::临时数据,仅当前调用有效
  • app::应用级共享,跨用户跨会话
  • user::用户级共享,同一用户的所有会话

组合嵌套

  • 工作流可以互相嵌套,组合出复杂的执行流程
  • 自定义 Agent 可以无缝接入任何工作流

底层关系

  • SequentialAgent 底层就是 MaxIterations: 1 的 LoopAgent
  • 三种工作流的配置结构高度一致,学习成本很低

动手练习:博客文章生产线

来实操一下!创建一个"博客文章生产线",实现以下流程:

Sequential(博客生产流水线)
  |-- ResearchAgent:研究主题,整理要点
  |-- WriterAgent:根据研究结果写初稿
  |-- Loop(审查改进循环,2 次迭代)
  |     |-- ReviewerAgent:审查文章质量
  |     +-- EditorAgent:根据审查意见修改
  +-- PolisherAgent:最终润色,输出成品

骨架代码

下面是一个骨架,把 // TODO 的部分补全就行:

package main

import (
    "context"
    "fmt"

    "google.golang.org/genai"

    "google.golang.org/adk/agent"
    "google.golang.org/adk/agent/llmagent"
    "google.golang.org/adk/agent/workflowagents/loopagent"
    "google.golang.org/adk/agent/workflowagents/sequentialagent"
    "google.golang.org/adk/runner"
    "google.golang.org/adk/session"
)

func main() {
    ctx := context.Background()

    // TODO: 创建你的 model
    // model := ...

    // 第一步:研究员
    researcher, _ := llmagent.New(llmagent.Config{
        Name:  "Researcher",
        Model: model,
        Instruction: `你是一个主题研究员。根据用户给出的博客主题,
研究并整理出 5-8 个关键要点,包括:
- 核心概念解释
- 实际应用案例
- 常见误区
输出结构化的研究笔记。`,
        OutputKey: "research_notes",
    })

    // 第二步:写手
    // TODO: 创建 WriterAgent
    // - 读取 {research_notes}
    // - 写初稿
    // - OutputKey: "current_draft"

    // 第三步的子 Agent:审查员
    // TODO: 创建 ReviewerAgent
    // - 读取 {current_draft}
    // - 审查并给出意见
    // - OutputKey: "temp:review_feedback"(临时数据)

    // 第三步的子 Agent:编辑
    // TODO: 创建 EditorAgent
    // - 读取 {current_draft} 和 {temp:review_feedback}
    // - 输出修改后的版本
    // - OutputKey: "current_draft"(覆盖原来的草稿)

    // 第三步:审查-修改循环
    // TODO: 用 loopagent.New 创建循环,MaxIterations: 2
    // 子 Agent:ReviewerAgent 和 EditorAgent

    // 第四步:润色师
    // TODO: 创建 PolisherAgent
    // - 读取 {current_draft}
    // - 最终润色
    // - OutputKey: "final_article"

    // TODO: 用 sequentialagent.New 把所有步骤串起来

    // 运行流水线
    sessionService := session.InMemoryService()
    r, _ := runner.New(runner.Config{
        AppName:        "blog_pipeline",
        Agent:          pipeline,  // 你创建的流水线
        SessionService: sessionService,
    })

    createResp, _ := sessionService.Create(ctx, &session.CreateRequest{
        AppName: "blog_pipeline",
        UserID:  "blogger",
    })

    userMsg := genai.NewContentFromText(
        "写一篇关于 Go 语言 Channel 的博客文章",
        genai.RoleUser,
    )

    for event, err := range r.Run(ctx, "blogger", createResp.Session.ID(), userMsg, agent.RunConfig{}) {
        if err != nil {
            fmt.Printf("错误: %v\n", err)
            continue
        }
        if event != nil && event.Content != nil {
            for _, part := range event.Content.Parts {
                if part.Text != "" {
                    fmt.Printf("\n--- [%s] ---\n%s\n", event.Author, part.Text)
                }
            }
        }
    }
}

提示

  1. Writer 的 Instruction 里要用 {research_notes} 引用研究笔记
  2. Editor 的 OutputKey 要设成 "current_draft",这样每次循环都会更新草稿
  3. Reviewer 的 OutputKey"temp:review_feedback",因为审查意见是中间产物
  4. 注意 Agent 的名字不能重复,在整个 Agent 树里必须唯一

进阶挑战

如果觉得太简单,试试这些:

  1. 在循环中加入一个自定义 Agent,用来检查文章字数,如果超过 2000 字就设置 Escalate 提前退出循环
  2. 把润色步骤改成 ParallelAgent,同时做"语法检查"和"风格优化",然后再加一个汇总 Agent
  3. 给整个流水线加上 BeforeAgentCallbacks,在执行前打印一条日志

祝你写得愉快。下一章我们会学习更高级的 Agent 协作模式 -- Agent 之间的动态转移。