系列教程 · 第二篇

LangGraph4j 完全指南
用图驱动的方式构建 AI Agent

当单一的 LLM 调用已经不够用——你需要多步推理、条件分支、循环重试、多 Agent 协作时,LangGraph4j 就是你的答案。它让你用「画流程图」的方式来编排 AI 应用,每个节点是一个动作,每条边是一个决策。

← 第一篇:LangChain4j 完全指南

一、开篇概览

LangGraph4j 是一个 Java 库,用于构建有状态的、多 Agent 的 AI 应用。它的核心思想是:用有向图(Graph)来编排 AI 工作流——节点(Node)执行动作,边(Edge)控制流转,状态(State)在节点间共享和传递。

💡 一句话总结
LangGraph4j = 状态图编排引擎 + LLM 集成,让你像画流程图一样构建复杂的 AI Agent。与 LangChain4j 搭配使用效果最佳。

为什么需要「图」?

第一篇教程中,我们学了 LangChain4j 的 AI Services——它非常适合"问一个问题,得到一个回答"的场景。但现实中的 AI 应用往往更复杂:

简单场景(LangChain4j 就够了)

  • 单轮问答
  • 文本翻译 / 摘要
  • 简单的 Tool 调用
  • 线性的 RAG 流程

复杂场景(需要 LangGraph4j)

  • AI 需要"想一想、做一做、再想想"(循环)
  • 多个 Agent 互相协作 / 交接
  • 需要人类审批才能继续的工作流
  • 出错时自动重试或走备选路径
  • 长时间运行的任务(需要持久化状态)
🤝 LangGraph4j 与 LangChain4j 的关系
LangGraph4j 不是 LangChain4j 的替代品,而是它的搭档。你用 LangChain4j 来调用 LLM、管理 Prompt、执行 Tool;用 LangGraph4j 来编排这些操作的执行顺序和决策逻辑。它们也可以和 Spring AI 一起使用。

学完本教程,你将掌握

StateGraph 状态图定义 AgentState 状态管理 Node 节点与 Edge 边 条件路由与分支 Channel / Reducer 机制 Checkpoint 持久化 ReACT Agent 构建 Human-in-the-Loop 子图与并行执行 流式与异步处理 Studio 可视化调试

✅ 本节小结

  • LangGraph4j 用图来编排复杂的 AI 工作流
  • 支持循环、条件分支、多 Agent——这些是线性调用做不到的
  • 与 LangChain4j / Spring AI 无缝集成

二、环境搭建

⚙️ 环境要求
JDK 17+(LangGraph4j 从 1.2.x 开始要求 JDK 17)
Maven 3.8+Gradle 7+
建议先完成第一篇教程的 LangChain4j 环境搭建
Maven 依赖
pom.xml
<properties>
    <langgraph4j.version>1.8.5</langgraph4j.version>
</properties>

<!-- 使用 BOM 管理版本 -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.bsc.langgraph4j</groupId>
            <artifactId>langgraph4j-bom</artifactId>
            <version>${langgraph4j.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <!-- 核心库 -->
    <dependency>
        <groupId>org.bsc.langgraph4j</groupId>
        <artifactId>langgraph4j-core</artifactId>
    </dependency>

    <!-- LangChain4j 集成(可选,用于连接 LLM) -->
    <dependency>
        <groupId>org.bsc.langgraph4j</groupId>
        <artifactId>langgraph4j-langchain4j</artifactId>
    </dependency>

    <!-- Agent Executor(可选,内置 ReACT Agent) -->
    <dependency>
        <groupId>org.bsc.langgraph4j</groupId>
        <artifactId>langgraph4j-agent-executor</artifactId>
    </dependency>
</dependencies>
Gradle 依赖
dependencies {
    implementation platform('org.bsc.langgraph4j:langgraph4j-bom:1.8.5')
    implementation 'org.bsc.langgraph4j:langgraph4j-core'
    implementation 'org.bsc.langgraph4j:langgraph4j-langchain4j'
}

✅ 本节小结

  • 核心依赖是 langgraph4j-core
  • 连接 LLM 需要 langgraph4j-langchain4jlanggraph4j-spring-ai
  • Maven groupId 是 org.bsc.langgraph4j

三、核心概念

LangGraph4j 的世界观很简单:一切皆图。让我们用一个生活化的类比来理解。

🏭 总体类比:LangGraph4j 就像一条「智能流水线」
想象一家工厂的流水线。AgentState 是流水线上传递的「工单」(记录着所有信息),Node 是每个工位的工人(执行具体操作),Edge 是传送带(决定工单流向哪个工位),条件边 是质检员(根据工单状态决定走哪条路),Checkpoint 是存档点(随时可以回溯)。

AgentState — 工单

AgentState 是在整个图中流转的共享状态。本质上它是一个 Map<String, Object>,每个节点都可以读取和更新它。状态的结构通过Schema(一组 Channel)来定义。

import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.state.Channel;
import org.bsc.langgraph4j.state.Channels;

class MyState extends AgentState {
    // 定义状态的结构(Schema)
    public static final Map<String, Channel<?>> SCHEMA = Map.of(
        // "messages" 字段:追加模式,新消息会加入列表
        "messages", Channels.appender(ArrayList::new),
        // "currentStep" 字段:覆盖模式,新值直接替换旧值
        "currentStep", Channels.base(() -> "init")
    );

    public MyState(Map<String, Object> initData) {
        super(initData);
    }

    // 类型安全的访问方法
    public List<String> messages() {
        return this.<List<String>>value("messages").orElse(List.of());
    }

    public String currentStep() {
        return this.<String>value("currentStep").orElse("init");
    }
}

Node — 工位上的工人

节点是图中执行具体操作的地方。每个节点接收当前状态作为输入,返回一个 Map<String, Object> 作为状态更新。

import org.bsc.langgraph4j.action.NodeAction;

// 方式 1:实现 NodeAction 接口
class GreeterNode implements NodeAction<MyState> {
    @Override
    public Map<String, Object> apply(MyState state) {
        return Map.of("messages", "你好!欢迎来到 LangGraph4j");
    }
}

// 方式 2:用 Lambda 表达式(更简洁)
NodeAction<MyState> greeter = state ->
    Map.of("messages", "你好!欢迎来到 LangGraph4j");

Edge — 传送带与质检员

LangGraph4j 有三种边:

边类型类比作用
普通边 addEdge(A, B)固定传送带A 完成后无条件去 B
条件边 addConditionalEdges(...)质检员分拣根据状态动态决定下一个节点
条件入口 addConditionalEntryPoint(...)分类入口根据初始状态决定从哪个节点开始

编译与执行

定义好图之后,需要编译(compile)成一个不可变的 CompiledGraph 才能执行。编译过程会校验图结构(比如检查孤立节点)。

定义 StateGraph 添加 Node & Edge .compile() .stream() 或 .invoke()

✅ 本节小结

  • AgentState:图中流转的共享状态(Map 封装)
  • Node:执行操作并返回状态更新的函数
  • Edge:控制节点间的流转方向
  • compile():将图定义编译为可执行的 CompiledGraph

四、第一个图:Hello Graph

场景:构建一个最简单的两节点图——greeter 打招呼,responder 回应,感受图的基本运作。

import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.state.*;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;

import java.util.*;

// 1. 定义状态
class SimpleState extends AgentState {
    static final Map<String, Channel<?>> SCHEMA = Map.of(
        "messages", Channels.appender(ArrayList::new)
    );

    public SimpleState(Map<String, Object> initData) { super(initData); }

    public List<String> messages() {
        return this.<List<String>>value("messages").orElse(List.of());
    }
}

// 2. 定义并编译图
var graph = new StateGraph<>(SimpleState.SCHEMA, SimpleState::new)
    .addNode("greeter", node_async(state -> {
        System.out.println("Greeter 收到: " + state.messages());
        return Map.of("messages", "你好,朋友!");
    }))
    .addNode("responder", node_async(state -> {
        System.out.println("Responder 收到: " + state.messages());
        return Map.of("messages", "很高兴见到你!");
    }))
    .addEdge(START, "greeter")       // 起点 → greeter
    .addEdge("greeter", "responder") // greeter → responder
    .addEdge("responder", END)       // responder → 终点
    .compile();

// 3. 执行图
for (var step : graph.stream(Map.of("messages", "开始对话"))) {
    System.out.println("步骤: " + step.node());
    System.out.println("状态: " + step.state().messages());
    System.out.println("---");
}

输出效果:

Greeter 收到: [开始对话]
步骤: greeter
状态: [开始对话, 你好,朋友!]
---
Responder 收到: [开始对话, 你好,朋友!]
步骤: responder
状态: [开始对话, 你好,朋友!, 很高兴见到你!]
---
💡 关键观察
注意 messages追加的(因为用了 Channels.appender())。每个节点返回的消息被自动追加到列表中,而不是覆盖。这就是 Channel/Reducer 的作用。

✅ 本节小结

  • new StateGraph<>(schema, constructor) 创建图
  • addNode() 添加节点,addEdge() 连接节点
  • STARTEND 是内置的起止标记
  • stream() 返回每一步的执行结果

五、条件边与路由

场景:AI 根据用户输入判断意图——是闲聊还是查询订单?不同意图走不同的处理路径。这就是条件边的用武之地。

// 定义状态
class RouterState extends AgentState {
    static final Map<String, Channel<?>> SCHEMA = Map.of(
        "input", Channels.base(() -> ""),
        "intent", Channels.base(() -> ""),
        "output", Channels.base(() -> "")
    );

    public RouterState(Map<String, Object> d) { super(d); }
    public String input() { return this.<String>value("input").orElse(""); }
    public String intent() { return this.<String>value("intent").orElse(""); }
}

var graph = new StateGraph<>(RouterState.SCHEMA, RouterState::new)

    // 意图识别节点
    .addNode("classifier", node_async(state -> {
        String input = state.input().toLowerCase();
        String intent = input.contains("订单") ? "order" : "chat";
        return Map.of("intent", intent);
    }))

    // 闲聊处理
    .addNode("chatHandler", node_async(state ->
        Map.of("output", "这是一个闲聊回复:" + state.input())
    ))

    // 订单处理
    .addNode("orderHandler", node_async(state ->
        Map.of("output", "正在查询您的订单信息...")
    ))

    // 边:起点 → 分类器
    .addEdge(START, "classifier")

    // 条件边:根据 intent 路由
    .addConditionalEdges("classifier",
        state -> state.intent(),  // 路由函数:返回下一个节点名
        Map.of(
            "chat", "chatHandler",   // intent=="chat" → chatHandler
            "order", "orderHandler"  // intent=="order" → orderHandler
        )
    )

    // 两个处理器都通向终点
    .addEdge("chatHandler", END)
    .addEdge("orderHandler", END)
    .compile();

// 测试
for (var step : graph.stream(Map.of("input", "帮我查一下订单状态"))) {
    System.out.println(step.node() + " → " + step.state());
}
⚠️ 常见坑
路由函数返回值必须匹配addConditionalEdges 的第三个参数是一个 Map,key 是路由函数可能返回的值,value 是对应的节点名。如果路由函数返回了 Map 里没有的值,会抛出异常。
别忘了终止边:条件边只定义了"从 A 到哪",你仍然需要为后续节点添加到 END 的边,否则图不完整。

✅ 本节小结

  • addConditionalEdges(source, routingFn, routeMap) 实现动态路由
  • 路由函数接收当前状态,返回一个字符串标识下一个节点
  • 这是构建"会思考"的 AI 应用的关键能力

六、Channel 与状态管理

场景:不同的状态字段需要不同的更新策略——有的要追加(如消息列表),有的要覆盖(如当前步骤),有的需要自定义合并逻辑。

LangGraph4j 通过 Channel 来定义每个状态字段的更新策略(Reducer):

Channel 类型行为适用场景
Channels.base(() -> default)新值覆盖旧值当前步骤、最新结果等
Channels.appender(ArrayList::new)新值追加到列表消息历史、日志等
// 混合使用不同的 Channel 策略
class WorkflowState extends AgentState {
    static final Map<String, Channel<?>> SCHEMA = Map.of(
        // 覆盖模式:只保留最新值
        "status", Channels.base(() -> "pending"),
        "result", Channels.base(() -> ""),

        // 追加模式:所有值都保留
        "logs", Channels.appender(ArrayList::new),
        "errors", Channels.appender(ArrayList::new)
    );

    public WorkflowState(Map<String, Object> d) { super(d); }

    public String status() {
        return this.<String>value("status").orElse("pending");
    }
    public List<String> logs() {
        return this.<List<String>>value("logs").orElse(List.of());
    }
}

// 节点更新状态
var processNode = node_async((WorkflowState state) -> Map.of(
    "status", "processing",                  // 覆盖 → "processing"
    "logs", "开始处理任务",                     // 追加到 logs 列表
    "logs", "读取输入数据完成"                   // 再追加一条
));
💡 为什么 Channel 很重要?
在多节点并发执行时,多个节点可能同时更新同一个状态字段。Channel 的 Reducer 机制确保更新被正确合并,而不是互相覆盖。这是 LangGraph4j 支持并行执行的基础。

✅ 本节小结

  • Channels.base():覆盖模式,保留最新值
  • Channels.appender():追加模式,累积所有值
  • Channel 机制是并行执行和正确状态合并的基础

七、Checkpoint 持久化

场景:你的 AI Agent 正在处理一个长时间任务,突然服务重启了——之前的所有进度都丢了。Checkpoint 就像游戏存档,让你随时可以「读档继续」。

import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.CompileConfig;
import org.bsc.langgraph4j.RunnableConfig;

// 1. 创建 CheckpointSaver(内存版,生产环境可用数据库实现)
var saver = new MemorySaver();

// 2. 编译时注入 Saver
var compiledGraph = graph.compile(
    CompileConfig.builder()
        .checkpointSaver(saver)
        .build()
);

// 3. 使用 threadId 隔离不同的执行上下文
var config = RunnableConfig.builder()
        .threadId("session-001")
        .build();

// 4. 执行图(每一步自动保存检查点)
for (var step : compiledGraph.stream(initialState, config)) {
    System.out.println("节点: " + step.node());
}

// 5. 下次可以从最后的检查点恢复执行
// (使用相同的 threadId)
查看和回溯历史状态(Time Travel)

Checkpoint 不仅能恢复,还能「时间旅行」——查看图在每一步的完整状态:

// 使用 streamSnapshots 获取每一步的快照
for (var snapshot : compiledGraph.streamSnapshots(initialState, config)) {
    System.out.println("节点: " + snapshot.node());
    System.out.println("状态: " + snapshot.state());
    System.out.println("配置: " + snapshot.config());
    System.out.println("---");
}

✅ 本节小结

  • MemorySaver 提供内存级的检查点存储
  • threadId 隔离不同的执行上下文
  • 支持查看历史状态快照(Time Travel)

八、ReACT Agent 实战

场景:构建一个能"思考→行动→观察→再思考"的 AI Agent。这就是著名的 ReACT(Reasoning + Acting)模式——LLM 决定调用哪个工具,观察结果,再决定下一步。

🔁 ReACT 循环
ReACT 是一个典型的循环图:
LLM 思考 → 需要工具?→ 是 → 执行工具 → 把结果反馈给 LLM → LLM 再思考 → 还需要工具?→ 否 → 输出最终回答

这正是 LangGraph4j 擅长的——循环图,线性流程做不到。

LangGraph4j 内置了 AgentExecutor,直接帮你搭建好了 ReACT 图:

import org.bsc.langgraph4j.agentexecutor.AgentExecutor;
import dev.langchain4j.model.openai.OpenAiChatModel;
import dev.langchain4j.agent.tool.Tool;
import dev.langchain4j.agent.tool.P;

// 1. 定义工具
public class MyTools {

    @Tool("查询指定城市的天气")
    String getWeather(@P("城市名") String city) {
        return city + ": 晴天,26°C";
    }

    @Tool("计算数学表达式")
    double calculate(@P("数学表达式") String expression) {
        // 简化实现
        return switch (expression) {
            case "26 * 9 / 5 + 32" -> 78.8;
            default -> 0;
        };
    }
}

// 2. 创建 LLM
var chatModel = OpenAiChatModel.builder()
    .apiKey(System.getenv("OPENAI_API_KEY"))
    .modelName("gpt-4o-mini")
    .temperature(0.0)
    .build();

// 3. 构建 AgentExecutor(内置 ReACT 图)
var agentGraph = AgentExecutor.builder()
    .chatModel(chatModel)
    .toolsFromObject(new MyTools())
    .build()
    .compile();

// 4. 运行
for (var step : agentGraph.stream(
        Map.of("messages", "北京天气怎么样?把温度转成华氏度"))) {
    System.out.println(step);
}
// AI 会:1) 调用 getWeather("北京") 2) 调用 calculate("26*9/5+32") 3) 组织最终回答
手动构建 ReACT 图(深入理解原理)

如果你想完全掌控 ReACT 的行为,可以手动构建循环图:

var graph = new StateGraph<>(AgentState.SCHEMA, AgentState::new)
    // Agent 节点:调用 LLM 决定下一步
    .addNode("agent", node_async(state -> {
        // 调用 LLM,LLM 可能返回文本或工具调用
        var response = chatModel.chat(buildMessages(state));
        return Map.of("messages", response.aiMessage());
    }))

    // 工具执行节点
    .addNode("tools", node_async(state -> {
        // 执行 LLM 要求的工具调用
        var toolResults = executeTools(state);
        return Map.of("messages", toolResults);
    }))

    // 起点 → Agent
    .addEdge(START, "agent")

    // 条件边:Agent 之后,检查是否需要调用工具
    .addConditionalEdges("agent",
        state -> {
            var lastMessage = getLastMessage(state);
            return lastMessage.hasToolCalls() ? "continue" : "end";
        },
        Map.of(
            "continue", "tools",  // 需要工具 → 去执行
            "end", END            // 不需要 → 结束
        )
    )

    // 工具执行完 → 回到 Agent(形成循环!)
    .addEdge("tools", "agent")
    .compile();

注意最后一行 .addEdge("tools", "agent")——这形成了一个循环,是 ReACT 模式的核心。Agent 不断"思考→行动→观察",直到认为可以给出最终回答。

✅ 本节小结

  • AgentExecutor 是内置的 ReACT Agent 实现
  • ReACT 模式的核心是 Agent→Tool 的循环图
  • 条件边决定"继续调用工具"还是"结束并输出"

九、Human-in-the-Loop

场景:AI 准备执行一个危险操作(如删除数据、发送邮件),你希望先暂停,让人类审批后再继续。

LangGraph4j 通过 interrupt 机制实现人工介入。当图执行到某个节点时可以暂停,等待外部输入后继续。

// 创建需要人工审批的图
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new)
    .addNode("prepare", node_async(state -> {
        return Map.of(
            "action", "delete_user_data",
            "status", "pending_approval"
        );
    }))

    .addNode("execute", node_async(state -> {
        // 只有审批通过才会到达这里
        return Map.of("status", "completed", "logs", "操作已执行");
    }))

    .addEdge(START, "prepare")

    // 条件边:检查审批状态
    .addConditionalEdges("prepare",
        state -> state.<String>value("approval").orElse("waiting"),
        Map.of(
            "approved", "execute",
            "rejected", END,
            "waiting", END  // 暂停等待
        )
    )
    .addEdge("execute", END)
    .compile(CompileConfig.builder()
        .checkpointSaver(new MemorySaver())
        .build());

// 第一次执行:到 prepare 后暂停
var config = RunnableConfig.builder().threadId("task-001").build();
graph.stream(Map.of("input", "删除用户数据"), config);

// 人类审批后,用更新状态恢复执行
graph.stream(Map.of("approval", "approved"), config);
💡 典型应用场景
敏感操作审批:删除数据、发送通知等需要人工确认。
信息补充:AI 需要用户提供额外信息才能继续。
质量把关:AI 生成内容后,由人工审核再发布。

✅ 本节小结

  • 通过条件边 + Checkpoint 实现图的暂停与恢复
  • 使用相同 threadId 可以从上次暂停的地方继续
  • Human-in-the-Loop 是生产级 AI 应用的安全保障

十、子图与并行执行

子图(Subgraph)

场景:你的图太复杂了,想把一部分逻辑封装成独立的"子图",就像函数调用一样复用。

// 1. 定义子图(独立的小图)
var subGraph = new StateGraph<>(MyState.SCHEMA, MyState::new)
    .addNode("step1", node_async(state ->
        Map.of("logs", "子图 Step1 完成")))
    .addNode("step2", node_async(state ->
        Map.of("logs", "子图 Step2 完成")))
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", END);

// 2. 在父图中把子图作为节点使用
var parentGraph = new StateGraph<>(MyState.SCHEMA, MyState::new)
    .addNode("init", node_async(state ->
        Map.of("logs", "父图初始化")))
    .addSubgraph("processing", subGraph)  // 子图作为节点
    .addNode("finish", node_async(state ->
        Map.of("logs", "父图完成")))
    .addEdge(START, "init")
    .addEdge("init", "processing")
    .addEdge("processing", "finish")
    .addEdge("finish", END)
    .compile();

并行执行

场景:你需要同时查询天气和新闻,没必要串行等待。LangGraph4j 支持将多个节点配置为并行执行。

// 从 "start" 出发,同时进入 "weather" 和 "news"
var graph = new StateGraph<>(MyState.SCHEMA, MyState::new)
    .addNode("weather", node_async(state -> {
        Thread.sleep(1000); // 模拟网络请求
        return Map.of("logs", "天气:晴天 25°C");
    }))
    .addNode("news", node_async(state -> {
        Thread.sleep(1000); // 模拟网络请求
        return Map.of("logs", "新闻:Java 24 发布");
    }))
    .addNode("summarize", node_async(state ->
        Map.of("logs", "汇总完成: " + state.logs())))

    .addEdge(START, "weather")    // START 同时连接两个节点
    .addEdge(START, "news")       // = 并行分支
    .addEdge("weather", "summarize")
    .addEdge("news", "summarize") // 两个分支汇合
    .addEdge("summarize", END)
    .compile();

// weather 和 news 会并行执行,都完成后再进入 summarize

✅ 本节小结

  • 子图通过 addSubgraph() 嵌入父图,实现逻辑复用
  • 同一个节点连接多个后续节点即可实现并行执行
  • 并行分支在汇合节点自动等待所有分支完成

十一、流式与异步处理

场景:AI 的回复很长,你想一边生成一边展示,而不是干等。LangGraph4j 原生支持异步和流式。

异步节点

import java.util.concurrent.CompletableFuture;
import org.bsc.langgraph4j.action.AsyncNodeAction;

// 所有节点默认包装为异步(node_async)
// 也可以直接返回 CompletableFuture
AsyncNodeAction<MyState> asyncNode = state ->
    CompletableFuture.supplyAsync(() -> {
        // 在单独线程执行耗时操作
        var result = callExternalAPI(state);
        return Map.of("result", result);
    });

流式执行

// stream() 逐步返回每个节点执行后的状态
for (var nodeOutput : compiledGraph.stream(initialState)) {
    System.out.println("节点 " + nodeOutput.node() + " 完成");
    System.out.println("当前状态: " + nodeOutput.state());
}

// invoke() 等待整个图执行完,返回最终状态
var finalState = compiledGraph.invoke(initialState).get();
System.out.println("最终结果: " + finalState);
方法行为适用场景
stream()逐步返回每个节点的输出需要实时展示进度的 UI
invoke()返回最终状态的 CompletableFuture后台任务,只关心最终结果
streamSnapshots()返回包含完整配置信息的快照调试和日志

✅ 本节小结

  • 节点天然支持异步(CompletableFuture)
  • stream() 逐步输出,invoke() 一次性输出
  • 流式执行对构建实时 UI 至关重要

十二、Studio 可视化调试

场景:你的图越来越复杂,光看代码很难理解执行流程。LangGraph4j 内置了一个 Web UI(Studio),让你可视化地查看、运行和调试图。

import org.bsc.langgraph4j.studio.LangGraphStudioServer;
import org.bsc.langgraph4j.studio.LangGraphStudioServer4Jetty;

// 1. 添加 Studio 依赖
// <artifactId>langgraph4j-studio-jetty</artifactId>

// 2. 创建 Studio 实例
var saver = new MemorySaver();
var instance = LangGraphStudioServer.Instance.builder()
        .title("我的 AI Agent")
        .addInputStringArg("input")   // 定义输入字段
        .graph(stateGraph)             // 传入 StateGraph
        .compileConfig(CompileConfig.builder()
                .checkpointSaver(saver)
                .build())
        .build();

// 3. 启动 Studio 服务器
LangGraphStudioServer4Jetty.builder()
        .port(8080)
        .instance("default", instance)
        .build()
        .start()
        .join();

// 4. 打开浏览器访问 http://localhost:8080
🤩 Studio 能做什么?
图结构可视化:直观看到节点和边的连接关系。
实时执行:输入数据后在 UI 上实时观察图的执行过程。
状态检查:查看每一步的完整状态快照。
调试利器:比看日志效率高 10 倍。
生成 Mermaid / PlantUML 图

如果你只需要静态的图结构,可以导出为 Mermaid 或 PlantUML 格式:

// 编译后获取图的 Mermaid 描述
var compiled = stateGraph.compile();
String mermaid = compiled.getGraph().toMermaid();
System.out.println(mermaid);

// 输出类似:
// graph TD
//   __start__ --> classifier
//   classifier -->|chat| chatHandler
//   classifier -->|order| orderHandler
//   chatHandler --> __end__
//   orderHandler --> __end__

✅ 本节小结

  • Studio 提供 Web UI 可视化调试你的图
  • 支持 Jetty 和 Spring Boot 两种集成方式
  • 可导出 Mermaid / PlantUML 格式的图描述

十三、综合实战:智能研究助手

让我们构建一个「智能研究助手」——用户提出一个研究问题,Agent 会自动搜索信息、评估质量、不够好就继续搜索(循环),最后生成研究报告。这个项目串联了前面所有知识点。

START planner researcher 评估质量? writer END

(评估不通过时,从 researcher 循环重试)

1

定义状态

class ResearchState extends AgentState {
    static final Map<String, Channel<?>> SCHEMA = Map.of(
        "question", Channels.base(() -> ""),
        "plan", Channels.base(() -> ""),
        "findings", Channels.appender(ArrayList::new),
        "quality", Channels.base(() -> "low"),
        "retryCount", Channels.base(() -> 0),
        "report", Channels.base(() -> "")
    );

    public ResearchState(Map<String, Object> d) { super(d); }

    public String question() { return this.<String>value("question").orElse(""); }
    public String plan() { return this.<String>value("plan").orElse(""); }
    public List<String> findings() {
        return this.<List<String>>value("findings").orElse(List.of());
    }
    public String quality() { return this.<String>value("quality").orElse("low"); }
    public int retryCount() { return this.<Integer>value("retryCount").orElse(0); }
}
2

定义节点

// 规划节点:LLM 制定研究计划
var planner = node_async((ResearchState state) -> {
    String plan = chatModel.chat(
        "为以下研究问题制定搜索计划:" + state.question());
    return Map.of("plan", plan);
});

// 研究节点:执行搜索(模拟)
var researcher = node_async((ResearchState state) -> {
    String finding = chatModel.chat(
        "根据计划搜索信息:" + state.plan()
        + "\n已有发现:" + state.findings());
    return Map.of(
        "findings", finding,
        "retryCount", state.retryCount() + 1
    );
});

// 评估节点:判断信息是否充分
var evaluator = node_async((ResearchState state) -> {
    String quality = chatModel.chat(
        "评估以下研究发现的质量(回答 high 或 low):\n"
        + String.join("\n", state.findings()));
    return Map.of("quality", quality.trim().toLowerCase());
});

// 写作节点:生成研究报告
var writer = node_async((ResearchState state) -> {
    String report = chatModel.chat(
        "基于以下发现撰写研究报告:\n"
        + String.join("\n", state.findings()));
    return Map.of("report", report);
});
3

组装图

var researchGraph = new StateGraph<>(ResearchState.SCHEMA, ResearchState::new)
    .addNode("planner", planner)
    .addNode("researcher", researcher)
    .addNode("evaluator", evaluator)
    .addNode("writer", writer)

    // 流程编排
    .addEdge(START, "planner")
    .addEdge("planner", "researcher")
    .addEdge("researcher", "evaluator")

    // 关键:条件边实现循环
    .addConditionalEdges("evaluator",
        state -> {
            if (state.quality().contains("high") || state.retryCount() >= 3) {
                return "write";    // 质量够好或已重试 3 次 → 写报告
            }
            return "retry";        // 质量不够 → 继续研究
        },
        Map.of(
            "write", "writer",
            "retry", "researcher"  // 循环回 researcher!
        )
    )
    .addEdge("writer", END)
    .compile(CompileConfig.builder()
        .checkpointSaver(new MemorySaver())
        .build());
4

运行

var config = RunnableConfig.builder().threadId("research-001").build();

for (var step : researchGraph.stream(
        Map.of("question", "Java 虚拟线程在高并发场景下的性能表现如何?"),
        config)) {
    System.out.println("【" + step.node() + "】");
    var state = (ResearchState) step.state();
    if (!state.findings().isEmpty()) {
        System.out.println("  发现数量: " + state.findings().size());
    }
    if (step.node().equals("writer")) {
        System.out.println("  报告: " + state.<String>value("report").orElse(""));
    }
}
🎉 项目亮点回顾
这个项目用到了:AgentState + Channel(混合覆盖和追加模式)、条件边(质量评估路由)、循环图(researcher ↔ evaluator)、Checkpoint(可恢复执行)、LangChain4j 集成(调用 LLM)。

✅ 本节小结

  • 综合运用了状态管理、条件路由、循环图、持久化
  • 循环 + 条件边实现了"质量不够就重试"的 AI 自主决策
  • retryCount 防止无限循环,这是生产必备的安全措施

十四、速查表 / Cheat Sheet

图定义

API用途示例
new StateGraph<>(schema, ctor)创建状态图new StateGraph<>(MyState.SCHEMA, MyState::new)
.addNode(name, action)添加节点.addNode("agent", node_async(...))
.addEdge(from, to)添加普通边.addEdge("a", "b")
.addConditionalEdges(...)添加条件边.addConditionalEdges("a", fn, routeMap)
.addSubgraph(name, graph)嵌入子图.addSubgraph("sub", childGraph)
.compile()编译为可执行图.compile(CompileConfig.builder()...)
START / END内置起止标记addEdge(START, "first")

状态与 Channel

API用途示例
Channels.base(() -> default)覆盖模式 ChannelChannels.base(() -> "")
Channels.appender(factory)追加模式 ChannelChannels.appender(ArrayList::new)
state.value("key")读取状态值state.<String>value("name").orElse("")

执行与调试

API用途示例
graph.stream(state)流式执行for (var s : graph.stream(init)) {...}
graph.invoke(state)执行并返回最终状态graph.invoke(init).get()
graph.streamSnapshots(...)流式快照(带配置信息)用于调试和 Time Travel
MemorySaver内存检查点存储new MemorySaver()
RunnableConfig运行配置(threadId 等)RunnableConfig.builder().threadId("t1").build()

集成

模块用途
langgraph4j-langchain4j与 LangChain4j 集成
langgraph4j-spring-ai与 Spring AI 集成
langgraph4j-agent-executor内置 ReACT Agent
langgraph4j-studio-jettyStudio Web UI(Jetty)
langgraph4j-studio-springbootStudio Web UI(Spring Boot)

十五、进阶路线图

高级用法探索

Multi-Agent Supervisor:一个 "主管" Agent 协调多个专业 Agent 工作

Agent Handoff:Agent 之间无缝交接任务和上下文

Adaptive RAG:根据查询复杂度动态调整检索策略

Deep Agents (Agent 2.0):深度嵌套的 Agent 架构

AG-UI 协议:标准化的 Agent 与前端交互协议

Postgres Checkpoint:生产级的持久化检查点存储

Graph Builder:可视化拖拽设计图,自动生成 Java 代码

推荐资源

资源链接说明
官方文档langgraph4j.github.io最权威的参考
GitHub 仓库github.com/langgraph4j源码与示例
示例仓库langgraph4j-examples完整可运行的示例
Graph Builder可视化图设计器拖拽设计图并生成代码
O'Reilly 书籍Applied AI for Enterprise Java深度学习 LangGraph4j 章节
🚀 下一步建议
试试用 Graph Builder 可视化设计你的第一个 Agent 图,它会自动生成 Java 代码骨架。然后在这个基础上填充业务逻辑——这是最快的上手方式。

🎓 系列教程第二篇完成!

你已经掌握了用图来编排复杂 AI 应用的能力。
结合第一篇 LangChain4j 的知识,你可以构建真正的生产级 AI Agent 了!