mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-03-13 20:53:42 +08:00
13 KiB
13 KiB
Ruoyi-AI 工作流模块详细说明文档
概述
Ruoyi-AI 工作流模块是一个基于 LangGraph4j 的智能工作流引擎,支持可视化工作流设计、AI 模型集成、条件分支、人机交互等高级功能。该模块采用微服务架构,提供完整的 RESTful API 和流式响应支持。
模块架构
1. 模块结构
ruoyi-ai/
├── ruoyi-modules/
│ └── ruoyi-workflow/ # 工作流核心模块
│ ├── pom.xml
│ └── src/main/java/org/ruoyi/workflow/
│ └── controller/ # 控制器层
│ ├── WorkflowController.java
│ ├── WorkflowRuntimeController.java
│ └── admin/ # 管理端控制器
│ ├── AdminWorkflowController.java
│ └── AdminWorkflowComponentController.java
└── ruoyi-modules-api/
└── ruoyi-workflow-api/ # 工作流API模块
├── pom.xml
└── src/main/java/org/ruoyi/workflow/
├── entity/ # 实体类
├── dto/ # 数据传输对象
├── service/ # 服务接口
├── mapper/ # 数据访问层
├── workflow/ # 工作流核心逻辑
├── enums/ # 枚举类
├── util/ # 工具类
└── exception/ # 异常处理
2. 核心依赖
- LangGraph4j: 1.5.3 - 工作流图执行引擎
- LangChain4j: 1.2.0 - AI 模型集成框架
- Spring Boot: 3.x - 应用框架
- MyBatis Plus: 数据访问层
- Redis: 缓存和状态管理
- Swagger/OpenAPI: API 文档
核心功能
1. 工作流管理
1.1 工作流定义
- 创建工作流: 支持自定义标题、描述、公开性设置
- 编辑工作流: 可视化节点编辑、连接线配置
- 版本控制: 支持工作流的版本管理和回滚
- 权限管理: 支持公开/私有工作流设置
1.2 工作流执行
- 流式执行: 基于 SSE 的实时流式响应
- 状态管理: 完整的执行状态跟踪
- 错误处理: 详细的错误信息和异常处理
- 中断恢复: 支持工作流中断和恢复执行
2. 节点类型
2.1 基础节点
- Start: 开始节点,定义工作流入口
- End: 结束节点,定义工作流出口
2.2 AI 模型节点
- Answer: 大语言模型问答节点
- Dalle3: DALL-E 3 图像生成
- Tongyiwanx: 通义万相图像生成
- Classifier: 内容分类节点
2.3 数据处理节点
- DocumentExtractor: 文档信息提取
- KeywordExtractor: 关键词提取
- FaqExtractor: 常见问题提取
- KnowledgeRetrieval: 知识库检索
2.4 控制流节点
- Switcher: 条件分支节点
- HumanFeedback: 人机交互节点
2.5 外部集成节点
- Google: Google 搜索集成
- MailSend: 邮件发送
- HttpRequest: HTTP 请求
- Template: 模板转换
3. 数据流管理
3.1 输入输出定义
// 节点输入输出数据结构
public class NodeIOData {
private String name; // 参数名称
private NodeIODataContent content; // 参数内容
}
// 支持的数据类型
public enum WfIODataTypeEnum {
TEXT, // 文本
NUMBER, // 数字
BOOLEAN, // 布尔值
FILES, // 文件
OPTIONS // 选项
}
3.2 参数引用
- 节点间引用: 支持上游节点输出作为下游节点输入
- 参数映射: 自动处理参数名称映射
- 类型转换: 自动进行数据类型转换
数据库设计
1. 核心表结构
1.1 工作流定义表 (t_workflow)
CREATE TABLE t_workflow (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
title VARCHAR(100) NOT NULL DEFAULT '',
remark TEXT NOT NULL DEFAULT '',
user_id BIGINT NOT NULL DEFAULT 0,
is_public TINYINT(1) NOT NULL DEFAULT 0,
is_enable TINYINT(1) NOT NULL DEFAULT 1,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
1.2 工作流节点表 (t_workflow_node)
CREATE TABLE t_workflow_node (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
workflow_component_id BIGINT NOT NULL DEFAULT 0,
user_id BIGINT NOT NULL DEFAULT 0,
title VARCHAR(100) NOT NULL DEFAULT '',
remark VARCHAR(500) NOT NULL DEFAULT '',
input_config JSON NOT NULL DEFAULT ('{}'),
node_config JSON NOT NULL DEFAULT ('{}'),
position_x DOUBLE NOT NULL DEFAULT 0,
position_y DOUBLE NOT NULL DEFAULT 0,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
1.3 工作流边表 (t_workflow_edge)
CREATE TABLE t_workflow_edge (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
source_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
source_handle VARCHAR(32) NOT NULL DEFAULT '',
target_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
1.4 工作流运行时表 (t_workflow_runtime)
CREATE TABLE t_workflow_runtime (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
user_id BIGINT NOT NULL DEFAULT 0,
workflow_id BIGINT NOT NULL DEFAULT 0,
input JSON NOT NULL DEFAULT ('{}'),
output JSON NOT NULL DEFAULT ('{}'),
status SMALLINT NOT NULL DEFAULT 1,
status_remark VARCHAR(250) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
1.5 工作流组件表 (t_workflow_component)
CREATE TABLE t_workflow_component (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) DEFAULT '' NOT NULL,
name VARCHAR(32) DEFAULT '' NOT NULL,
title VARCHAR(100) DEFAULT '' NOT NULL,
remark TEXT NOT NULL,
display_order INT DEFAULT 0 NOT NULL,
is_enable TINYINT(1) DEFAULT 0 NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
is_deleted TINYINT(1) DEFAULT 0 NOT NULL
);
API 接口
1. 工作流管理接口
1.1 基础操作
# 创建工作流
POST /workflow/add
Content-Type: application/json
{
"title": "工作流标题",
"remark": "工作流描述",
"isPublic": false
}
# 更新工作流
POST /workflow/update
Content-Type: application/json
{
"uuid": "工作流UUID",
"title": "新标题",
"remark": "新描述"
}
# 删除工作流
POST /workflow/del/{uuid}
# 启用/禁用工作流
POST /workflow/enable/{uuid}?enable=true
1.2 搜索和查询
# 搜索我的工作流
GET /workflow/mine/search?keyword=关键词&isPublic=true¤tPage=1&pageSize=10
# 搜索公开工作流
GET /workflow/public/search?keyword=关键词¤tPage=1&pageSize=10
# 获取工作流组件列表
GET /workflow/public/component/list
2. 工作流执行接口
2.1 流式执行
# 流式执行工作流
POST /workflow/run
Content-Type: application/json
Accept: text/event-stream
{
"uuid": "工作流UUID",
"inputs": [
{
"name": "input",
"content": {
"type": 1,
"textContent": "用户输入内容"
}
}
]
}
2.2 运行时管理
# 恢复中断的工作流
POST /workflow/runtime/resume/{runtimeUuid}
Content-Type: application/json
{
"feedbackContent": "用户反馈内容"
}
# 查询工作流执行历史
GET /workflow/runtime/page?wfUuid=工作流UUID¤tPage=1&pageSize=10
# 查询运行时节点详情
GET /workflow/runtime/nodes/{runtimeUuid}
# 清理运行时数据
POST /workflow/runtime/clear?wfUuid=工作流UUID
3. 管理端接口
3.1 工作流管理
# 搜索所有工作流
POST /admin/workflow/search
Content-Type: application/json
{
"title": "搜索关键词",
"isPublic": true,
"isEnable": true
}
# 启用/禁用工作流
POST /admin/workflow/enable?uuid=工作流UUID&isEnable=true
核心实现
1. 工作流引擎 (WorkflowEngine)
工作流引擎是整个模块的核心,负责:
- 工作流图的构建和编译
- 节点执行调度
- 状态管理和持久化
- 流式输出处理
public class WorkflowEngine {
// 核心执行方法
public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
// 1. 验证工作流状态
// 2. 创建运行时实例
// 3. 构建状态图
// 4. 执行工作流
// 5. 处理流式输出
}
// 恢复执行方法
public void resume(String userInput) {
// 1. 更新状态
// 2. 继续执行
}
}
2. 节点工厂 (WfNodeFactory)
节点工厂负责根据组件类型创建对应的节点实例:
public class WfNodeFactory {
public static AbstractWfNode create(WorkflowComponent component,
WorkflowNode node,
WfState wfState,
WfNodeState nodeState) {
// 根据组件类型创建对应的节点实例
switch (component.getName()) {
case "Answer":
return new LLMAnswerNode(component, node, wfState, nodeState);
case "Switcher":
return new SwitcherNode(component, node, wfState, nodeState);
// ... 其他节点类型
}
}
}
3. 图构建器 (WorkflowGraphBuilder)
图构建器负责将工作流定义转换为可执行的状态图:
public class WorkflowGraphBuilder {
public StateGraph<WfNodeState> build(WorkflowNode startNode) {
// 1. 构建编译节点树
// 2. 转换为状态图
// 3. 添加节点和边
// 4. 处理条件分支
// 5. 处理并行执行
}
}
流式响应机制
1. SSE 事件类型
工作流执行过程中会发送多种类型的 SSE 事件:
// 节点开始执行
[NODE_RUN_节点UUID] - 节点执行开始事件
// 节点输入数据
[NODE_INPUT_节点UUID] - 节点输入数据事件
// 节点输出数据
[NODE_OUTPUT_节点UUID] - 节点输出数据事件
// 流式内容块
[NODE_CHUNK_节点UUID] - 流式内容块事件
// 等待用户输入
[NODE_WAIT_FEEDBACK_BY_节点UUID] - 等待用户输入事件
2. 流式处理流程
- 初始化: 创建工作流运行时实例
- 节点执行: 逐个执行工作流节点
- 实时输出: 通过 SSE 实时推送执行结果
- 状态更新: 实时更新节点和工作流状态
- 错误处理: 捕获并处理执行过程中的错误
扩展开发
1. 自定义节点开发
要开发自定义工作流节点,需要:
- 创建节点类:继承
AbstractWfNode - 实现处理逻辑:重写
onProcess()方法 - 定义配置类:创建节点配置类
- 注册组件:在组件表中注册新组件
public class CustomNode extends AbstractWfNode {
@Override
protected NodeProcessResult onProcess() {
// 实现自定义处理逻辑
List<NodeIOData> outputs = new ArrayList<>();
// ... 处理逻辑
return NodeProcessResult.success(outputs);
}
}
2. 自定义组件注册
-- 在 t_workflow_component 表中添加新组件
INSERT INTO t_workflow_component (uuid, name, title, remark, is_enable)
VALUES (REPLACE(UUID(), '-', ''), 'CustomNode', '自定义节点', '自定义节点描述', true);