Files
ruoyi-ai/docs/工作流模块说明.md
2025-10-21 09:52:33 +08:00

13 KiB
Raw Blame History

Ruoyi-AI 工作流模块详细说明文档

概述

Ruoyi-AI 工作流模块是一个基于 LangGraph4j 的智能工作流引擎支持可视化工作流设计、AI 模型集成、条件分支、人机交互等高级功能。该模块采用微服务架构,提供完整的 RESTful API 和流式响应支持。

模块架构

1. 模块结构

ruoyi-ai/
├── ruoyi-modules/
│   └── ruoyi-workflow/           # 工作流核心模块
│       ├── pom.xml
│       └── src/main/java/org/ruoyi/workflow/
│           └── controller/       # 控制器层
│               ├── WorkflowController.java
│               ├── WorkflowRuntimeController.java
│               └── admin/        # 管理端控制器
│                   ├── AdminWorkflowController.java
│                   └── AdminWorkflowComponentController.java
└── ruoyi-modules-api/
    └── ruoyi-workflow-api/       # 工作流API模块
        ├── pom.xml
        └── src/main/java/org/ruoyi/workflow/
            ├── entity/           # 实体类
            ├── dto/              # 数据传输对象
            ├── service/          # 服务接口
            ├── mapper/           # 数据访问层
            ├── workflow/         # 工作流核心逻辑
            ├── enums/            # 枚举类
            ├── util/             # 工具类
            └── exception/        # 异常处理

2. 核心依赖

  • LangGraph4j: 1.5.3 - 工作流图执行引擎
  • LangChain4j: 1.2.0 - AI 模型集成框架
  • Spring Boot: 3.x - 应用框架
  • MyBatis Plus: 数据访问层
  • Redis: 缓存和状态管理
  • Swagger/OpenAPI: API 文档

核心功能

1. 工作流管理

1.1 工作流定义

  • 创建工作流: 支持自定义标题、描述、公开性设置
  • 编辑工作流: 可视化节点编辑、连接线配置
  • 版本控制: 支持工作流的版本管理和回滚
  • 权限管理: 支持公开/私有工作流设置

1.2 工作流执行

  • 流式执行: 基于 SSE 的实时流式响应
  • 状态管理: 完整的执行状态跟踪
  • 错误处理: 详细的错误信息和异常处理
  • 中断恢复: 支持工作流中断和恢复执行

2. 节点类型

2.1 基础节点

  • Start: 开始节点,定义工作流入口
  • End: 结束节点,定义工作流出口

2.2 AI 模型节点

  • Answer: 大语言模型问答节点
  • Dalle3: DALL-E 3 图像生成
  • Tongyiwanx: 通义万相图像生成
  • Classifier: 内容分类节点

2.3 数据处理节点

  • DocumentExtractor: 文档信息提取
  • KeywordExtractor: 关键词提取
  • FaqExtractor: 常见问题提取
  • KnowledgeRetrieval: 知识库检索

2.4 控制流节点

  • Switcher: 条件分支节点
  • HumanFeedback: 人机交互节点

2.5 外部集成节点

  • Google: Google 搜索集成
  • MailSend: 邮件发送
  • HttpRequest: HTTP 请求
  • Template: 模板转换

3. 数据流管理

3.1 输入输出定义

// 节点输入输出数据结构
public class NodeIOData {
    private String name;           // 参数名称
    private NodeIODataContent content; // 参数内容
}

// 支持的数据类型
public enum WfIODataTypeEnum {
    TEXT,        // 文本
    NUMBER,      // 数字
    BOOLEAN,     // 布尔值
    FILES,       // 文件
    OPTIONS      // 选项
}

3.2 参数引用

  • 节点间引用: 支持上游节点输出作为下游节点输入
  • 参数映射: 自动处理参数名称映射
  • 类型转换: 自动进行数据类型转换

数据库设计

1. 核心表结构

1.1 工作流定义表 (t_workflow)

CREATE TABLE t_workflow (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,
    uuid        VARCHAR(32)  NOT NULL DEFAULT '',
    title       VARCHAR(100) NOT NULL DEFAULT '',
    remark      TEXT         NOT NULL DEFAULT '',
    user_id     BIGINT       NOT NULL DEFAULT 0,
    is_public   TINYINT(1)   NOT NULL DEFAULT 0,
    is_enable   TINYINT(1)   NOT NULL DEFAULT 1,
    create_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    is_deleted  TINYINT(1)   NOT NULL DEFAULT 0
);

1.2 工作流节点表 (t_workflow_node)

CREATE TABLE t_workflow_node (
    id                    BIGINT AUTO_INCREMENT PRIMARY KEY,
    uuid                  VARCHAR(32)  NOT NULL DEFAULT '',
    workflow_id           BIGINT       NOT NULL DEFAULT 0,
    workflow_component_id BIGINT       NOT NULL DEFAULT 0,
    user_id               BIGINT       NOT NULL DEFAULT 0,
    title                 VARCHAR(100) NOT NULL DEFAULT '',
    remark                VARCHAR(500) NOT NULL DEFAULT '',
    input_config          JSON         NOT NULL DEFAULT ('{}'),
    node_config           JSON         NOT NULL DEFAULT ('{}'),
    position_x            DOUBLE       NOT NULL DEFAULT 0,
    position_y            DOUBLE       NOT NULL DEFAULT 0,
    create_time           DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time           DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    is_deleted            TINYINT(1)   NOT NULL DEFAULT 0
);

1.3 工作流边表 (t_workflow_edge)

CREATE TABLE t_workflow_edge (
    id               BIGINT AUTO_INCREMENT PRIMARY KEY,
    uuid             VARCHAR(32) NOT NULL DEFAULT '',
    workflow_id      BIGINT      NOT NULL DEFAULT 0,
    source_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
    source_handle    VARCHAR(32) NOT NULL DEFAULT '',
    target_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
    create_time      DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time      DATETIME    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    is_deleted       TINYINT(1)  NOT NULL DEFAULT 0
);

1.4 工作流运行时表 (t_workflow_runtime)

CREATE TABLE t_workflow_runtime (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    uuid          VARCHAR(32)  NOT NULL DEFAULT '',
    user_id       BIGINT       NOT NULL DEFAULT 0,
    workflow_id   BIGINT       NOT NULL DEFAULT 0,
    input         JSON         NOT NULL DEFAULT ('{}'),
    output        JSON         NOT NULL DEFAULT ('{}'),
    status        SMALLINT     NOT NULL DEFAULT 1,
    status_remark VARCHAR(250) NOT NULL DEFAULT '',
    create_time   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP,
    update_time   DATETIME     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    is_deleted    TINYINT(1)   NOT NULL DEFAULT 0
);

1.5 工作流组件表 (t_workflow_component)

CREATE TABLE t_workflow_component (
    id            BIGINT AUTO_INCREMENT PRIMARY KEY,
    uuid          VARCHAR(32)  DEFAULT '' NOT NULL,
    name          VARCHAR(32)  DEFAULT '' NOT NULL,
    title         VARCHAR(100) DEFAULT '' NOT NULL,
    remark        TEXT                   NOT NULL,
    display_order INT          DEFAULT 0 NOT NULL,
    is_enable     TINYINT(1)   DEFAULT 0 NOT NULL,
    create_time   DATETIME     DEFAULT CURRENT_TIMESTAMP NOT NULL,
    update_time   DATETIME     DEFAULT CURRENT_TIMESTAMP NOT NULL,
    is_deleted    TINYINT(1)   DEFAULT 0 NOT NULL
);

API 接口

1. 工作流管理接口

1.1 基础操作

# 创建工作流
POST /workflow/add
Content-Type: application/json
{
    "title": "工作流标题",
    "remark": "工作流描述",
    "isPublic": false
}

# 更新工作流
POST /workflow/update
Content-Type: application/json
{
    "uuid": "工作流UUID",
    "title": "新标题",
    "remark": "新描述"
}

# 删除工作流
POST /workflow/del/{uuid}

# 启用/禁用工作流
POST /workflow/enable/{uuid}?enable=true

1.2 搜索和查询

# 搜索我的工作流
GET /workflow/mine/search?keyword=关键词&isPublic=true&currentPage=1&pageSize=10

# 搜索公开工作流
GET /workflow/public/search?keyword=关键词&currentPage=1&pageSize=10

# 获取工作流组件列表
GET /workflow/public/component/list

2. 工作流执行接口

2.1 流式执行

# 流式执行工作流
POST /workflow/run
Content-Type: application/json
Accept: text/event-stream
{
    "uuid": "工作流UUID",
    "inputs": [
        {
            "name": "input",
            "content": {
                "type": 1,
                "textContent": "用户输入内容"
            }
        }
    ]
}

2.2 运行时管理

# 恢复中断的工作流
POST /workflow/runtime/resume/{runtimeUuid}
Content-Type: application/json
{
    "feedbackContent": "用户反馈内容"
}

# 查询工作流执行历史
GET /workflow/runtime/page?wfUuid=工作流UUID&currentPage=1&pageSize=10

# 查询运行时节点详情
GET /workflow/runtime/nodes/{runtimeUuid}

# 清理运行时数据
POST /workflow/runtime/clear?wfUuid=工作流UUID

3. 管理端接口

3.1 工作流管理

# 搜索所有工作流
POST /admin/workflow/search
Content-Type: application/json
{
    "title": "搜索关键词",
    "isPublic": true,
    "isEnable": true
}

# 启用/禁用工作流
POST /admin/workflow/enable?uuid=工作流UUID&isEnable=true

核心实现

1. 工作流引擎 (WorkflowEngine)

工作流引擎是整个模块的核心,负责:

  • 工作流图的构建和编译
  • 节点执行调度
  • 状态管理和持久化
  • 流式输出处理
public class WorkflowEngine {
    // 核心执行方法
    public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
        // 1. 验证工作流状态
        // 2. 创建运行时实例
        // 3. 构建状态图
        // 4. 执行工作流
        // 5. 处理流式输出
    }
    
    // 恢复执行方法
    public void resume(String userInput) {
        // 1. 更新状态
        // 2. 继续执行
    }
}

2. 节点工厂 (WfNodeFactory)

节点工厂负责根据组件类型创建对应的节点实例:

public class WfNodeFactory {
    public static AbstractWfNode create(WorkflowComponent component, 
                                      WorkflowNode node, 
                                      WfState wfState, 
                                      WfNodeState nodeState) {
        // 根据组件类型创建对应的节点实例
        switch (component.getName()) {
            case "Answer":
                return new LLMAnswerNode(component, node, wfState, nodeState);
            case "Switcher":
                return new SwitcherNode(component, node, wfState, nodeState);
            // ... 其他节点类型
        }
    }
}

3. 图构建器 (WorkflowGraphBuilder)

图构建器负责将工作流定义转换为可执行的状态图:

public class WorkflowGraphBuilder {
    public StateGraph<WfNodeState> build(WorkflowNode startNode) {
        // 1. 构建编译节点树
        // 2. 转换为状态图
        // 3. 添加节点和边
        // 4. 处理条件分支
        // 5. 处理并行执行
    }
}

流式响应机制

1. SSE 事件类型

工作流执行过程中会发送多种类型的 SSE 事件:

// 节点开始执行
[NODE_RUN_节点UUID] - 节点执行开始事件

// 节点输入数据
[NODE_INPUT_节点UUID] - 节点输入数据事件

// 节点输出数据
[NODE_OUTPUT_节点UUID] - 节点输出数据事件

// 流式内容块
[NODE_CHUNK_节点UUID] - 流式内容块事件

// 等待用户输入
[NODE_WAIT_FEEDBACK_BY_节点UUID] - 等待用户输入事件

2. 流式处理流程

  1. 初始化: 创建工作流运行时实例
  2. 节点执行: 逐个执行工作流节点
  3. 实时输出: 通过 SSE 实时推送执行结果
  4. 状态更新: 实时更新节点和工作流状态
  5. 错误处理: 捕获并处理执行过程中的错误

扩展开发

1. 自定义节点开发

要开发自定义工作流节点,需要:

  1. 创建节点类:继承 AbstractWfNode
  2. 实现处理逻辑:重写 onProcess() 方法
  3. 定义配置类:创建节点配置类
  4. 注册组件:在组件表中注册新组件
public class CustomNode extends AbstractWfNode {
    @Override
    protected NodeProcessResult onProcess() {
        // 实现自定义处理逻辑
        List<NodeIOData> outputs = new ArrayList<>();
        // ... 处理逻辑
        return NodeProcessResult.success(outputs);
    }
}

2. 自定义组件注册

-- 在 t_workflow_component 表中添加新组件
INSERT INTO t_workflow_component (uuid, name, title, remark, is_enable) 
VALUES (REPLACE(UUID(), '-', ''), 'CustomNode', '自定义节点', '自定义节点描述', true);