Files
ruoyi-ai/docs/工作流模块说明.md
2025-12-06 14:38:41 +08:00

453 lines
13 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Ruoyi-AI 工作流模块详细说明文档
## 概述
Ruoyi-AI 工作流模块是一个基于 LangGraph4j 的智能工作流引擎支持可视化工作流设计、AI 模型集成、条件分支、人机交互等高级功能。该模块采用微服务架构,提供完整的
RESTful API 和流式响应支持。
## 模块架构
### 1. 模块结构
```
ruoyi-ai/
├── ruoyi-modules/
│ └── ruoyi-workflow/ # 工作流核心模块
│ ├── pom.xml
│ └── src/main/java/org/ruoyi/workflow/
│ └── controller/ # 控制器层
│ ├── WorkflowController.java
│ ├── WorkflowRuntimeController.java
│ └── admin/ # 管理端控制器
│ ├── AdminWorkflowController.java
│ └── AdminWorkflowComponentController.java
└── ruoyi-modules-api/
└── ruoyi-workflow-api/ # 工作流API模块
├── pom.xml
└── src/main/java/org/ruoyi/workflow/
├── entity/ # 实体类
├── dto/ # 数据传输对象
├── service/ # 服务接口
├── mapper/ # 数据访问层
├── workflow/ # 工作流核心逻辑
├── enums/ # 枚举类
├── util/ # 工具类
└── exception/ # 异常处理
```
### 2. 核心依赖
- **LangGraph4j**: 1.5.3 - 工作流图执行引擎
- **LangChain4j**: 1.2.0 - AI 模型集成框架
- **Spring Boot**: 3.x - 应用框架
- **MyBatis Plus**: 数据访问层
- **Redis**: 缓存和状态管理
- **Swagger/OpenAPI**: API 文档
## 核心功能
### 1. 工作流管理
#### 1.1 工作流定义
- **创建工作流**: 支持自定义标题、描述、公开性设置
- **编辑工作流**: 可视化节点编辑、连接线配置
- **版本控制**: 支持工作流的版本管理和回滚
- **权限管理**: 支持公开/私有工作流设置
#### 1.2 工作流执行
- **流式执行**: 基于 SSE 的实时流式响应
- **状态管理**: 完整的执行状态跟踪
- **错误处理**: 详细的错误信息和异常处理
- **中断恢复**: 支持工作流中断和恢复执行
### 2. 节点类型
#### 2.1 基础节点
- **Start**: 开始节点,定义工作流入口
- **End**: 结束节点,定义工作流出口
#### 2.2 AI 模型节点
- **Answer**: 大语言模型问答节点
- **Dalle3**: DALL-E 3 图像生成
- **Tongyiwanx**: 通义万相图像生成
- **Classifier**: 内容分类节点
#### 2.3 数据处理节点
- **DocumentExtractor**: 文档信息提取
- **KeywordExtractor**: 关键词提取
- **FaqExtractor**: 常见问题提取
- **KnowledgeRetrieval**: 知识库检索
#### 2.4 控制流节点
- **Switcher**: 条件分支节点
- **HumanFeedback**: 人机交互节点
#### 2.5 外部集成节点
- **Google**: Google 搜索集成
- **MailSend**: 邮件发送
- **HttpRequest**: HTTP 请求
- **Template**: 模板转换
### 3. 数据流管理
#### 3.1 输入输出定义
```java
// 节点输入输出数据结构
public class NodeIOData {
private String name; // 参数名称
private NodeIODataContent content; // 参数内容
}
// 支持的数据类型
public enum WfIODataTypeEnum {
TEXT, // 文本
NUMBER, // 数字
BOOLEAN, // 布尔值
FILES, // 文件
OPTIONS // 选项
}
```
#### 3.2 参数引用
- **节点间引用**: 支持上游节点输出作为下游节点输入
- **参数映射**: 自动处理参数名称映射
- **类型转换**: 自动进行数据类型转换
## 数据库设计
### 1. 核心表结构
#### 1.1 工作流定义表 (t_workflow)
```sql
CREATE TABLE t_workflow (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
title VARCHAR(100) NOT NULL DEFAULT '',
remark TEXT NOT NULL DEFAULT '',
user_id BIGINT NOT NULL DEFAULT 0,
is_public TINYINT(1) NOT NULL DEFAULT 0,
is_enable TINYINT(1) NOT NULL DEFAULT 1,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.2 工作流节点表 (t_workflow_node)
```sql
CREATE TABLE t_workflow_node (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
workflow_component_id BIGINT NOT NULL DEFAULT 0,
user_id BIGINT NOT NULL DEFAULT 0,
title VARCHAR(100) NOT NULL DEFAULT '',
remark VARCHAR(500) NOT NULL DEFAULT '',
input_config JSON NOT NULL DEFAULT ('{}'),
node_config JSON NOT NULL DEFAULT ('{}'),
position_x DOUBLE NOT NULL DEFAULT 0,
position_y DOUBLE NOT NULL DEFAULT 0,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.3 工作流边表 (t_workflow_edge)
```sql
CREATE TABLE t_workflow_edge (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
source_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
source_handle VARCHAR(32) NOT NULL DEFAULT '',
target_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.4 工作流运行时表 (t_workflow_runtime)
```sql
CREATE TABLE t_workflow_runtime (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
user_id BIGINT NOT NULL DEFAULT 0,
workflow_id BIGINT NOT NULL DEFAULT 0,
input JSON NOT NULL DEFAULT ('{}'),
output JSON NOT NULL DEFAULT ('{}'),
status SMALLINT NOT NULL DEFAULT 1,
status_remark VARCHAR(250) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.5 工作流组件表 (t_workflow_component)
```sql
CREATE TABLE t_workflow_component (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) DEFAULT '' NOT NULL,
name VARCHAR(32) DEFAULT '' NOT NULL,
title VARCHAR(100) DEFAULT '' NOT NULL,
remark TEXT NOT NULL,
display_order INT DEFAULT 0 NOT NULL,
is_enable TINYINT(1) DEFAULT 0 NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
is_deleted TINYINT(1) DEFAULT 0 NOT NULL
);
```
## API 接口
### 1. 工作流管理接口
#### 1.1 基础操作
```http
#
POST /workflow/add
Content-Type: application/json
{
"title": "",
"remark": "",
"isPublic": false
}
#
POST /workflow/update
Content-Type: application/json
{
"uuid": "UUID",
"title": "",
"remark": ""
}
#
POST /workflow/del/{uuid}
# /
POST /workflow/enable/{uuid}?enable=true
```
#### 1.2 搜索和查询
```http
#
GET /workflow/mine/search?keyword=&isPublic=true&currentPage=1&pageSize=10
#
GET /workflow/public/search?keyword=&currentPage=1&pageSize=10
#
GET /workflow/public/component/list
```
### 2. 工作流执行接口
#### 2.1 流式执行
```http
#
POST /workflow/run
Content-Type: application/json
Accept: text/event-stream
{
"uuid": "UUID",
"inputs": [
{
"name": "input",
"content": {
"type": 1,
"textContent": ""
}
}
]
}
```
#### 2.2 运行时管理
```http
#
POST /workflow/runtime/resume/{runtimeUuid}
Content-Type: application/json
{
"feedbackContent": ""
}
#
GET /workflow/runtime/page?wfUuid=UUID&currentPage=1&pageSize=10
#
GET /workflow/runtime/nodes/{runtimeUuid}
#
POST /workflow/runtime/clear?wfUuid=UUID
```
### 3. 管理端接口
#### 3.1 工作流管理
```http
#
POST /admin/workflow/search
Content-Type: application/json
{
"title": "",
"isPublic": true,
"isEnable": true
}
# /
POST /admin/workflow/enable?uuid=UUID&isEnable=true
```
## 核心实现
### 1. 工作流引擎 (WorkflowEngine)
工作流引擎是整个模块的核心,负责:
- 工作流图的构建和编译
- 节点执行调度
- 状态管理和持久化
- 流式输出处理
```java
public class WorkflowEngine {
// 核心执行方法
public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
// 1. 验证工作流状态
// 2. 创建运行时实例
// 3. 构建状态图
// 4. 执行工作流
// 5. 处理流式输出
}
// 恢复执行方法
public void resume(String userInput) {
// 1. 更新状态
// 2. 继续执行
}
}
```
### 2. 节点工厂 (WfNodeFactory)
节点工厂负责根据组件类型创建对应的节点实例:
```java
public class WfNodeFactory {
public static AbstractWfNode create(WorkflowComponent component,
WorkflowNode node,
WfState wfState,
WfNodeState nodeState) {
// 根据组件类型创建对应的节点实例
switch (component.getName()) {
case "Answer":
return new LLMAnswerNode(component, node, wfState, nodeState);
case "Switcher":
return new SwitcherNode(component, node, wfState, nodeState);
// ... 其他节点类型
}
}
}
```
### 3. 图构建器 (WorkflowGraphBuilder)
图构建器负责将工作流定义转换为可执行的状态图:
```java
public class WorkflowGraphBuilder {
public StateGraph<WfNodeState> build(WorkflowNode startNode) {
// 1. 构建编译节点树
// 2. 转换为状态图
// 3. 添加节点和边
// 4. 处理条件分支
// 5. 处理并行执行
}
}
```
## 流式响应机制
### 1. SSE 事件类型
工作流执行过程中会发送多种类型的 SSE 事件:
```javascript
// 节点开始执行
[NODE_RUN_节点UUID] - 节点执行开始事件
// 节点输入数据
[NODE_INPUT_节点UUID] - 节点输入数据事件
// 节点输出数据
[NODE_OUTPUT_节点UUID] - 节点输出数据事件
// 流式内容块
[NODE_CHUNK_节点UUID] - 流式内容块事件
// 等待用户输入
[NODE_WAIT_FEEDBACK_BY_节点UUID] - 等待用户输入事件
```
### 2. 流式处理流程
1. **初始化**: 创建工作流运行时实例
2. **节点执行**: 逐个执行工作流节点
3. **实时输出**: 通过 SSE 实时推送执行结果
4. **状态更新**: 实时更新节点和工作流状态
5. **错误处理**: 捕获并处理执行过程中的错误
## 扩展开发
### 1. 自定义节点开发
要开发自定义工作流节点,需要:
1. **创建节点类**:继承 `AbstractWfNode`
2. **实现处理逻辑**:重写 `onProcess()` 方法
3. **定义配置类**:创建节点配置类
4. **注册组件**:在组件表中注册新组件
```java
public class CustomNode extends AbstractWfNode {
@Override
protected NodeProcessResult onProcess() {
// 实现自定义处理逻辑
List<NodeIOData> outputs = new ArrayList<>();
// ... 处理逻辑
return NodeProcessResult.success(outputs);
}
}
```
### 2. 自定义组件注册
```sql
-- 在 t_workflow_component 表中添加新组件
INSERT INTO t_workflow_component (uuid, name, title, remark, is_enable)
VALUES (REPLACE(UUID(), '-', ''), 'CustomNode', '自定义节点', '自定义节点描述', true);
```