feat: 工作流第一版提交

This commit is contained in:
lihao05
2025-10-21 09:52:33 +08:00
parent e7d7de79fe
commit 3c21bf6fd3
16 changed files with 856 additions and 355 deletions

View File

@@ -0,0 +1,432 @@
# Ruoyi-AI 工作流模块详细说明文档
## 概述
Ruoyi-AI 工作流模块是一个基于 LangGraph4j 的智能工作流引擎,支持可视化工作流设计、AI 模型集成、条件分支、人机交互等高级功能。该模块采用微服务架构,提供完整的 RESTful API 和流式响应支持。
## 模块架构
### 1. 模块结构
```
ruoyi-ai/
├── ruoyi-modules/
│ └── ruoyi-workflow/ # 工作流核心模块
│ ├── pom.xml
│ └── src/main/java/org/ruoyi/workflow/
│ └── controller/ # 控制器层
│ ├── WorkflowController.java
│ ├── WorkflowRuntimeController.java
│ └── admin/ # 管理端控制器
│ ├── AdminWorkflowController.java
│ └── AdminWorkflowComponentController.java
└── ruoyi-modules-api/
└── ruoyi-workflow-api/ # 工作流API模块
├── pom.xml
└── src/main/java/org/ruoyi/workflow/
├── entity/ # 实体类
├── dto/ # 数据传输对象
├── service/ # 服务接口
├── mapper/ # 数据访问层
├── workflow/ # 工作流核心逻辑
├── enums/ # 枚举类
├── util/ # 工具类
└── exception/ # 异常处理
```
### 2. 核心依赖
- **LangGraph4j**: 1.5.3 - 工作流图执行引擎
- **LangChain4j**: 1.2.0 - AI 模型集成框架
- **Spring Boot**: 3.x - 应用框架
- **MyBatis Plus**: 数据访问层
- **Redis**: 缓存和状态管理
- **Swagger/OpenAPI**: API 文档
## 核心功能
### 1. 工作流管理
#### 1.1 工作流定义
- **创建工作流**: 支持自定义标题、描述、公开性设置
- **编辑工作流**: 可视化节点编辑、连接线配置
- **版本控制**: 支持工作流的版本管理和回滚
- **权限管理**: 支持公开/私有工作流设置
#### 1.2 工作流执行
- **流式执行**: 基于 SSE 的实时流式响应
- **状态管理**: 完整的执行状态跟踪
- **错误处理**: 详细的错误信息和异常处理
- **中断恢复**: 支持工作流中断和恢复执行
### 2. 节点类型
#### 2.1 基础节点
- **Start**: 开始节点,定义工作流入口
- **End**: 结束节点,定义工作流出口
#### 2.2 AI 模型节点
- **Answer**: 大语言模型问答节点
- **Dalle3**: DALL-E 3 图像生成
- **Tongyiwanx**: 通义万相图像生成
- **Classifier**: 内容分类节点
#### 2.3 数据处理节点
- **DocumentExtractor**: 文档信息提取
- **KeywordExtractor**: 关键词提取
- **FaqExtractor**: 常见问题提取
- **KnowledgeRetrieval**: 知识库检索
#### 2.4 控制流节点
- **Switcher**: 条件分支节点
- **HumanFeedback**: 人机交互节点
#### 2.5 外部集成节点
- **Google**: Google 搜索集成
- **MailSend**: 邮件发送
- **HttpRequest**: HTTP 请求
- **Template**: 模板转换
### 3. 数据流管理
#### 3.1 输入输出定义
```java
// 节点输入输出数据结构
public class NodeIOData {
private String name; // 参数名称
private NodeIODataContent content; // 参数内容
}
// 支持的数据类型
public enum WfIODataTypeEnum {
TEXT, // 文本
NUMBER, // 数字
BOOLEAN, // 布尔值
FILES, // 文件
OPTIONS // 选项
}
```
#### 3.2 参数引用
- **节点间引用**: 支持上游节点输出作为下游节点输入
- **参数映射**: 自动处理参数名称映射
- **类型转换**: 自动进行数据类型转换
## 数据库设计
### 1. 核心表结构
#### 1.1 工作流定义表 (t_workflow)
```sql
CREATE TABLE t_workflow (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
title VARCHAR(100) NOT NULL DEFAULT '',
remark TEXT NOT NULL,
user_id BIGINT NOT NULL DEFAULT 0,
is_public TINYINT(1) NOT NULL DEFAULT 0,
is_enable TINYINT(1) NOT NULL DEFAULT 1,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.2 工作流节点表 (t_workflow_node)
```sql
CREATE TABLE t_workflow_node (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
workflow_component_id BIGINT NOT NULL DEFAULT 0,
user_id BIGINT NOT NULL DEFAULT 0,
title VARCHAR(100) NOT NULL DEFAULT '',
remark VARCHAR(500) NOT NULL DEFAULT '',
input_config JSON NOT NULL DEFAULT ('{}'),
node_config JSON NOT NULL DEFAULT ('{}'),
position_x DOUBLE NOT NULL DEFAULT 0,
position_y DOUBLE NOT NULL DEFAULT 0,
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.3 工作流边表 (t_workflow_edge)
```sql
CREATE TABLE t_workflow_edge (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
workflow_id BIGINT NOT NULL DEFAULT 0,
source_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
source_handle VARCHAR(32) NOT NULL DEFAULT '',
target_node_uuid VARCHAR(32) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.4 工作流运行时表 (t_workflow_runtime)
```sql
CREATE TABLE t_workflow_runtime (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) NOT NULL DEFAULT '',
user_id BIGINT NOT NULL DEFAULT 0,
workflow_id BIGINT NOT NULL DEFAULT 0,
input JSON NOT NULL DEFAULT ('{}'),
output JSON NOT NULL DEFAULT ('{}'),
status SMALLINT NOT NULL DEFAULT 1,
status_remark VARCHAR(250) NOT NULL DEFAULT '',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted TINYINT(1) NOT NULL DEFAULT 0
);
```
#### 1.5 工作流组件表 (t_workflow_component)
```sql
CREATE TABLE t_workflow_component (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
uuid VARCHAR(32) DEFAULT '' NOT NULL,
name VARCHAR(32) DEFAULT '' NOT NULL,
title VARCHAR(100) DEFAULT '' NOT NULL,
remark TEXT NOT NULL,
display_order INT DEFAULT 0 NOT NULL,
is_enable TINYINT(1) DEFAULT 0 NOT NULL,
create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
is_deleted TINYINT(1) DEFAULT 0 NOT NULL
);
```
## API 接口
### 1. 工作流管理接口
#### 1.1 基础操作
```http
# 创建工作流
POST /workflow/add
Content-Type: application/json
{
"title": "",
"remark": "",
"isPublic": false
}
# 更新工作流
POST /workflow/update
Content-Type: application/json
{
"uuid": "UUID",
"title": "",
"remark": ""
}
# 删除工作流
POST /workflow/del/{uuid}
# 启用/禁用工作流
POST /workflow/enable/{uuid}?enable=true
```
#### 1.2 搜索和查询
```http
# 搜索我的工作流
GET /workflow/mine/search?keyword=&isPublic=true&currentPage=1&pageSize=10
# 搜索公开工作流
GET /workflow/public/search?keyword=&currentPage=1&pageSize=10
# 获取公开的工作流组件列表
GET /workflow/public/component/list
```
### 2. 工作流执行接口
#### 2.1 流式执行
```http
# 流式执行工作流(SSE 响应)
POST /workflow/run
Content-Type: application/json
Accept: text/event-stream
{
"uuid": "UUID",
"inputs": [
{
"name": "input",
"content": {
"type": 1,
"textContent": ""
}
}
]
}
```
#### 2.2 运行时管理
```http
# 恢复被中断的工作流执行
POST /workflow/runtime/resume/{runtimeUuid}
Content-Type: application/json
{
"feedbackContent": ""
}
# 分页查询运行时实例
GET /workflow/runtime/page?wfUuid=UUID&currentPage=1&pageSize=10
# 查询运行时各节点执行详情
GET /workflow/runtime/nodes/{runtimeUuid}
# 清空指定工作流的运行时记录
POST /workflow/runtime/clear?wfUuid=UUID
```
### 3. 管理端接口
#### 3.1 工作流管理
```http
# 搜索工作流
POST /admin/workflow/search
Content-Type: application/json
{
"title": "",
"isPublic": true,
"isEnable": true
}
# 启用/禁用工作流
POST /admin/workflow/enable?uuid=UUID&isEnable=true
```
## 核心实现
### 1. 工作流引擎 (WorkflowEngine)
工作流引擎是整个模块的核心,负责:
- 工作流图的构建和编译
- 节点执行调度
- 状态管理和持久化
- 流式输出处理
```java
public class WorkflowEngine {
// 核心执行方法
public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
// 1. 验证工作流状态
// 2. 创建运行时实例
// 3. 构建状态图
// 4. 执行工作流
// 5. 处理流式输出
}
// 恢复执行方法
public void resume(String userInput) {
// 1. 更新状态
// 2. 继续执行
}
}
```
### 2. 节点工厂 (WfNodeFactory)
节点工厂负责根据组件类型创建对应的节点实例:
```java
public class WfNodeFactory {
public static AbstractWfNode create(WorkflowComponent component,
WorkflowNode node,
WfState wfState,
WfNodeState nodeState) {
// 根据组件类型创建对应的节点实例
switch (component.getName()) {
case "Answer":
return new LLMAnswerNode(component, node, wfState, nodeState);
case "Switcher":
return new SwitcherNode(component, node, wfState, nodeState);
// ... 其他节点类型
}
}
}
```
### 3. 图构建器 (WorkflowGraphBuilder)
图构建器负责将工作流定义转换为可执行的状态图:
```java
public class WorkflowGraphBuilder {
public StateGraph<WfNodeState> build(WorkflowNode startNode) {
// 1. 构建编译节点树
// 2. 转换为状态图
// 3. 添加节点和边
// 4. 处理条件分支
// 5. 处理并行执行
}
}
```
## 流式响应机制
### 1. SSE 事件类型
工作流执行过程中会发送多种类型的 SSE 事件:
```javascript
// 节点开始执行
[NODE_RUN_节点UUID] - 节点执行开始事件
// 节点输入数据
[NODE_INPUT_节点UUID] - 节点输入数据事件
// 节点输出数据
[NODE_OUTPUT_节点UUID] - 节点输出数据事件
// 流式内容块
[NODE_CHUNK_节点UUID] - 流式内容块事件
// 等待用户输入
[NODE_WAIT_FEEDBACK_BY_节点UUID] - 等待用户输入事件
```
### 2. 流式处理流程
1. **初始化**: 创建工作流运行时实例
2. **节点执行**: 逐个执行工作流节点
3. **实时输出**: 通过 SSE 实时推送执行结果
4. **状态更新**: 实时更新节点和工作流状态
5. **错误处理**: 捕获并处理执行过程中的错误
## 扩展开发
### 1. 自定义节点开发
要开发自定义工作流节点,需要:
1. **创建节点类**:继承 `AbstractWfNode`
2. **实现处理逻辑**:重写 `onProcess()` 方法
3. **定义配置类**:创建节点配置类
4. **注册组件**:在组件表中注册新组件
```java
public class CustomNode extends AbstractWfNode {
@Override
protected NodeProcessResult onProcess() {
// 实现自定义处理逻辑
List<NodeIOData> outputs = new ArrayList<>();
// ... 处理逻辑
return NodeProcessResult.success(outputs);
}
}
```
### 2. 自定义组件注册
```sql
-- 在 t_workflow_component 表中添加新组件
INSERT INTO t_workflow_component (uuid, name, title, remark, is_enable)
VALUES (REPLACE(UUID(), '-', ''), 'CustomNode', '自定义节点', '自定义节点描述', true);
```

View File

@@ -43,6 +43,11 @@
<artifactId>ruoyi-system-api</artifactId>
</dependency>
<dependency>
<groupId>org.ruoyi</groupId>
<artifactId>ruoyi-common-satoken</artifactId>
</dependency>
<dependency>
<groupId>org.ruoyi</groupId>
<artifactId>ruoyi-common-mail</artifactId>

View File

@@ -1,67 +1,122 @@
package org.ruoyi.workflow.base;
import io.micrometer.common.util.StringUtils;
import cn.dev33.satoken.stp.StpUtil;
import org.apache.commons.lang3.StringUtils;
import org.ruoyi.common.core.domain.model.LoginUser;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.common.satoken.utils.LoginHelper;
import org.ruoyi.workflow.entity.User;
import org.ruoyi.workflow.enums.UserStatusEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import static org.ruoyi.workflow.enums.ErrorEnum.A_USER_NOT_FOUND;
/**
* 线程上下文适配器,统一接入 Sa-Token 登录态。
*/
public class ThreadContext {
private static final ThreadLocal<User> currentUser = new ThreadLocal<>();
private static final ThreadLocal<String> currentToken = new ThreadLocal<>();
private static final ThreadLocal<User> CURRENT_USER = new ThreadLocal<>();
private static final ThreadLocal<String> CURRENT_TOKEN = new ThreadLocal<>();
private ThreadContext() {
}
/**
* 获取当前登录的工作流用户。
*/
public static User getCurrentUser() {
User user = new User();
user.setName("admin");
user.setEmail("12345@qq.com");
user.setUuid("123456789");
user.setUnderstandContextMsgPairNum(1);
user.setQuotaByTokenDaily(1);
user.setQuotaByTokenMonthly(1);
user.setQuotaByRequestDaily(1);
user.setQuotaByRequestMonthly(1);
user.setQuotaByImageDaily(1);
user.setQuotaByImageMonthly(1);
user.setUserStatus(UserStatusEnum.NORMAL);
user.setIsAdmin(true);
user.setId(1L);
return user;
User cached = CURRENT_USER.get();
if (cached != null) {
return cached;
}
LoginUser loginUser = LoginHelper.getLoginUser();
if (loginUser == null) {
throw new BaseException(A_USER_NOT_FOUND.getInfo());
}
User mapped = mapToWorkflowUser(loginUser);
CURRENT_USER.set(mapped);
return mapped;
}
/**
* 允许在测试或特殊场景下显式设置当前用户。
*/
public static void setCurrentUser(User user) {
currentUser.set(user);
if (user == null) {
CURRENT_USER.remove();
} else {
CURRENT_USER.set(user);
}
}
/**
* 获取当前登录用户 ID。
*/
public static Long getCurrentUserId() {
return 1L;
Long userId = LoginHelper.getUserId();
if (userId != null) {
return userId;
}
return getCurrentUser().getId();
}
/**
* 获取当前访问 token。
*/
public static String getToken() {
return currentToken.get();
String token = CURRENT_TOKEN.get();
if (StringUtils.isNotBlank(token)) {
return token;
}
try {
token = StpUtil.getTokenValue();
} catch (Exception ignore) {
token = null;
}
if (StringUtils.isNotBlank(token)) {
CURRENT_TOKEN.set(token);
}
return token;
}
public static void setToken(String token) {
currentToken.set(token);
if (StringUtils.isBlank(token)) {
CURRENT_TOKEN.remove();
} else {
CURRENT_TOKEN.set(token);
}
}
public static boolean isLogin() {
return StringUtils.isNotBlank(currentToken.get());
return LoginHelper.isLogin();
}
public static User getExistCurrentUser() {
User user = ThreadContext.getCurrentUser();
if (null == user) {
throw new WorkflowBaseException(A_USER_NOT_FOUND);
}
return user;
return getCurrentUser();
}
public void unload() {
currentUser.remove();
currentToken.remove();
public static void unload() {
CURRENT_USER.remove();
CURRENT_TOKEN.remove();
}
private static User mapToWorkflowUser(LoginUser loginUser) {
User user = new User();
user.setId(loginUser.getUserId());
String nickname = loginUser.getNickName();
user.setName(StringUtils.defaultIfBlank(nickname, loginUser.getUsername()));
user.setEmail(loginUser.getUsername());
user.setUuid(String.valueOf(loginUser.getUserId()));
user.setUserStatus(UserStatusEnum.NORMAL);
user.setIsAdmin(LoginHelper.isSuperAdmin(loginUser.getUserId()));
user.setUnderstandContextMsgPairNum(0);
user.setQuotaByTokenDaily(0);
user.setQuotaByTokenMonthly(0);
user.setQuotaByRequestDaily(0);
user.setQuotaByRequestMonthly(0);
user.setQuotaByImageDaily(0);
user.setQuotaByImageMonthly(0);
user.setIsDeleted(false);
return user;
}
}

View File

@@ -1,49 +0,0 @@
package org.ruoyi.workflow.exception;
import org.ruoyi.workflow.enums.ErrorEnum;
import java.text.MessageFormat;
/**
 * Base runtime exception for the workflow module, carrying a business error
 * code, a human-readable message and an optional extra payload.
 */
public class WorkflowBaseException extends RuntimeException {
    /** Business error code (e.g. the code of an {@code ErrorEnum} entry). */
    private final String code;
    /** Human-readable, already-formatted error message. */
    private final String info;
    /** Optional payload attached to the error; may remain null. */
    private Object data;

    /**
     * Creates an exception from an explicit code/message pair.
     *
     * @param code business error code
     * @param info human-readable message
     */
    public WorkflowBaseException(String code, String info) {
        super(code + ":" + info);
        this.code = code;
        this.info = info;
    }

    /**
     * Creates an exception from an {@link ErrorEnum} entry, formatting its
     * message pattern with the given values when any are supplied.
     *
     * <p>Fix: the original always ran the superclass message through
     * {@link MessageFormat#format}, even with no arguments, while {@code info}
     * was only formatted when arguments were present — so {@code getMessage()}
     * and {@code getInfo()} could disagree (MessageFormat strips doubled
     * quotes and leaves literal {@code {0}} placeholders). Both values now go
     * through the same helper.
     *
     * @param errorEnum  error definition providing code and message pattern
     * @param infoValues optional values substituted into the message pattern
     */
    public WorkflowBaseException(ErrorEnum errorEnum, String... infoValues) {
        super(errorEnum.getCode() + ":" + formatInfo(errorEnum.getInfo(), infoValues));
        this.code = errorEnum.getCode();
        this.info = formatInfo(errorEnum.getInfo(), infoValues);
    }

    /** Formats the pattern only when values are present; otherwise returns it unchanged. */
    private static String formatInfo(String pattern, String... values) {
        if (values == null || values.length == 0) {
            return pattern;
        }
        return MessageFormat.format(pattern, (Object[]) values);
    }

    public String getCode() {
        return code;
    }

    public String getInfo() {
        return info;
    }

    /**
     * Returns the attached payload, falling back to {@link #getMessage()}
     * when no payload has been set.
     */
    public Object getData() {
        if (null != data) {
            return data;
        }
        return getMessage();
    }

    /**
     * Attaches a payload and returns {@code this} for call chaining.
     */
    public WorkflowBaseException setData(Object data) {
        this.data = data;
        return this;
    }
}

View File

@@ -7,11 +7,11 @@ import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.dto.workflow.WfComponentReq;
import org.ruoyi.workflow.dto.workflow.WfComponentSearchReq;
import org.ruoyi.workflow.entity.WorkflowComponent;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.mapper.WorkflowComponentMapper;
import org.ruoyi.workflow.util.PrivilegeUtil;
import org.ruoyi.workflow.util.UuidUtil;
@@ -74,11 +74,18 @@ public class WorkflowComponentService extends ServiceImpl<WorkflowComponentMappe
@CacheEvict(cacheNames = {WORKFLOW_COMPONENTS, WORKFLOW_COMPONENT_START_KEY})
public void deleteByUuid(String uuid) {
WorkflowComponent component = PrivilegeUtil.checkAndGetByUuid(uuid, this.query(), ErrorEnum.A_WF_COMPONENT_NOT_FOUND);
Integer refNodeCount = baseMapper.countRefNodes(uuid);
if (refNodeCount > 0) {
throw new WorkflowBaseException(C_WF_COMPONENT_DELETED_FAIL_BY_USED);
} else {
// PrivilegeUtil.checkAndDelete(uuid, this.query(), ChainWrappers.updateChain(baseMapper), ErrorEnum.A_WF_COMPONENT_NOT_FOUND);
if (refNodeCount != null && refNodeCount > 0) {
throw new BaseException(C_WF_COMPONENT_DELETED_FAIL_BY_USED.getInfo());
}
boolean updated = ChainWrappers.lambdaUpdateChain(baseMapper)
.eq(WorkflowComponent::getId, component.getId())
.set(WorkflowComponent::getIsDeleted, true)
.set(WorkflowComponent::getIsEnable, false)
.update();
if (!updated) {
throw new BaseException(ErrorEnum.A_WF_COMPONENT_NOT_FOUND.getInfo());
}
}
@@ -106,7 +113,7 @@ public class WorkflowComponentService extends ServiceImpl<WorkflowComponentMappe
return components.stream()
.filter(component -> WfComponentNameEnum.START.getName().equals(component.getName()))
.findFirst()
.orElseThrow(() -> new WorkflowBaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND));
.orElseThrow(() -> new BaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND.getInfo()));
}
public WorkflowComponent getComponent(Long id) {
@@ -114,6 +121,6 @@ public class WorkflowComponentService extends ServiceImpl<WorkflowComponentMappe
return components.stream()
.filter(component -> component.getId().equals(id))
.findFirst()
.orElseThrow(() -> new WorkflowBaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND));
.orElseThrow(() -> new BaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND.getInfo()));
}
}

View File

@@ -6,10 +6,10 @@ import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.dto.workflow.WfEdgeReq;
import org.ruoyi.workflow.entity.WorkflowEdge;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.mapper.WorkflowEdgeMapper;
import org.ruoyi.workflow.util.MPPageUtil;
import org.ruoyi.workflow.util.UuidUtil;
@@ -98,7 +98,7 @@ public class WorkflowEdgeService extends ServiceImpl<WorkflowEdgeMapper, Workflo
WorkflowEdge old = self.getByUuid(uuid);
if (null != old && !old.getWorkflowId().equals(workflowId)) {
log.error("该边不属于指定的工作流,删除失败,workflowId:{},node workflowId:{}", workflowId, workflowId);
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
ChainWrappers.lambdaUpdateChain(baseMapper)
.eq(WorkflowEdge::getWorkflowId, workflowId)

View File

@@ -7,13 +7,13 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.dto.workflow.WfNodeDto;
import org.ruoyi.workflow.entity.Workflow;
import org.ruoyi.workflow.entity.WorkflowComponent;
import org.ruoyi.workflow.entity.WorkflowNode;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.enums.WfIODataTypeEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.mapper.WorkflowNodeMapper;
import org.ruoyi.workflow.util.JsonUtil;
import org.ruoyi.workflow.util.MPPageUtil;
@@ -135,7 +135,7 @@ public class WorkflowNodeService extends ServiceImpl<WorkflowNodeMapper, Workflo
.orElse(null);
if (null == component) {
log.error("节点不存在,uuid:{},title:{}", workflowNode.getUuid(), workflowNode.getTitle());
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
if (component.getName().equals(WfComponentNameEnum.MAIL_SEND.getName())) {
@@ -162,7 +162,7 @@ public class WorkflowNodeService extends ServiceImpl<WorkflowNodeMapper, Workflo
.orElse(null);
if (null == component) {
log.error("节点不存在,uuid:{},title:{}", workflowNode.getUuid(), workflowNode.getTitle());
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
if (component.getName().equals(WfComponentNameEnum.MAIL_SEND.getName())) {
// MailSendNodeConfig mailSendNodeConfig = JsonUtil.fromJson(workflowNode.getNodeConfig(), MailSendNodeConfig.class);
@@ -189,7 +189,7 @@ public class WorkflowNodeService extends ServiceImpl<WorkflowNodeMapper, Workflo
}
if (!old.getWorkflowId().equals(workflowId)) {
log.error("节点不属于指定的工作流,删除失败,workflowId:{},node workflowId:{}", workflowId, workflowId);
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
if (workflowComponentService.getStartComponent().getId().equals(old.getWorkflowComponentId())) {
log.warn("开始节点不能删除,uuid:{}", old.getUuid());

View File

@@ -6,6 +6,7 @@ import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.base.ThreadContext;
import org.ruoyi.workflow.dto.workflow.WfEdgeReq;
import org.ruoyi.workflow.dto.workflow.WfNodeDto;
@@ -14,7 +15,6 @@ import org.ruoyi.workflow.dto.workflow.WorkflowUpdateReq;
import org.ruoyi.workflow.entity.User;
import org.ruoyi.workflow.entity.Workflow;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.mapper.WorkflowMapper;
import org.ruoyi.workflow.util.MPPageUtil;
import org.ruoyi.workflow.util.PrivilegeUtil;
@@ -70,7 +70,7 @@ public class WorkflowService extends ServiceImpl<WorkflowMapper, Workflow> {
public WorkflowResp updateBaseInfo(String wfUuid, String title, String remark, Boolean isPublic) {
if (StringUtils.isAnyBlank(wfUuid, title)) {
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
ChainWrappers.lambdaUpdateChain(baseMapper)
.eq(Workflow::getUuid, wfUuid)
@@ -108,7 +108,7 @@ public class WorkflowService extends ServiceImpl<WorkflowMapper, Workflow> {
.last("limit 1")
.one();
if (null == workflow) {
throw new WorkflowBaseException(ErrorEnum.A_WF_NOT_FOUND);
throw new BaseException(ErrorEnum.A_WF_NOT_FOUND.getInfo());
}
return workflow;
}
@@ -160,7 +160,7 @@ public class WorkflowService extends ServiceImpl<WorkflowMapper, Workflow> {
public void enable(String uuid, Boolean enable) {
if (null == enable) {
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
Workflow workflow = PrivilegeUtil.checkAndGetByUuid(uuid, this.query(), ErrorEnum.A_WF_NOT_FOUND);
ChainWrappers.lambdaUpdateChain(baseMapper)

View File

@@ -1,7 +1,6 @@
package org.ruoyi.workflow.util;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -17,7 +16,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.io.IOException;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

View File

@@ -1,9 +1,9 @@
package org.ruoyi.workflow.util;
import com.baomidou.mybatisplus.extension.conditions.query.QueryChainWrapper;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.base.ThreadContext;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import static org.ruoyi.workflow.cosntant.AdiConstant.*;
@@ -24,7 +24,7 @@ public class PrivilegeUtil {
target = lambdaQueryChainWrapper.eq(null != id, COLUMN_NAME_ID, id).eq(null != uuid, COLUMN_NAME_UUID, uuid).eq(COLUMN_NAME_USER_ID, ThreadContext.getCurrentUserId()).eq(COLUMN_NAME_IS_DELETE, false).oneOpt().orElse(null);
}
if (null == target) {
throw new WorkflowBaseException(exceptionMessage);
throw new BaseException(exceptionMessage.getInfo());
}
return target;
}

View File

@@ -1,13 +1,12 @@
package org.ruoyi.workflow.workflow;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.collections4.CollectionUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.enums.WfIODataTypeEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.util.JsonUtil;
import org.ruoyi.workflow.workflow.data.NodeIOData;
import org.ruoyi.workflow.workflow.data.NodeIODataFilesContent;
@@ -36,7 +35,7 @@ public class WfNodeIODataUtil {
JsonNode nameObj = data.get("name");
JsonNode content = data.get("content");
if (null == nameObj || null == content) {
throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR);
throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
}
String name = nameObj.asText();
Integer type = content.get("type").asInt();

View File

@@ -10,16 +10,15 @@ import org.bsc.async.AsyncGenerator;
import org.bsc.langgraph4j.*;
import org.bsc.langgraph4j.checkpoint.MemorySaver;
import org.bsc.langgraph4j.langchain4j.generators.StreamingChatGenerator;
import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer;
import org.bsc.langgraph4j.state.AgentState;
import org.bsc.langgraph4j.state.StateSnapshot;
import org.bsc.langgraph4j.streaming.StreamingOutput;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.base.NodeInputConfigTypeHandler;
import org.ruoyi.workflow.dto.workflow.WfRuntimeNodeDto;
import org.ruoyi.workflow.dto.workflow.WfRuntimeResp;
import org.ruoyi.workflow.entity.*;
import org.ruoyi.workflow.enums.ErrorEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.helper.SSEEmitterHelper;
import org.ruoyi.workflow.service.WorkflowRuntimeNodeService;
import org.ruoyi.workflow.service.WorkflowRuntimeService;
@@ -34,12 +33,8 @@ import java.util.*;
import java.util.function.Function;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.ruoyi.workflow.cosntant.AdiConstant.WorkflowConstant.*;
import static org.ruoyi.workflow.enums.ErrorEnum.*;
import static org.ruoyi.workflow.workflow.WfComponentNameEnum.HUMAN_FEEDBACK;
@Slf4j
public class WorkflowEngine {
@@ -50,11 +45,6 @@ public class WorkflowEngine {
private final SSEEmitterHelper sseEmitterHelper;
private final WorkflowRuntimeService workflowRuntimeService;
private final WorkflowRuntimeNodeService workflowRuntimeNodeService;
private final ObjectStreamStateSerializer<WfNodeState> stateSerializer = new ObjectStreamStateSerializer<>(WfNodeState::new);
private final Map<String, List<StateGraph<WfNodeState>>> stateGraphNodes = new HashMap<>();
private final Map<String, List<StateGraph<WfNodeState>>> stateGraphEdges = new HashMap<>();
private final Map<String, String> rootToSubGraph = new HashMap<>();
private final Map<String, GraphCompileNode> nodeToParallelBranch = new HashMap<>();
private CompiledGraph<WfNodeState> app;
private SseEmitter sseEmitter;
private User user;
@@ -84,7 +74,7 @@ public class WorkflowEngine {
log.info("WorkflowEngine run,userId:{},workflowUuid:{},userInputs:{}", user.getId(), workflow.getUuid(), userInputs);
if (!this.workflow.getIsEnable()) {
sseEmitterHelper.sendErrorAndComplete(user.getId(), sseEmitter, ErrorEnum.A_WF_DISABLED.getInfo());
throw new WorkflowBaseException(ErrorEnum.A_WF_DISABLED);
throw new BaseException(ErrorEnum.A_WF_DISABLED.getInfo());
}
Long workflowId = this.workflow.getId();
@@ -96,20 +86,17 @@ public class WorkflowEngine {
Pair<WorkflowNode, Set<WorkflowNode>> startAndEnds = findStartAndEndNode();
WorkflowNode startNode = startAndEnds.getLeft();
List<NodeIOData> wfInputs = getAndCheckUserInput(userInputs, startNode);
//工作流运行实例状态
this.wfState = new WfState(user, wfInputs, runtimeUuid);
workflowRuntimeService.updateInput(this.wfRuntimeResp.getId(), wfState);
CompileNode rootCompileNode = new CompileNode();
rootCompileNode.setId(startNode.getUuid());
//构建整棵树
buildCompileNode(rootCompileNode, startNode);
//主状态图
StateGraph<WfNodeState> mainStateGraph = new StateGraph<>(stateSerializer);
this.wfState.addEdge(START, startNode.getUuid());
//构建包括所有节点的状态图
buildStateGraph(null, mainStateGraph, rootCompileNode);
WorkflowGraphBuilder graphBuilder = new WorkflowGraphBuilder(
components,
wfNodes,
wfEdges,
this::runNode,
this.wfState);
StateGraph<WfNodeState> mainStateGraph = graphBuilder.build(startNode);
MemorySaver saver = new MemorySaver();
CompileConfig compileConfig = CompileConfig.builder().checkpointSaver(saver)
@@ -130,13 +117,13 @@ public class WorkflowEngine {
StateSnapshot<WfNodeState> stateSnapshot = app.getState(invokeConfig);
String nextNode = stateSnapshot.config().nextNode().orElse("");
//还有下个节点,表示进入中断状态,等待用户输入后继续执
//还有下个节点,表示进入中断状态,等待用户输入后继续执行
if (StringUtils.isNotBlank(nextNode) && !nextNode.equalsIgnoreCase(END)) {
String intTip = WorkflowUtil.getHumanFeedbackTip(nextNode, wfNodes);
//将等待输入信息[事件与提示词]发送到到客户端
SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_WAIT_FEEDBACK_BY_" + nextNode + "]", intTip);
InterruptedFlow.RUNTIME_TO_GRAPH.put(wfState.getUuid(), this);
//更新状
//更新状态
wfState.setProcessStatus(WORKFLOW_PROCESS_STATUS_WAITING_INPUT);
workflowRuntimeService.updateOutput(wfRuntimeResp.getId(), wfState);
} else {
@@ -170,7 +157,7 @@ public class WorkflowEngine {
log.error("error", e);
String errorMsg = e.getMessage();
if (errorMsg.contains("parallel node doesn't support conditional branch")) {
errorMsg = "并行节点中不能包含条件分";
errorMsg = "并行节点中不能包含条件分支";
}
sseEmitterHelper.sendErrorAndComplete(user.getId(), sseEmitter, errorMsg);
workflowRuntimeService.updateStatus(wfRuntimeResp.getId(), WORKFLOW_PROCESS_STATUS_FAIL, errorMsg);
@@ -214,7 +201,7 @@ public class WorkflowEngine {
}
}, (is) -> {
workflowRuntimeNodeService.updateOutput(runtimeNodeDto.getId(), nodeState);
//并行节点内部的节点执行结束后,需要主动向客户端发送输出结
//并行节点内部的节点执行结束后,需要主动向客户端发送输出结果
String nodeUuid = wfNode.getUuid();
List<NodeIOData> nodeOutputs = nodeState.getOutputs();
for (NodeIOData output : nodeOutputs) {
@@ -227,10 +214,10 @@ public class WorkflowEngine {
}
} catch (Exception e) {
log.error("Node run error", e);
throw new WorkflowBaseException(ErrorEnum.B_WF_RUN_ERROR);
throw new BaseException(ErrorEnum.B_WF_RUN_ERROR.getInfo());
}
resultMap.put("name", wfNode.getTitle());
//langgraph4j state中的data不做数据存储只存储元数
//langgraph4j state中的data不做数据存储只存储元数据
StreamingChatGenerator<AgentState> generator = wfState.getNodeToStreamingGenerator().get(wfNode.getUuid());
if (null != generator) {
resultMap.put("_streaming_messages", generator);
@@ -251,6 +238,9 @@ public class WorkflowEngine {
String node = streamingOutput.node();
String chunk = streamingOutput.chunk();
log.info("node:{},chunk:{}", node, chunk);
Map<String, String> strMap = new HashMap<>();
strMap.put("ck", chunk);
// SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_CHUNK_" + node + "]", strMap.toString());
SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_CHUNK_" + node + "]", chunk);
} else {
AbstractWfNode abstractWfNode = wfState.getCompletedNodes().stream()
@@ -274,8 +264,8 @@ public class WorkflowEngine {
* 校验用户输入并组装成工作流的输入
*
* @param userInputs 用户输入
* @param startNode 开始节点定
* @return 正确的用户输入列
* @param startNode 开始节点定义
* @return 正确的用户输入列表
*/
private List<NodeIOData> getAndCheckUserInput(List<ObjectNode> userInputs, WorkflowNode startNode) {
WfNodeInputConfig wfNodeInputConfig = NodeInputConfigTypeHandler.fillNodeInputConfig(startNode.getInputConfig());
@@ -292,19 +282,19 @@ public class WorkflowEngine {
}
Integer dataType = nodeIOData.getContent().getType();
if (null == dataType) {
throw new WorkflowBaseException(A_WF_INPUT_INVALID);
throw new BaseException(A_WF_INPUT_INVALID.getInfo());
}
requiredParamMissing = false;
boolean valid = paramDefinition.checkValue(nodeIOData);
if (!valid) {
log.error("用户输入无效,workflowId:{}", startNode.getWorkflowId());
throw new WorkflowBaseException(ErrorEnum.A_WF_INPUT_INVALID);
throw new BaseException(ErrorEnum.A_WF_INPUT_INVALID.getInfo());
}
wfInputs.add(nodeIOData);
}
if (requiredParamMissing) {
log.error("在流程定义中必填的参数没有传进来,name:{}", paramNameFromDef);
throw new WorkflowBaseException(A_WF_INPUT_MISSING);
throw new BaseException(A_WF_INPUT_MISSING.getInfo());
}
}
return wfInputs;
@@ -323,7 +313,7 @@ public class WorkflowEngine {
Optional<WorkflowComponent> wfComponent = components.stream().filter(item -> item.getId().equals(node.getWorkflowComponentId())).findFirst();
if (wfComponent.isPresent() && WfComponentNameEnum.START.getName().equals(wfComponent.get().getName())) {
if (null != startNode) {
throw new WorkflowBaseException(ErrorEnum.A_WF_MULTIPLE_START_NODE);
throw new BaseException(ErrorEnum.A_WF_MULTIPLE_START_NODE.getInfo());
}
startNode = node;
} else if (wfComponent.isPresent() && WfComponentNameEnum.END.getName().equals(wfComponent.get().getName())) {
@@ -331,8 +321,8 @@ public class WorkflowEngine {
}
}
if (null == startNode) {
log.error("没有开始节点,workflowId:{}", wfNodes.get(0).getWorkflowId());
throw new WorkflowBaseException(ErrorEnum.A_WF_START_NODE_NOT_FOUND);
log.error("没有开始节点, workflowId:{}", wfNodes.get(0).getWorkflowId());
throw new BaseException(ErrorEnum.A_WF_START_NODE_NOT_FOUND.getInfo());
}
//Find all end nodes
wfNodes.forEach(item -> {
@@ -354,217 +344,11 @@ public class WorkflowEngine {
log.info("end nodes:{}", endNodes);
if (endNodes.isEmpty()) {
log.error("没有结束节点,workflowId:{}", startNode.getWorkflowId());
throw new WorkflowBaseException(A_WF_END_NODE_NOT_FOUND);
throw new BaseException(A_WF_END_NODE_NOT_FOUND.getInfo());
}
return Pair.of(startNode, endNodes);
}
/**
 * Recursively builds the {@link CompileNode} tree that mirrors the workflow definition,
 * attaching {@code node} (and, transitively, all of its downstream nodes) to {@code parentNode}.
 * <p>
 * Nodes whose upstream fans out through a parallel branch are wrapped in a
 * {@link GraphCompileNode}, which is later compiled into a sub graph.
 *
 * @param parentNode compile node the current workflow node hangs off
 * @param node       workflow node being compiled
 */
private void buildCompileNode(
    CompileNode parentNode,
    WorkflowNode node) {
    // Fixed: the message used to say "buildByNode", which does not match this method's name.
    log.info("buildCompileNode, parentNode:{}, node:{},title:{}", parentNode.getId(), node.getUuid(), node.getTitle());
    CompileNode newNode;
    List<String> upstreamNodeUuids = getUpstreamNodeUuids(node.getUuid());
    if (upstreamNodeUuids.isEmpty()) {
        // No upstream edges: treat the parent itself as the current node (start-node case).
        log.error("节点{}没有上游节点", node.getUuid());
        newNode = parentNode;
    } else if (upstreamNodeUuids.size() == 1) {
        String upstreamUuid = upstreamNodeUuids.get(0);
        boolean pointToParallel = pointToParallelBranch(upstreamUuid);
        if (pointToParallel) {
            // Upstream fans out in parallel: wrap this node as the root of a parallel branch.
            String rootId = node.getUuid();
            GraphCompileNode graphCompileNode = getOrCreateGraphCompileNode(rootId);
            appendToNextNodes(parentNode, graphCompileNode);
            newNode = graphCompileNode;
        } else if (parentNode instanceof GraphCompileNode graphCompileNode) {
            // Still inside a parallel branch: extend the branch at its current leaf.
            newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
            graphCompileNode.appendToLeaf(newNode);
        } else {
            newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
            appendToNextNodes(parentNode, newNode);
        }
    } else {
        // Fan-in: attach to the parallel-branch wrapper of the parent when one exists.
        newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
        GraphCompileNode parallelBranch = nodeToParallelBranch.get(parentNode.getId());
        appendToNextNodes(Objects.requireNonNullElse(parallelBranch, parentNode), newNode);
    }
    // Defensive guard; every branch above assigns newNode, so this should be unreachable.
    if (null == newNode) {
        log.error("节点{}不存在", node.getUuid());
        return;
    }
    List<String> downstreamUuids = getDownstreamNodeUuids(node.getUuid());
    for (String downstream : downstreamUuids) {
        Optional<WorkflowNode> n = wfNodes.stream().filter(item -> item.getUuid().equals(downstream)).findFirst();
        n.ifPresent(workflowNode -> buildCompileNode(newNode, workflowNode));
    }
}
/**
 * Recursively builds the complete LangGraph4j state graph from the compile-node tree.
 * <p>
 * {@link GraphCompileNode}s are compiled into sub graphs exactly once (cached in
 * {@code rootToSubGraph}); plain nodes are added directly to {@code stateGraph}.
 *
 * @param upstreamCompileNode upstream compile node, or {@code null} for the graph root
 * @param stateGraph          state graph currently being populated
 * @param compileNode         compile node to translate into graph nodes/edges
 * @throws GraphStateException when the underlying graph rejects a node or edge
 */
private void buildStateGraph(CompileNode upstreamCompileNode, StateGraph<WfNodeState> stateGraph, CompileNode compileNode) throws GraphStateException {
    log.info("buildStateGraph,upstreamCompileNode:{},node:{}", upstreamCompileNode, compileNode.getId());
    String stateGraphNodeUuid = compileNode.getId();
    if (null == upstreamCompileNode) {
        // Root of the graph: connect START directly to this node.
        addNodeToStateGraph(stateGraph, stateGraphNodeUuid);
        addEdgeToStateGraph(stateGraph, START, compileNode.getId());
    } else {
        if (compileNode instanceof GraphCompileNode graphCompileNode) {
            String stateGraphId = graphCompileNode.getId();
            CompileNode root = graphCompileNode.getRoot();
            String rootId = root.getId();
            String existSubGraphId = rootToSubGraph.get(rootId);
            if (StringUtils.isBlank(existSubGraphId)) {
                // First time we see this parallel branch: compile it into a sub graph.
                StateGraph<WfNodeState> subgraph = new StateGraph<>(stateSerializer);
                addNodeToStateGraph(subgraph, rootId);
                addEdgeToStateGraph(subgraph, START, rootId);
                for (CompileNode child : root.getNextNodes()) {
                    buildStateGraph(root, subgraph, child);
                }
                addEdgeToStateGraph(subgraph, graphCompileNode.getTail().getId(), END);
                stateGraph.addNode(stateGraphId, subgraph.compile());
                rootToSubGraph.put(rootId, stateGraphId);
                stateGraphNodeUuid = stateGraphId;
            } else {
                // Sub graph already compiled: just reference it.
                stateGraphNodeUuid = existSubGraphId;
            }
        } else {
            addNodeToStateGraph(stateGraph, stateGraphNodeUuid);
        }
        // Conditional edges are created separately (below), not here.
        if (Boolean.FALSE.equals(upstreamCompileNode.getConditional())) {
            addEdgeToStateGraph(stateGraph, upstreamCompileNode.getId(), stateGraphNodeUuid);
        }
    }
    List<CompileNode> nextNodes = compileNode.getNextNodes();
    if (nextNodes.size() > 1) {
        boolean conditional = nextNodes.stream().noneMatch(item -> item instanceof GraphCompileNode);
        compileNode.setConditional(conditional);
        for (CompileNode nextNode : nextNodes) {
            buildStateGraph(compileNode, stateGraph, nextNode);
        }
        // "Switcher"/"classifier" style nodes do not support parallel execution,
        // so route them through a ConditionalEdge keyed by the node's "next" output.
        if (conditional) {
            List<String> targets = nextNodes.stream().map(CompileNode::getId).toList();
            Map<String, String> mappings = new HashMap<>();
            for (String target : targets) {
                mappings.put(target, target);
            }
            stateGraph.addConditionalEdges(
                stateGraphNodeUuid,
                edge_async(state -> state.data().get("next").toString()),
                mappings
            );
        }
    } else if (nextNodes.size() == 1) {
        for (CompileNode nextNode : nextNodes) {
            buildStateGraph(compileNode, stateGraph, nextNode);
        }
    } else {
        // Leaf node: connect to END.
        addEdgeToStateGraph(stateGraph, stateGraphNodeUuid, END);
    }
}
/**
 * Returns the parallel-branch wrapper for the given root node uuid,
 * creating and caching it on first use.
 */
private GraphCompileNode getOrCreateGraphCompileNode(String rootId) {
    return nodeToParallelBranch.computeIfAbsent(rootId, id -> {
        GraphCompileNode branch = new GraphCompileNode();
        branch.setId("parallel_" + id);
        branch.setRoot(CompileNode.builder().id(id).conditional(false).nextNodes(new ArrayList<>()).build());
        return branch;
    });
}
/**
 * Collects the uuids of all nodes that have an edge pointing at the given node.
 */
private List<String> getUpstreamNodeUuids(String nodeUuid) {
    List<String> upstream = new ArrayList<>();
    for (WorkflowEdge edge : this.wfEdges) {
        if (edge.getTargetNodeUuid().equals(nodeUuid)) {
            upstream.add(edge.getSourceNodeUuid());
        }
    }
    // List.copyOf mirrors Stream.toList(): unmodifiable snapshot.
    return List.copyOf(upstream);
}
/**
 * Collects the uuids of all nodes the given node has an outgoing edge to.
 */
private List<String> getDownstreamNodeUuids(String nodeUuid) {
    List<String> downstream = new ArrayList<>();
    for (WorkflowEdge edge : this.wfEdges) {
        if (edge.getSourceNodeUuid().equals(nodeUuid)) {
            downstream.add(edge.getTargetNodeUuid());
        }
    }
    // List.copyOf mirrors Stream.toList(): unmodifiable snapshot.
    return List.copyOf(downstream);
}
// True when the node fans out through more than one edge with a blank source handle,
// i.e. it starts a parallel branch that belongs to a sub graph.
private boolean pointToParallelBranch(String nodeUuid) {
    long plainOutgoingEdges = this.wfEdges.stream()
        .filter(edge -> edge.getSourceNodeUuid().equals(nodeUuid))
        .filter(edge -> StringUtils.isBlank(edge.getSourceHandle()))
        .count();
    return plainOutgoingEdges > 1;
}
/**
 * Registers a workflow node on the given state graph, at most once per graph instance.
 * <p>
 * The node's action delegates to {@code runNode}; human-feedback nodes are additionally
 * recorded as interrupt points on the shared workflow state.
 *
 * @param stateGraph         state graph to add the node to
 * @param stateGraphNodeUuid uuid of the workflow node being registered
 * @throws GraphStateException when the underlying graph rejects the node
 */
private void addNodeToStateGraph(StateGraph<WfNodeState> stateGraph, String stateGraphNodeUuid) throws GraphStateException {
    // Deduplicate by identity: the same uuid may be added to several (sub)graphs,
    // but only once to any particular graph instance.
    List<StateGraph<WfNodeState>> stateGraphList = stateGraphNodes.computeIfAbsent(stateGraphNodeUuid, k -> new ArrayList<>());
    boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph);
    if (exist) {
        log.info("state graph node exist,stateGraphNodeUuid:{}", stateGraphNodeUuid);
        return;
    }
    log.info("addNodeToStateGraph,node uuid:{}", stateGraphNodeUuid);
    WorkflowNode wfNode = getNodeByUuid(stateGraphNodeUuid);
    stateGraph.addNode(stateGraphNodeUuid, node_async((state) -> runNode(wfNode, state)));
    stateGraphList.add(stateGraph);
    // Record human feedback nodes so execution can be interrupted and resumed at them.
    WorkflowComponent wfComponent = components.stream().filter(item -> item.getId().equals(wfNode.getWorkflowComponentId())).findFirst().orElseThrow();
    if (HUMAN_FEEDBACK.getName().equals(wfComponent.getName())) {
        this.wfState.addInterruptNode(stateGraphNodeUuid);
    }
}
/**
 * Adds the edge {@code source -> target} to the given state graph,
 * skipping edges already registered on that particular graph instance.
 */
private void addEdgeToStateGraph(StateGraph<WfNodeState> stateGraph, String source, String target) throws GraphStateException {
    String key = source + "_" + target;
    List<StateGraph<WfNodeState>> registered = stateGraphEdges.computeIfAbsent(key, k -> new ArrayList<>());
    for (StateGraph<WfNodeState> graph : registered) {
        // Identity comparison on purpose: one edge per graph instance.
        if (graph == stateGraph) {
            log.info("state graph edge exist,source:{},target:{}", source, target);
            return;
        }
    }
    log.info("addEdgeToStateGraph,source:{},target:{}", source, target);
    stateGraph.addEdge(source, target);
    registered.add(stateGraph);
}
/**
 * Looks up the workflow node with the given uuid.
 *
 * @throws WorkflowBaseException when no node with that uuid exists
 */
private WorkflowNode getNodeByUuid(String nodeUuid) {
    for (WorkflowNode candidate : wfNodes) {
        if (candidate.getUuid().equals(nodeUuid)) {
            return candidate;
        }
    }
    throw new WorkflowBaseException(ErrorEnum.A_WF_NODE_NOT_FOUND);
}
/**
 * Links {@code newNode} as a successor of {@code compileNode}, unless a
 * successor with the same id is already present.
 */
private void appendToNextNodes(CompileNode compileNode, CompileNode newNode) {
    List<CompileNode> successors = compileNode.getNextNodes();
    boolean alreadyLinked = false;
    for (CompileNode existing : successors) {
        if (existing.getId().equals(newNode.getId())) {
            alreadyLinked = true;
            break;
        }
    }
    if (!alreadyLinked) {
        successors.add(newNode);
    }
}
public CompiledGraph<WfNodeState> getApp() {
return app;

View File

@@ -0,0 +1,257 @@
package org.ruoyi.workflow.workflow;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.bsc.langgraph4j.GraphStateException;
import org.bsc.langgraph4j.StateGraph;
import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.entity.WorkflowComponent;
import org.ruoyi.workflow.entity.WorkflowEdge;
import org.ruoyi.workflow.entity.WorkflowNode;
import org.ruoyi.workflow.enums.ErrorEnum;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.bsc.langgraph4j.StateGraph.END;
import static org.bsc.langgraph4j.StateGraph.START;
import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async;
import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async;
import static org.ruoyi.workflow.workflow.WfComponentNameEnum.HUMAN_FEEDBACK;
/**
 * Builds the LangGraph4j {@link StateGraph} that a workflow run depends on.
 * <p>
 * The builder first compiles the workflow definition (nodes + edges) into a
 * {@link CompileNode} tree, wrapping parallel branches in {@link GraphCompileNode}s,
 * and then translates that tree into a state graph whose node actions delegate
 * to the supplied {@link WorkflowNodeRunner}.
 */
@Slf4j
public class WorkflowGraphBuilder {

    /** Component id -> component definition; on duplicate ids the first entry wins. */
    private final Map<Long, WorkflowComponent> componentIndex;
    /** Node uuid -> node definition; on duplicate uuids the first entry wins. */
    private final Map<String, WorkflowNode> nodeIndex;
    /** Source node uuid -> outgoing edges. */
    private final Map<String, List<WorkflowEdge>> edgesBySource;
    /** Target node uuid -> incoming edges. */
    private final Map<String, List<WorkflowEdge>> edgesByTarget;
    /** Callback that executes a single business node. */
    private final WorkflowNodeRunner nodeRunner;
    /** Shared run state; interrupt nodes are recorded on it. */
    private final WfState wfState;
    private final ObjectStreamStateSerializer<WfNodeState> stateSerializer = new ObjectStreamStateSerializer<>(WfNodeState::new);
    // node uuid -> graph instances the node has already been added to (dedup bookkeeping)
    private final Map<String, List<StateGraph<WfNodeState>>> stateGraphNodes = new HashMap<>();
    // "source_target" key -> graph instances the edge has already been added to
    private final Map<String, List<StateGraph<WfNodeState>>> stateGraphEdges = new HashMap<>();
    // sub-graph root uuid -> id of the compiled sub-graph node in the parent graph
    private final Map<String, String> rootToSubGraph = new HashMap<>();
    // parallel-branch root uuid -> wrapper node for that branch
    private final Map<String, GraphCompileNode> nodeToParallelBranch = new HashMap<>();

    public WorkflowGraphBuilder(
        List<WorkflowComponent> components,
        List<WorkflowNode> nodes,
        List<WorkflowEdge> edges,
        WorkflowNodeRunner nodeRunner,
        WfState wfState) {
        this.componentIndex = components.stream()
            .collect(Collectors.toMap(WorkflowComponent::getId, Function.identity(), (origin, ignore) -> origin));
        this.nodeIndex = nodes.stream()
            .collect(Collectors.toMap(WorkflowNode::getUuid, Function.identity(), (origin, ignore) -> origin));
        this.edgesBySource = edges.stream().collect(Collectors.groupingBy(WorkflowEdge::getSourceNodeUuid));
        this.edgesByTarget = edges.stream().collect(Collectors.groupingBy(WorkflowEdge::getTargetNodeUuid));
        this.nodeRunner = nodeRunner;
        this.wfState = wfState;
    }

    /**
     * Builds the main state graph rooted at the given start node.
     *
     * @param startNode start node of the workflow definition
     * @return the populated (not yet compiled) state graph
     * @throws GraphStateException when the underlying graph rejects a node or edge
     */
    public StateGraph<WfNodeState> build(WorkflowNode startNode) throws GraphStateException {
        CompileNode rootCompileNode = new CompileNode();
        rootCompileNode.setId(startNode.getUuid());
        buildCompileNode(rootCompileNode, startNode);
        StateGraph<WfNodeState> mainStateGraph = new StateGraph<>(stateSerializer);
        wfState.addEdge(START, startNode.getUuid());
        buildStateGraph(null, mainStateGraph, rootCompileNode);
        return mainStateGraph;
    }

    /**
     * Recursively builds the compile-node tree, attaching {@code node} (and its
     * downstream nodes) to {@code parentNode}. Parallel fan-outs are wrapped in
     * {@link GraphCompileNode}s.
     */
    private void buildCompileNode(CompileNode parentNode, WorkflowNode node) {
        log.info("buildCompileNode, parentNode:{}, node:{}, title:{}", parentNode.getId(), node.getUuid(), node.getTitle());
        CompileNode newNode;
        List<String> upstreamNodeUuids = getUpstreamNodeUuids(node.getUuid());
        if (upstreamNodeUuids.isEmpty()) {
            // No upstream edges: treat the parent itself as the current node (start-node case).
            log.error("节点{}没有上游节点", node.getUuid());
            newNode = parentNode;
        } else if (upstreamNodeUuids.size() == 1) {
            String upstreamUuid = upstreamNodeUuids.get(0);
            boolean pointToParallel = pointToParallelBranch(upstreamUuid);
            if (pointToParallel) {
                // Upstream fans out in parallel: wrap this node as the root of a parallel branch.
                String rootId = node.getUuid();
                GraphCompileNode graphCompileNode = getOrCreateGraphCompileNode(rootId);
                appendToNextNodes(parentNode, graphCompileNode);
                newNode = graphCompileNode;
            } else if (parentNode instanceof GraphCompileNode graphCompileNode) {
                // Still inside a parallel branch: extend the branch at its current leaf.
                newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
                graphCompileNode.appendToLeaf(newNode);
            } else {
                newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
                appendToNextNodes(parentNode, newNode);
            }
        } else {
            // Fan-in: attach to the parallel-branch wrapper of the parent when one exists.
            newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build();
            GraphCompileNode parallelBranch = nodeToParallelBranch.get(parentNode.getId());
            appendToNextNodes(Objects.requireNonNullElse(parallelBranch, parentNode), newNode);
        }
        // Defensive guard; every branch above assigns newNode, so this should be unreachable.
        if (newNode == null) {
            // Fixed: the original message text was mojibake ("不存<?>"); restored to "不存在".
            log.error("节点:{}不存在", node.getUuid());
            return;
        }
        for (String downstream : getDownstreamNodeUuids(node.getUuid())) {
            WorkflowNode downstreamNode = nodeIndex.get(downstream);
            if (downstreamNode != null) {
                buildCompileNode(newNode, downstreamNode);
            }
        }
    }

    /**
     * Recursively translates the compile-node tree into state-graph nodes and edges.
     * {@link GraphCompileNode}s are compiled into sub graphs exactly once (cached in
     * {@code rootToSubGraph}).
     *
     * @param upstreamCompileNode upstream compile node, or {@code null} for the graph root
     * @param stateGraph          state graph currently being populated
     * @param compileNode         compile node to translate
     * @throws GraphStateException when the underlying graph rejects a node or edge
     */
    private void buildStateGraph(CompileNode upstreamCompileNode,
                                 StateGraph<WfNodeState> stateGraph,
                                 CompileNode compileNode) throws GraphStateException {
        log.info("buildStateGraph, upstream:{}, node:{}", upstreamCompileNode, compileNode.getId());
        String stateGraphNodeUuid = compileNode.getId();
        if (upstreamCompileNode == null) {
            // Root of the graph: connect START directly to this node.
            addNodeToStateGraph(stateGraph, stateGraphNodeUuid);
            addEdgeToStateGraph(stateGraph, START, compileNode.getId());
        } else {
            if (compileNode instanceof GraphCompileNode graphCompileNode) {
                String stateGraphId = graphCompileNode.getId();
                CompileNode root = graphCompileNode.getRoot();
                String rootId = root.getId();
                String existSubGraphId = rootToSubGraph.get(rootId);
                if (StringUtils.isBlank(existSubGraphId)) {
                    // First time we see this parallel branch: compile it into a sub graph.
                    StateGraph<WfNodeState> subgraph = new StateGraph<>(stateSerializer);
                    addNodeToStateGraph(subgraph, rootId);
                    addEdgeToStateGraph(subgraph, START, rootId);
                    for (CompileNode child : root.getNextNodes()) {
                        buildStateGraph(root, subgraph, child);
                    }
                    addEdgeToStateGraph(subgraph, graphCompileNode.getTail().getId(), END);
                    stateGraph.addNode(stateGraphId, subgraph.compile());
                    rootToSubGraph.put(rootId, stateGraphId);
                    stateGraphNodeUuid = stateGraphId;
                } else {
                    // Sub graph already compiled: just reference it.
                    stateGraphNodeUuid = existSubGraphId;
                }
            } else {
                addNodeToStateGraph(stateGraph, stateGraphNodeUuid);
            }
            // Conditional edges are created separately below, not here.
            if (Boolean.FALSE.equals(upstreamCompileNode.getConditional())) {
                addEdgeToStateGraph(stateGraph, upstreamCompileNode.getId(), stateGraphNodeUuid);
            }
        }
        List<CompileNode> nextNodes = compileNode.getNextNodes();
        if (nextNodes.size() > 1) {
            boolean conditional = nextNodes.stream().noneMatch(item -> item instanceof GraphCompileNode);
            compileNode.setConditional(conditional);
            for (CompileNode nextNode : nextNodes) {
                buildStateGraph(compileNode, stateGraph, nextNode);
            }
            // Conditional fan-out ("switcher"/"classifier" style nodes) does not support
            // parallel execution; route through a ConditionalEdge keyed by the "next" output.
            if (conditional) {
                List<String> targets = nextNodes.stream().map(CompileNode::getId).toList();
                Map<String, String> mappings = new HashMap<>();
                for (String target : targets) {
                    mappings.put(target, target);
                }
                stateGraph.addConditionalEdges(
                    stateGraphNodeUuid,
                    edge_async(state -> state.data().get("next").toString()),
                    mappings
                );
            }
        } else if (nextNodes.size() == 1) {
            for (CompileNode nextNode : nextNodes) {
                buildStateGraph(compileNode, stateGraph, nextNode);
            }
        } else {
            // Leaf node: connect to END.
            addEdgeToStateGraph(stateGraph, stateGraphNodeUuid, END);
        }
    }

    /** Returns the parallel-branch wrapper for the given root uuid, creating it on first use. */
    private GraphCompileNode getOrCreateGraphCompileNode(String rootId) {
        GraphCompileNode exist = nodeToParallelBranch.get(rootId);
        if (exist == null) {
            GraphCompileNode graphCompileNode = new GraphCompileNode();
            graphCompileNode.setId("parallel_" + rootId);
            graphCompileNode.setRoot(CompileNode.builder().id(rootId).conditional(false).nextNodes(new ArrayList<>()).build());
            nodeToParallelBranch.put(rootId, graphCompileNode);
            exist = graphCompileNode;
        }
        return exist;
    }

    /** Uuids of all nodes with an edge pointing at the given node. */
    private List<String> getUpstreamNodeUuids(String nodeUuid) {
        return edgesByTarget.getOrDefault(nodeUuid, List.of())
            .stream()
            .map(WorkflowEdge::getSourceNodeUuid)
            .toList();
    }

    /** Uuids of all nodes the given node has an outgoing edge to. */
    private List<String> getDownstreamNodeUuids(String nodeUuid) {
        return edgesBySource.getOrDefault(nodeUuid, List.of())
            .stream()
            .map(WorkflowEdge::getTargetNodeUuid)
            .toList();
    }

    /**
     * True when the node fans out through more than one edge with a blank source handle,
     * i.e. it starts a parallel branch that belongs to a sub graph.
     */
    private boolean pointToParallelBranch(String nodeUuid) {
        return edgesBySource.getOrDefault(nodeUuid, List.of())
            .stream()
            .filter(edge -> StringUtils.isBlank(edge.getSourceHandle()))
            .count() > 1;
    }

    /**
     * Registers a workflow node on the given graph, at most once per graph instance.
     * Human-feedback nodes are additionally recorded as interrupt points.
     */
    private void addNodeToStateGraph(StateGraph<WfNodeState> stateGraph, String stateGraphNodeUuid) throws GraphStateException {
        List<StateGraph<WfNodeState>> stateGraphList = stateGraphNodes.computeIfAbsent(stateGraphNodeUuid, k -> new ArrayList<>());
        // Identity comparison on purpose: one registration per graph instance.
        boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph);
        if (exist) {
            log.info("state graph node exist,stateGraphNodeUuid:{}", stateGraphNodeUuid);
            return;
        }
        log.info("addNodeToStateGraph,node uuid:{}", stateGraphNodeUuid);
        WorkflowNode wfNode = getNodeByUuid(stateGraphNodeUuid);
        stateGraph.addNode(stateGraphNodeUuid, node_async(state -> nodeRunner.run(wfNode, state)));
        stateGraphList.add(stateGraph);
        WorkflowComponent component = componentIndex.get(wfNode.getWorkflowComponentId());
        if (component == null) {
            throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo());
        }
        if (HUMAN_FEEDBACK.getName().equals(component.getName())) {
            wfState.addInterruptNode(stateGraphNodeUuid);
        }
    }

    /** Adds the edge source -> target, at most once per graph instance. */
    private void addEdgeToStateGraph(StateGraph<WfNodeState> stateGraph, String source, String target) throws GraphStateException {
        String key = source + "_" + target;
        List<StateGraph<WfNodeState>> stateGraphList = stateGraphEdges.computeIfAbsent(key, k -> new ArrayList<>());
        boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph);
        if (exist) {
            log.info("state graph edge exist,source:{},target:{}", source, target);
            return;
        }
        log.info("addEdgeToStateGraph,source:{},target:{}", source, target);
        stateGraph.addEdge(source, target);
        stateGraphList.add(stateGraph);
    }

    /**
     * Looks up the workflow node with the given uuid.
     *
     * @throws BaseException when no node with that uuid exists
     */
    private WorkflowNode getNodeByUuid(String nodeUuid) {
        WorkflowNode workflowNode = nodeIndex.get(nodeUuid);
        if (workflowNode == null) {
            throw new BaseException(ErrorEnum.A_WF_NODE_NOT_FOUND.getInfo());
        }
        return workflowNode;
    }

    /** Links newNode as a successor of compileNode unless one with the same id exists. */
    private void appendToNextNodes(CompileNode compileNode, CompileNode newNode) {
        boolean exist = compileNode.getNextNodes().stream().anyMatch(item -> item.getId().equals(newNode.getId()));
        if (!exist) {
            compileNode.getNextNodes().add(newNode);
        }
    }
}

View File

@@ -0,0 +1,14 @@
package org.ruoyi.workflow.workflow;
import org.ruoyi.workflow.entity.WorkflowNode;
import java.util.Map;
/**
 * Callback that executes a single business node during a workflow run.
 * <p>
 * The returned map is merged into the graph state; presumably it carries routing
 * metadata such as the "next" key read by conditional edges — confirm against
 * the graph builder's ConditionalEdge wiring.
 */
@FunctionalInterface
public interface WorkflowNodeRunner {
    Map<String, Object> run(WorkflowNode node, WfNodeState nodeState);
}

View File

@@ -3,8 +3,8 @@ package org.ruoyi.workflow.workflow;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.entity.*;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.helper.SSEEmitterHelper;
import org.ruoyi.workflow.service.*;
import org.springframework.context.annotation.Lazy;
@@ -87,7 +87,7 @@ public class WorkflowStarter {
WorkflowEngine workflowEngine = InterruptedFlow.RUNTIME_TO_GRAPH.get(runtimeUuid);
if (null == workflowEngine) {
log.error("工作流恢复执行时失败,runtime:{}", runtimeUuid);
throw new WorkflowBaseException(A_WF_RESUME_FAIL);
throw new BaseException(A_WF_RESUME_FAIL.getInfo());
}
workflowEngine.resume(userInput);
}

View File

@@ -1,6 +1,5 @@
package org.ruoyi.workflow.workflow.node;
import cn.hutool.core.collection.CollUtil;
import com.fasterxml.jackson.databind.node.ObjectNode;
import jakarta.validation.ConstraintViolation;
import lombok.Data;
@@ -8,11 +7,11 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.StringUtils;
import org.ruoyi.common.core.exception.base.BaseException;
import org.ruoyi.workflow.base.NodeInputConfigTypeHandler;
import org.ruoyi.workflow.entity.WorkflowComponent;
import org.ruoyi.workflow.entity.WorkflowNode;
import org.ruoyi.workflow.enums.WfIODataTypeEnum;
import org.ruoyi.workflow.exception.WorkflowBaseException;
import org.ruoyi.workflow.util.JsonUtil;
import org.ruoyi.workflow.util.SpringUtil;
import org.ruoyi.workflow.workflow.NodeProcessResult;
@@ -185,13 +184,13 @@ public abstract class AbstractWfNode {
ObjectNode configObj = JsonUtil.toBean(node.getNodeConfig(), ObjectNode.class);
if (configObj.isEmpty()) {
log.error("node config is empty,node uuid:{}", state.getUuid());
throw new WorkflowBaseException(A_WF_NODE_CONFIG_NOT_FOUND);
throw new BaseException(A_WF_NODE_CONFIG_NOT_FOUND.getInfo());
}
log.info("node config:{}", configObj);
T nodeConfig = JsonUtil.fromJson(configObj, clazz);
if (null == nodeConfig) {
log.warn("找不到节点的配置,node uuid:{}", state.getUuid());
throw new WorkflowBaseException(A_WF_NODE_CONFIG_ERROR);
throw new BaseException(A_WF_NODE_CONFIG_ERROR.getInfo());
}
boolean configValid = true;
try {
@@ -206,7 +205,7 @@ public abstract class AbstractWfNode {
}
if (!configValid) {
log.warn("节点配置错误,node uuid:{}", state.getUuid());
throw new WorkflowBaseException(A_WF_NODE_CONFIG_ERROR);
throw new BaseException(A_WF_NODE_CONFIG_ERROR.getInfo());
}
return nodeConfig;
}