diff --git a/docs/工作流模块说明.md b/docs/工作流模块说明.md new file mode 100644 index 00000000..8d2f4396 --- /dev/null +++ b/docs/工作流模块说明.md @@ -0,0 +1,432 @@ +# Ruoyi-AI 工作流模块详细说明文档 + +## 概述 + +Ruoyi-AI 工作流模块是一个基于 LangGraph4j 的智能工作流引擎,支持可视化工作流设计、AI 模型集成、条件分支、人机交互等高级功能。该模块采用微服务架构,提供完整的 RESTful API 和流式响应支持。 + +## 模块架构 + +### 1. 模块结构 + +``` +ruoyi-ai/ +├── ruoyi-modules/ +│ └── ruoyi-workflow/ # 工作流核心模块 +│ ├── pom.xml +│ └── src/main/java/org/ruoyi/workflow/ +│ └── controller/ # 控制器层 +│ ├── WorkflowController.java +│ ├── WorkflowRuntimeController.java +│ └── admin/ # 管理端控制器 +│ ├── AdminWorkflowController.java +│ └── AdminWorkflowComponentController.java +└── ruoyi-modules-api/ + └── ruoyi-workflow-api/ # 工作流API模块 + ├── pom.xml + └── src/main/java/org/ruoyi/workflow/ + ├── entity/ # 实体类 + ├── dto/ # 数据传输对象 + ├── service/ # 服务接口 + ├── mapper/ # 数据访问层 + ├── workflow/ # 工作流核心逻辑 + ├── enums/ # 枚举类 + ├── util/ # 工具类 + └── exception/ # 异常处理 +``` + +### 2. 核心依赖 + +- **LangGraph4j**: 1.5.3 - 工作流图执行引擎 +- **LangChain4j**: 1.2.0 - AI 模型集成框架 +- **Spring Boot**: 3.x - 应用框架 +- **MyBatis Plus**: 数据访问层 +- **Redis**: 缓存和状态管理 +- **Swagger/OpenAPI**: API 文档 + +## 核心功能 + +### 1. 工作流管理 + +#### 1.1 工作流定义 +- **创建工作流**: 支持自定义标题、描述、公开性设置 +- **编辑工作流**: 可视化节点编辑、连接线配置 +- **版本控制**: 支持工作流的版本管理和回滚 +- **权限管理**: 支持公开/私有工作流设置 + +#### 1.2 工作流执行 +- **流式执行**: 基于 SSE 的实时流式响应 +- **状态管理**: 完整的执行状态跟踪 +- **错误处理**: 详细的错误信息和异常处理 +- **中断恢复**: 支持工作流中断和恢复执行 + +### 2. 节点类型 + +#### 2.1 基础节点 +- **Start**: 开始节点,定义工作流入口 +- **End**: 结束节点,定义工作流出口 + +#### 2.2 AI 模型节点 +- **Answer**: 大语言模型问答节点 +- **Dalle3**: DALL-E 3 图像生成 +- **Tongyiwanx**: 通义万相图像生成 +- **Classifier**: 内容分类节点 + +#### 2.3 数据处理节点 +- **DocumentExtractor**: 文档信息提取 +- **KeywordExtractor**: 关键词提取 +- **FaqExtractor**: 常见问题提取 +- **KnowledgeRetrieval**: 知识库检索 + +#### 2.4 控制流节点 +- **Switcher**: 条件分支节点 +- **HumanFeedback**: 人机交互节点 + +#### 2.5 外部集成节点 +- **Google**: Google 搜索集成 +- **MailSend**: 邮件发送 +- **HttpRequest**: HTTP 请求 +- **Template**: 模板转换 + +### 3. 数据流管理 + +#### 3.1 输入输出定义 +```java +// 节点输入输出数据结构 +public class NodeIOData { + private String name; // 参数名称 + private NodeIODataContent content; // 参数内容 +} + +// 支持的数据类型 +public enum WfIODataTypeEnum { + TEXT, // 文本 + NUMBER, // 数字 + BOOLEAN, // 布尔值 + FILES, // 文件 + OPTIONS // 选项 +} +``` + +#### 3.2 参数引用 +- **节点间引用**: 支持上游节点输出作为下游节点输入 +- **参数映射**: 自动处理参数名称映射 +- **类型转换**: 自动进行数据类型转换 + +## 数据库设计 + +### 1. 核心表结构 + +#### 1.1 工作流定义表 (t_workflow) +```sql +CREATE TABLE t_workflow ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + uuid VARCHAR(32) NOT NULL DEFAULT '', + title VARCHAR(100) NOT NULL DEFAULT '', + remark TEXT NOT NULL DEFAULT '', + user_id BIGINT NOT NULL DEFAULT 0, + is_public TINYINT(1) NOT NULL DEFAULT 0, + is_enable TINYINT(1) NOT NULL DEFAULT 1, + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + is_deleted TINYINT(1) NOT NULL DEFAULT 0 +); +``` + +#### 1.2 工作流节点表 (t_workflow_node) +```sql +CREATE TABLE t_workflow_node ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + uuid VARCHAR(32) NOT NULL DEFAULT '', + workflow_id BIGINT NOT NULL DEFAULT 0, + workflow_component_id BIGINT NOT NULL DEFAULT 0, + user_id BIGINT NOT NULL DEFAULT 0, + title VARCHAR(100) NOT NULL DEFAULT '', + remark VARCHAR(500) NOT NULL DEFAULT '', + input_config JSON NOT NULL DEFAULT ('{}'), + node_config JSON NOT NULL DEFAULT ('{}'), + position_x DOUBLE NOT NULL DEFAULT 0, + position_y DOUBLE NOT NULL DEFAULT 0, + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + is_deleted TINYINT(1) NOT NULL DEFAULT 0 +); +``` + +#### 1.3 工作流边表 (t_workflow_edge) +```sql +CREATE TABLE t_workflow_edge ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + uuid VARCHAR(32) NOT NULL DEFAULT '', + workflow_id BIGINT NOT NULL DEFAULT 0, + source_node_uuid VARCHAR(32) NOT NULL DEFAULT '', + source_handle VARCHAR(32) NOT NULL DEFAULT '', + target_node_uuid VARCHAR(32) NOT NULL DEFAULT '', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + is_deleted TINYINT(1) NOT NULL DEFAULT 0 +); +``` + +#### 1.4 工作流运行时表 (t_workflow_runtime) +```sql +CREATE TABLE t_workflow_runtime ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + uuid VARCHAR(32) NOT NULL DEFAULT '', + user_id BIGINT NOT NULL DEFAULT 0, + workflow_id BIGINT NOT NULL DEFAULT 0, + input JSON NOT NULL DEFAULT ('{}'), + output JSON NOT NULL DEFAULT ('{}'), + status SMALLINT NOT NULL DEFAULT 1, + status_remark VARCHAR(250) NOT NULL DEFAULT '', + create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + update_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + is_deleted TINYINT(1) NOT NULL DEFAULT 0 +); +``` + +#### 1.5 工作流组件表 (t_workflow_component) +```sql +CREATE TABLE t_workflow_component ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + uuid VARCHAR(32) DEFAULT '' NOT NULL, + name VARCHAR(32) DEFAULT '' NOT NULL, + title VARCHAR(100) DEFAULT '' NOT NULL, + remark TEXT NOT NULL, + display_order INT DEFAULT 0 NOT NULL, + is_enable TINYINT(1) DEFAULT 0 NOT NULL, + create_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + update_time DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + is_deleted TINYINT(1) DEFAULT 0 NOT NULL +); +``` + +## API 接口 + +### 1. 工作流管理接口 + +#### 1.1 基础操作 +```http +# 创建工作流 +POST /workflow/add +Content-Type: application/json +{ + "title": "工作流标题", + "remark": "工作流描述", + "isPublic": false +} + +# 更新工作流 +POST /workflow/update +Content-Type: application/json +{ + "uuid": "工作流UUID", + "title": "新标题", + "remark": "新描述" +} + +# 删除工作流 +POST /workflow/del/{uuid} + +# 启用/禁用工作流 +POST /workflow/enable/{uuid}?enable=true +``` + +#### 1.2 搜索和查询 +```http +# 搜索我的工作流 +GET /workflow/mine/search?keyword=关键词&isPublic=true¤tPage=1&pageSize=10 + +# 搜索公开工作流 +GET /workflow/public/search?keyword=关键词¤tPage=1&pageSize=10 + +# 获取工作流组件列表 +GET /workflow/public/component/list +``` + +### 2. 工作流执行接口 + +#### 2.1 流式执行 +```http +# 流式执行工作流 +POST /workflow/run +Content-Type: application/json +Accept: text/event-stream +{ + "uuid": "工作流UUID", + "inputs": [ + { + "name": "input", + "content": { + "type": 1, + "textContent": "用户输入内容" + } + } + ] +} +``` + +#### 2.2 运行时管理 +```http +# 恢复中断的工作流 +POST /workflow/runtime/resume/{runtimeUuid} +Content-Type: application/json +{ + "feedbackContent": "用户反馈内容" +} + +# 查询工作流执行历史 +GET /workflow/runtime/page?wfUuid=工作流UUID¤tPage=1&pageSize=10 + +# 查询运行时节点详情 +GET /workflow/runtime/nodes/{runtimeUuid} + +# 清理运行时数据 +POST /workflow/runtime/clear?wfUuid=工作流UUID +``` + +### 3. 管理端接口 + +#### 3.1 工作流管理 +```http +# 搜索所有工作流 +POST /admin/workflow/search +Content-Type: application/json +{ + "title": "搜索关键词", + "isPublic": true, + "isEnable": true +} + +# 启用/禁用工作流 +POST /admin/workflow/enable?uuid=工作流UUID&isEnable=true +``` + +## 核心实现 + +### 1. 工作流引擎 (WorkflowEngine) + +工作流引擎是整个模块的核心,负责: +- 工作流图的构建和编译 +- 节点执行调度 +- 状态管理和持久化 +- 流式输出处理 + +```java +public class WorkflowEngine { + // 核心执行方法 + public void run(User user, List userInputs, SseEmitter sseEmitter) { + // 1. 验证工作流状态 + // 2. 创建运行时实例 + // 3. 构建状态图 + // 4. 执行工作流 + // 5. 处理流式输出 + } + + // 恢复执行方法 + public void resume(String userInput) { + // 1. 更新状态 + // 2. 继续执行 + } +} +``` + +### 2. 节点工厂 (WfNodeFactory) + +节点工厂负责根据组件类型创建对应的节点实例: + +```java +public class WfNodeFactory { + public static AbstractWfNode create(WorkflowComponent component, + WorkflowNode node, + WfState wfState, + WfNodeState nodeState) { + // 根据组件类型创建对应的节点实例 + switch (component.getName()) { + case "Answer": + return new LLMAnswerNode(component, node, wfState, nodeState); + case "Switcher": + return new SwitcherNode(component, node, wfState, nodeState); + // ... 其他节点类型 + } + } +} +``` + +### 3. 图构建器 (WorkflowGraphBuilder) + +图构建器负责将工作流定义转换为可执行的状态图: + +```java +public class WorkflowGraphBuilder { + public StateGraph build(WorkflowNode startNode) { + // 1. 构建编译节点树 + // 2. 转换为状态图 + // 3. 添加节点和边 + // 4. 处理条件分支 + // 5. 处理并行执行 + } +} +``` + +## 流式响应机制 + +### 1. SSE 事件类型 + +工作流执行过程中会发送多种类型的 SSE 事件: + +```javascript +// 节点开始执行 +[NODE_RUN_节点UUID] - 节点执行开始事件 + +// 节点输入数据 +[NODE_INPUT_节点UUID] - 节点输入数据事件 + +// 节点输出数据 +[NODE_OUTPUT_节点UUID] - 节点输出数据事件 + +// 流式内容块 +[NODE_CHUNK_节点UUID] - 流式内容块事件 + +// 等待用户输入 +[NODE_WAIT_FEEDBACK_BY_节点UUID] - 等待用户输入事件 +``` + +### 2. 流式处理流程 + +1. **初始化**: 创建工作流运行时实例 +2. **节点执行**: 逐个执行工作流节点 +3. **实时输出**: 通过 SSE 实时推送执行结果 +4. **状态更新**: 实时更新节点和工作流状态 +5. **错误处理**: 捕获并处理执行过程中的错误 + + +## 扩展开发 + +### 1. 自定义节点开发 + +要开发自定义工作流节点,需要: + +1. **创建节点类**:继承 `AbstractWfNode` +2. **实现处理逻辑**:重写 `onProcess()` 方法 +3. **定义配置类**:创建节点配置类 +4. **注册组件**:在组件表中注册新组件 + +```java +public class CustomNode extends AbstractWfNode { + @Override + protected NodeProcessResult onProcess() { + // 实现自定义处理逻辑 + List outputs = new ArrayList<>(); + // ... 处理逻辑 + return NodeProcessResult.success(outputs); + } +} +``` + +### 2. 自定义组件注册 + +```sql +-- 在 t_workflow_component 表中添加新组件 +INSERT INTO t_workflow_component (uuid, name, title, remark, is_enable) +VALUES (REPLACE(UUID(), '-', ''), 'CustomNode', '自定义节点', '自定义节点描述', true); +``` diff --git a/ruoyi-modules-api/ruoyi-workflow-api/pom.xml b/ruoyi-modules-api/ruoyi-workflow-api/pom.xml index 7b1cafac..9049508f 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/pom.xml +++ b/ruoyi-modules-api/ruoyi-workflow-api/pom.xml @@ -43,6 +43,11 @@ ruoyi-system-api + + org.ruoyi + ruoyi-common-satoken + + org.ruoyi ruoyi-common-mail diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/base/ThreadContext.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/base/ThreadContext.java index 5b841d9a..cb7e992a 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/base/ThreadContext.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/base/ThreadContext.java @@ -1,67 +1,122 @@ package org.ruoyi.workflow.base; -import io.micrometer.common.util.StringUtils; +import cn.dev33.satoken.stp.StpUtil; +import org.apache.commons.lang3.StringUtils; +import org.ruoyi.common.core.domain.model.LoginUser; +import org.ruoyi.common.core.exception.base.BaseException; +import org.ruoyi.common.satoken.utils.LoginHelper; import org.ruoyi.workflow.entity.User; import org.ruoyi.workflow.enums.UserStatusEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import static org.ruoyi.workflow.enums.ErrorEnum.A_USER_NOT_FOUND; +/** + * 线程上下文适配器,统一接入 Sa-Token 登录态。 + */ public class ThreadContext { - private static final ThreadLocal currentUser = new ThreadLocal<>(); - private static final ThreadLocal currentToken = new ThreadLocal<>(); + + private static final ThreadLocal CURRENT_USER = new ThreadLocal<>(); + private static final ThreadLocal CURRENT_TOKEN = new ThreadLocal<>(); private ThreadContext() { } + /** + * 获取当前登录的工作流用户。 + */ public static User getCurrentUser() { - User user = new User(); - user.setName("admin"); - user.setEmail("12345@qq.com"); - user.setUuid("123456789"); - user.setUnderstandContextMsgPairNum(1); - user.setQuotaByTokenDaily(1); - user.setQuotaByTokenMonthly(1); - user.setQuotaByRequestDaily(1); - user.setQuotaByRequestMonthly(1); - user.setQuotaByImageDaily(1); - user.setQuotaByImageMonthly(1); - user.setUserStatus(UserStatusEnum.NORMAL); - user.setIsAdmin(true); - user.setId(1L); - return user; + User cached = CURRENT_USER.get(); + if (cached != null) { + return cached; + } + LoginUser loginUser = LoginHelper.getLoginUser(); + if (loginUser == null) { + throw new BaseException(A_USER_NOT_FOUND.getInfo()); + } + User mapped = mapToWorkflowUser(loginUser); + CURRENT_USER.set(mapped); + return mapped; } + /** + * 允许在测试或特殊场景下显式设置当前用户。 + */ public static void setCurrentUser(User user) { - currentUser.set(user); + if (user == null) { + CURRENT_USER.remove(); + } else { + CURRENT_USER.set(user); + } } + /** + * 获取当前登录用户 ID。 + */ public static Long getCurrentUserId() { - return 1L; + Long userId = LoginHelper.getUserId(); + if (userId != null) { + return userId; + } + return getCurrentUser().getId(); } + /** + * 获取当前访问 token。 + */ public static String getToken() { - return currentToken.get(); + String token = CURRENT_TOKEN.get(); + if (StringUtils.isNotBlank(token)) { + return token; + } + try { + token = StpUtil.getTokenValue(); + } catch (Exception ignore) { + token = null; + } + if (StringUtils.isNotBlank(token)) { + CURRENT_TOKEN.set(token); + } + return token; } public static void setToken(String token) { - currentToken.set(token); + if (StringUtils.isBlank(token)) { + CURRENT_TOKEN.remove(); + } else { + CURRENT_TOKEN.set(token); + } } public static boolean isLogin() { - return StringUtils.isNotBlank(currentToken.get()); + return LoginHelper.isLogin(); } public static User getExistCurrentUser() { - User user = ThreadContext.getCurrentUser(); - if (null == user) { - throw new WorkflowBaseException(A_USER_NOT_FOUND); - } - return user; + return getCurrentUser(); } - public void unload() { - currentUser.remove(); - currentToken.remove(); + public static void unload() { + CURRENT_USER.remove(); + CURRENT_TOKEN.remove(); + } + + private static User mapToWorkflowUser(LoginUser loginUser) { + User user = new User(); + user.setId(loginUser.getUserId()); + String nickname = loginUser.getNickName(); + user.setName(StringUtils.defaultIfBlank(nickname, loginUser.getUsername())); + user.setEmail(loginUser.getUsername()); + user.setUuid(String.valueOf(loginUser.getUserId())); + user.setUserStatus(UserStatusEnum.NORMAL); + user.setIsAdmin(LoginHelper.isSuperAdmin(loginUser.getUserId())); + user.setUnderstandContextMsgPairNum(0); + user.setQuotaByTokenDaily(0); + user.setQuotaByTokenMonthly(0); + user.setQuotaByRequestDaily(0); + user.setQuotaByRequestMonthly(0); + user.setQuotaByImageDaily(0); + user.setQuotaByImageMonthly(0); + user.setIsDeleted(false); + return user; } } diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/exception/WorkflowBaseException.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/exception/WorkflowBaseException.java deleted file mode 100644 index 198539ee..00000000 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/exception/WorkflowBaseException.java +++ /dev/null @@ -1,49 +0,0 @@ -package org.ruoyi.workflow.exception; - - -import org.ruoyi.workflow.enums.ErrorEnum; - -import java.text.MessageFormat; - -public class WorkflowBaseException extends RuntimeException { - private final String code; - private final String info; - - private Object data; - - public WorkflowBaseException(String code, String info) { - super(code + ":" + info); - this.code = code; - this.info = info; - } - - public WorkflowBaseException(ErrorEnum errorEnum, String... infoValues) { - super(errorEnum.getCode() + ":" + MessageFormat.format(errorEnum.getInfo(), infoValues)); - this.code = errorEnum.getCode(); - if (infoValues.length > 0) { - this.info = MessageFormat.format(errorEnum.getInfo(), infoValues); - } else { - this.info = errorEnum.getInfo(); - } - } - - public String getCode() { - return code; - } - - public String getInfo() { - return info; - } - - public Object getData() { - if (null != data) { - return data; - } - return getMessage(); - } - - public WorkflowBaseException setData(Object data) { - this.data = data; - return this; - } -} diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowComponentService.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowComponentService.java index 18afff42..04e3b03c 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowComponentService.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowComponentService.java @@ -7,11 +7,11 @@ import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.dto.workflow.WfComponentReq; import org.ruoyi.workflow.dto.workflow.WfComponentSearchReq; import org.ruoyi.workflow.entity.WorkflowComponent; import org.ruoyi.workflow.enums.ErrorEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.mapper.WorkflowComponentMapper; import org.ruoyi.workflow.util.PrivilegeUtil; import org.ruoyi.workflow.util.UuidUtil; @@ -74,11 +74,18 @@ public class WorkflowComponentService extends ServiceImpl 0) { - throw new WorkflowBaseException(C_WF_COMPONENT_DELETED_FAIL_BY_USED); - } else { -// PrivilegeUtil.checkAndDelete(uuid, this.query(), ChainWrappers.updateChain(baseMapper), ErrorEnum.A_WF_COMPONENT_NOT_FOUND); + if (refNodeCount != null && refNodeCount > 0) { + throw new BaseException(C_WF_COMPONENT_DELETED_FAIL_BY_USED.getInfo()); + } + boolean updated = ChainWrappers.lambdaUpdateChain(baseMapper) + .eq(WorkflowComponent::getId, component.getId()) + .set(WorkflowComponent::getIsDeleted, true) + .set(WorkflowComponent::getIsEnable, false) + .update(); + if (!updated) { + throw new BaseException(ErrorEnum.A_WF_COMPONENT_NOT_FOUND.getInfo()); } } @@ -106,7 +113,7 @@ public class WorkflowComponentService extends ServiceImpl WfComponentNameEnum.START.getName().equals(component.getName())) .findFirst() - .orElseThrow(() -> new WorkflowBaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND)); + .orElseThrow(() -> new BaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND.getInfo())); } public WorkflowComponent getComponent(Long id) { @@ -114,6 +121,6 @@ public class WorkflowComponentService extends ServiceImpl component.getId().equals(id)) .findFirst() - .orElseThrow(() -> new WorkflowBaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND)); + .orElseThrow(() -> new BaseException(ErrorEnum.B_WF_NODE_DEFINITION_NOT_FOUND.getInfo())); } } diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowEdgeService.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowEdgeService.java index e9bc873a..ba545e2d 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowEdgeService.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/service/WorkflowEdgeService.java @@ -6,10 +6,10 @@ import com.baomidou.mybatisplus.extension.toolkit.ChainWrappers; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.dto.workflow.WfEdgeReq; import org.ruoyi.workflow.entity.WorkflowEdge; import org.ruoyi.workflow.enums.ErrorEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.mapper.WorkflowEdgeMapper; import org.ruoyi.workflow.util.MPPageUtil; import org.ruoyi.workflow.util.UuidUtil; @@ -98,7 +98,7 @@ public class WorkflowEdgeService extends ServiceImpl { public WorkflowResp updateBaseInfo(String wfUuid, String title, String remark, Boolean isPublic) { if (StringUtils.isAnyBlank(wfUuid, title)) { - throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR); + throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo()); } ChainWrappers.lambdaUpdateChain(baseMapper) .eq(Workflow::getUuid, wfUuid) @@ -108,7 +108,7 @@ public class WorkflowService extends ServiceImpl { .last("limit 1") .one(); if (null == workflow) { - throw new WorkflowBaseException(ErrorEnum.A_WF_NOT_FOUND); + throw new BaseException(ErrorEnum.A_WF_NOT_FOUND.getInfo()); } return workflow; } @@ -160,7 +160,7 @@ public class WorkflowService extends ServiceImpl { public void enable(String uuid, Boolean enable) { if (null == enable) { - throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR); + throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo()); } Workflow workflow = PrivilegeUtil.checkAndGetByUuid(uuid, this.query(), ErrorEnum.A_WF_NOT_FOUND); ChainWrappers.lambdaUpdateChain(baseMapper) diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/JsonUtil.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/JsonUtil.java index 48598b13..975d67bc 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/JsonUtil.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/JsonUtil.java @@ -1,7 +1,6 @@ package org.ruoyi.workflow.util; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -17,7 +16,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/PrivilegeUtil.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/PrivilegeUtil.java index 548ff031..7ad0dc7a 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/PrivilegeUtil.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/util/PrivilegeUtil.java @@ -1,9 +1,9 @@ package org.ruoyi.workflow.util; import com.baomidou.mybatisplus.extension.conditions.query.QueryChainWrapper; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.base.ThreadContext; import org.ruoyi.workflow.enums.ErrorEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import static org.ruoyi.workflow.cosntant.AdiConstant.*; @@ -24,7 +24,7 @@ public class PrivilegeUtil { target = lambdaQueryChainWrapper.eq(null != id, COLUMN_NAME_ID, id).eq(null != uuid, COLUMN_NAME_UUID, uuid).eq(COLUMN_NAME_USER_ID, ThreadContext.getCurrentUserId()).eq(COLUMN_NAME_IS_DELETE, false).oneOpt().orElse(null); } if (null == target) { - throw new WorkflowBaseException(exceptionMessage); + throw new BaseException(exceptionMessage.getInfo()); } return target; } diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WfNodeIODataUtil.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WfNodeIODataUtil.java index 5c93d9dc..61828568 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WfNodeIODataUtil.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WfNodeIODataUtil.java @@ -1,13 +1,12 @@ package org.ruoyi.workflow.workflow; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ObjectUtil; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.commons.collections4.CollectionUtils; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.enums.ErrorEnum; import org.ruoyi.workflow.enums.WfIODataTypeEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.util.JsonUtil; import org.ruoyi.workflow.workflow.data.NodeIOData; import org.ruoyi.workflow.workflow.data.NodeIODataFilesContent; @@ -36,7 +35,7 @@ public class WfNodeIODataUtil { JsonNode nameObj = data.get("name"); JsonNode content = data.get("content"); if (null == nameObj || null == content) { - throw new WorkflowBaseException(ErrorEnum.A_PARAMS_ERROR); + throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo()); } String name = nameObj.asText(); Integer type = content.get("type").asInt(); diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowEngine.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowEngine.java index 8e2cd4d8..55d10a17 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowEngine.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowEngine.java @@ -10,16 +10,15 @@ import org.bsc.async.AsyncGenerator; import org.bsc.langgraph4j.*; import org.bsc.langgraph4j.checkpoint.MemorySaver; import org.bsc.langgraph4j.langchain4j.generators.StreamingChatGenerator; -import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer; import org.bsc.langgraph4j.state.AgentState; import org.bsc.langgraph4j.state.StateSnapshot; import org.bsc.langgraph4j.streaming.StreamingOutput; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.base.NodeInputConfigTypeHandler; import org.ruoyi.workflow.dto.workflow.WfRuntimeNodeDto; import org.ruoyi.workflow.dto.workflow.WfRuntimeResp; import org.ruoyi.workflow.entity.*; import org.ruoyi.workflow.enums.ErrorEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.helper.SSEEmitterHelper; import org.ruoyi.workflow.service.WorkflowRuntimeNodeService; import org.ruoyi.workflow.service.WorkflowRuntimeService; @@ -34,12 +33,8 @@ import java.util.*; import java.util.function.Function; import static org.bsc.langgraph4j.StateGraph.END; -import static org.bsc.langgraph4j.StateGraph.START; -import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async; -import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async; import static org.ruoyi.workflow.cosntant.AdiConstant.WorkflowConstant.*; import static org.ruoyi.workflow.enums.ErrorEnum.*; -import static org.ruoyi.workflow.workflow.WfComponentNameEnum.HUMAN_FEEDBACK; @Slf4j public class WorkflowEngine { @@ -50,11 +45,6 @@ public class WorkflowEngine { private final SSEEmitterHelper sseEmitterHelper; private final WorkflowRuntimeService workflowRuntimeService; private final WorkflowRuntimeNodeService workflowRuntimeNodeService; - private final ObjectStreamStateSerializer stateSerializer = new ObjectStreamStateSerializer<>(WfNodeState::new); - private final Map>> stateGraphNodes = new HashMap<>(); - private final Map>> stateGraphEdges = new HashMap<>(); - private final Map rootToSubGraph = new HashMap<>(); - private final Map nodeToParallelBranch = new HashMap<>(); private CompiledGraph app; private SseEmitter sseEmitter; private User user; @@ -84,7 +74,7 @@ public class WorkflowEngine { log.info("WorkflowEngine run,userId:{},workflowUuid:{},userInputs:{}", user.getId(), workflow.getUuid(), userInputs); if (!this.workflow.getIsEnable()) { sseEmitterHelper.sendErrorAndComplete(user.getId(), sseEmitter, ErrorEnum.A_WF_DISABLED.getInfo()); - throw new WorkflowBaseException(ErrorEnum.A_WF_DISABLED); + throw new BaseException(ErrorEnum.A_WF_DISABLED.getInfo()); } Long workflowId = this.workflow.getId(); @@ -96,20 +86,17 @@ public class WorkflowEngine { Pair> startAndEnds = findStartAndEndNode(); WorkflowNode startNode = startAndEnds.getLeft(); List wfInputs = getAndCheckUserInput(userInputs, startNode); - //工作流运行实例状态 this.wfState = new WfState(user, wfInputs, runtimeUuid); workflowRuntimeService.updateInput(this.wfRuntimeResp.getId(), wfState); - CompileNode rootCompileNode = new CompileNode(); - rootCompileNode.setId(startNode.getUuid()); - //构建整棵树 - buildCompileNode(rootCompileNode, startNode); - //主状态图 - StateGraph mainStateGraph = new StateGraph<>(stateSerializer); - this.wfState.addEdge(START, startNode.getUuid()); - //构建包括所有节点的状态图 - buildStateGraph(null, mainStateGraph, rootCompileNode); + WorkflowGraphBuilder graphBuilder = new WorkflowGraphBuilder( + components, + wfNodes, + wfEdges, + this::runNode, + this.wfState); + StateGraph mainStateGraph = graphBuilder.build(startNode); MemorySaver saver = new MemorySaver(); CompileConfig compileConfig = CompileConfig.builder().checkpointSaver(saver) @@ -130,13 +117,13 @@ public class WorkflowEngine { StateSnapshot stateSnapshot = app.getState(invokeConfig); String nextNode = stateSnapshot.config().nextNode().orElse(""); - //还有下个节点,表示进入中断状态,等待用户输入后继续执行 + //还有下个节点,表示进入中断状态,等待用户输入后继续执�? if (StringUtils.isNotBlank(nextNode) && !nextNode.equalsIgnoreCase(END)) { String intTip = WorkflowUtil.getHumanFeedbackTip(nextNode, wfNodes); //将等待输入信息[事件与提示词]发送到到客户端 SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_WAIT_FEEDBACK_BY_" + nextNode + "]", intTip); InterruptedFlow.RUNTIME_TO_GRAPH.put(wfState.getUuid(), this); - //更新状态 + //更新状�? wfState.setProcessStatus(WORKFLOW_PROCESS_STATUS_WAITING_INPUT); workflowRuntimeService.updateOutput(wfRuntimeResp.getId(), wfState); } else { @@ -170,7 +157,7 @@ public class WorkflowEngine { log.error("error", e); String errorMsg = e.getMessage(); if (errorMsg.contains("parallel node doesn't support conditional branch")) { - errorMsg = "并行节点中不能包含条件分支"; + errorMsg = "并行节点中不能包含条件分�?"; } sseEmitterHelper.sendErrorAndComplete(user.getId(), sseEmitter, errorMsg); workflowRuntimeService.updateStatus(wfRuntimeResp.getId(), WORKFLOW_PROCESS_STATUS_FAIL, errorMsg); @@ -214,7 +201,7 @@ public class WorkflowEngine { } }, (is) -> { workflowRuntimeNodeService.updateOutput(runtimeNodeDto.getId(), nodeState); - //并行节点内部的节点执行结束后,需要主动向客户端发送输出结果 + //并行节点内部的节点执行结束后,需要主动向客户端发送输出结�? String nodeUuid = wfNode.getUuid(); List nodeOutputs = nodeState.getOutputs(); for (NodeIOData output : nodeOutputs) { @@ -227,10 +214,10 @@ public class WorkflowEngine { } } catch (Exception e) { log.error("Node run error", e); - throw new WorkflowBaseException(ErrorEnum.B_WF_RUN_ERROR); + throw new BaseException(ErrorEnum.B_WF_RUN_ERROR.getInfo()); } resultMap.put("name", wfNode.getTitle()); - //langgraph4j state中的data不做数据存储,只存储元数据 + //langgraph4j state中的data不做数据存储,只存储元数�? StreamingChatGenerator generator = wfState.getNodeToStreamingGenerator().get(wfNode.getUuid()); if (null != generator) { resultMap.put("_streaming_messages", generator); @@ -251,6 +238,9 @@ public class WorkflowEngine { String node = streamingOutput.node(); String chunk = streamingOutput.chunk(); log.info("node:{},chunk:{}", node, chunk); + Map strMap = new HashMap<>(); + strMap.put("ck", chunk); +// SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_CHUNK_" + node + "]", strMap.toString()); SSEEmitterHelper.parseAndSendPartialMsg(sseEmitter, "[NODE_CHUNK_" + node + "]", chunk); } else { AbstractWfNode abstractWfNode = wfState.getCompletedNodes().stream() @@ -274,8 +264,8 @@ public class WorkflowEngine { * 校验用户输入并组装成工作流的输入 * * @param userInputs 用户输入 - * @param startNode 开始节点定义 - * @return 正确的用户输入列表 + * @param startNode 开始节点定�? + * @return 正确的用户输入列�? */ private List getAndCheckUserInput(List userInputs, WorkflowNode startNode) { WfNodeInputConfig wfNodeInputConfig = NodeInputConfigTypeHandler.fillNodeInputConfig(startNode.getInputConfig()); @@ -292,19 +282,19 @@ public class WorkflowEngine { } Integer dataType = nodeIOData.getContent().getType(); if (null == dataType) { - throw new WorkflowBaseException(A_WF_INPUT_INVALID); + throw new BaseException(A_WF_INPUT_INVALID.getInfo()); } requiredParamMissing = false; boolean valid = paramDefinition.checkValue(nodeIOData); if (!valid) { log.error("用户输入无效,workflowId:{}", startNode.getWorkflowId()); - throw new WorkflowBaseException(ErrorEnum.A_WF_INPUT_INVALID); + throw new BaseException(ErrorEnum.A_WF_INPUT_INVALID.getInfo()); } wfInputs.add(nodeIOData); } if (requiredParamMissing) { log.error("在流程定义中必填的参数没有传进来,name:{}", paramNameFromDef); - throw new WorkflowBaseException(A_WF_INPUT_MISSING); + throw new BaseException(A_WF_INPUT_MISSING.getInfo()); } } return wfInputs; @@ -323,7 +313,7 @@ public class WorkflowEngine { Optional wfComponent = components.stream().filter(item -> item.getId().equals(node.getWorkflowComponentId())).findFirst(); if (wfComponent.isPresent() && WfComponentNameEnum.START.getName().equals(wfComponent.get().getName())) { if (null != startNode) { - throw new WorkflowBaseException(ErrorEnum.A_WF_MULTIPLE_START_NODE); + throw new BaseException(ErrorEnum.A_WF_MULTIPLE_START_NODE.getInfo()); } startNode = node; } else if (wfComponent.isPresent() && WfComponentNameEnum.END.getName().equals(wfComponent.get().getName())) { @@ -331,8 +321,8 @@ public class WorkflowEngine { } } if (null == startNode) { - log.error("没有开始节点,workflowId:{}", wfNodes.get(0).getWorkflowId()); - throw new WorkflowBaseException(ErrorEnum.A_WF_START_NODE_NOT_FOUND); + log.error("没有开始节点, workflowId:{}", wfNodes.get(0).getWorkflowId()); + throw new BaseException(ErrorEnum.A_WF_START_NODE_NOT_FOUND.getInfo()); } //Find all end nodes wfNodes.forEach(item -> { @@ -354,217 +344,11 @@ public class WorkflowEngine { log.info("end nodes:{}", endNodes); if (endNodes.isEmpty()) { log.error("没有结束节点,workflowId:{}", startNode.getWorkflowId()); - throw new WorkflowBaseException(A_WF_END_NODE_NOT_FOUND); + throw new BaseException(A_WF_END_NODE_NOT_FOUND.getInfo()); } return Pair.of(startNode, endNodes); } - private void buildCompileNode( - CompileNode parentNode, - WorkflowNode node) { - log.info("buildByNode, parentNode:{}, node:{},title:{}", parentNode.getId(), node.getUuid(), node.getTitle()); - CompileNode newNode; - List upstreamNodeUuids = getUpstreamNodeUuids(node.getUuid()); - if (upstreamNodeUuids.isEmpty()) { - log.error("节点{}没有上游节点", node.getUuid()); - newNode = parentNode; - } else if (upstreamNodeUuids.size() == 1) { - String upstreamUuid = upstreamNodeUuids.get(0); - boolean pointToParallel = pointToParallelBranch(upstreamUuid); - if (pointToParallel) { - String rootId = node.getUuid(); - GraphCompileNode graphCompileNode = getOrCreateGraphCompileNode(rootId); - appendToNextNodes(parentNode, graphCompileNode); - newNode = graphCompileNode; - } else if (parentNode instanceof GraphCompileNode graphCompileNode) { - newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); - graphCompileNode.appendToLeaf(newNode); - } else { - newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); - appendToNextNodes(parentNode, newNode); - } - } else { - newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); - GraphCompileNode parallelBranch = nodeToParallelBranch.get(parentNode.getId()); - appendToNextNodes(Objects.requireNonNullElse(parallelBranch, parentNode), newNode); - } - - if (null == newNode) { - log.error("节点{}不存在", node.getUuid()); - return; - } - List downstreamUuids = getDownstreamNodeUuids(node.getUuid()); - for (String downstream : downstreamUuids) { - Optional n = wfNodes.stream().filter(item -> item.getUuid().equals(downstream)).findFirst(); - n.ifPresent(workflowNode -> buildCompileNode(newNode, workflowNode)); - } - } - - /** - * 构建完整的stategraph - * - * @param upstreamCompileNode 上游节点 - * @param stateGraph 当前状态图 - * @param compileNode 当前节点 - * @throws GraphStateException 状态图异常 - */ - private void buildStateGraph(CompileNode upstreamCompileNode, StateGraph stateGraph, CompileNode compileNode) throws GraphStateException { - log.info("buildStateGraph,upstreamCompileNode:{},node:{}", upstreamCompileNode, compileNode.getId()); - String stateGraphNodeUuid = compileNode.getId(); - if (null == upstreamCompileNode) { - addNodeToStateGraph(stateGraph, stateGraphNodeUuid); - addEdgeToStateGraph(stateGraph, START, compileNode.getId()); - } else { - if (compileNode instanceof GraphCompileNode graphCompileNode) { - String stateGraphId = graphCompileNode.getId(); - CompileNode root = graphCompileNode.getRoot(); - String rootId = root.getId(); - String existSubGraphId = rootToSubGraph.get(rootId); - - if (StringUtils.isBlank(existSubGraphId)) { - StateGraph subgraph = new StateGraph<>(stateSerializer); - addNodeToStateGraph(subgraph, rootId); - addEdgeToStateGraph(subgraph, START, rootId); - for (CompileNode child : root.getNextNodes()) { - buildStateGraph(root, subgraph, child); - } - addEdgeToStateGraph(subgraph, graphCompileNode.getTail().getId(), END); - stateGraph.addNode(stateGraphId, subgraph.compile()); - rootToSubGraph.put(rootId, stateGraphId); - - stateGraphNodeUuid = stateGraphId; - } else { - stateGraphNodeUuid = existSubGraphId; - } - } else { - addNodeToStateGraph(stateGraph, stateGraphNodeUuid); - } - - //ConditionalEdge 的创建另外处理 - if (Boolean.FALSE.equals(upstreamCompileNode.getConditional())) { - addEdgeToStateGraph(stateGraph, upstreamCompileNode.getId(), stateGraphNodeUuid); - } - } - List nextNodes = compileNode.getNextNodes(); - if (nextNodes.size() > 1) { - boolean conditional = nextNodes.stream().noneMatch(item -> item instanceof GraphCompileNode); - compileNode.setConditional(conditional); - for (CompileNode nextNode : nextNodes) { - buildStateGraph(compileNode, stateGraph, nextNode); - } - //节点是"条件分支"或"分类"的情况下不支持并行执行,所以直接使用条件ConditionalEdge - if (conditional) { - List targets = nextNodes.stream().map(CompileNode::getId).toList(); - Map mappings = new HashMap<>(); - for (String target : targets) { - mappings.put(target, target); - } - stateGraph.addConditionalEdges( - stateGraphNodeUuid, - edge_async(state -> state.data().get("next").toString()), - mappings - ); - } - } else if (nextNodes.size() == 1) { - for (CompileNode nextNode : nextNodes) { - buildStateGraph(compileNode, stateGraph, nextNode); - } - } else { - addEdgeToStateGraph(stateGraph, stateGraphNodeUuid, END); - } - } - - private GraphCompileNode getOrCreateGraphCompileNode(String rootId) { - GraphCompileNode exist = nodeToParallelBranch.get(rootId); - if (null == exist) { - GraphCompileNode graphCompileNode = new GraphCompileNode(); - graphCompileNode.setId("parallel_" + rootId); - graphCompileNode.setRoot(CompileNode.builder().id(rootId).conditional(false).nextNodes(new ArrayList<>()).build()); - nodeToParallelBranch.put(rootId, graphCompileNode); - exist = graphCompileNode; - } - return exist; - - } - - private List getUpstreamNodeUuids(String nodeUuid) { - return this.wfEdges.stream() - .filter(edge -> edge.getTargetNodeUuid().equals(nodeUuid)) - .map(WorkflowEdge::getSourceNodeUuid) - .toList(); - } - - private List getDownstreamNodeUuids(String nodeUuid) { - return this.wfEdges.stream() - .filter(edge -> edge.getSourceNodeUuid().equals(nodeUuid)) - .map(WorkflowEdge::getTargetNodeUuid) - .toList(); - } - - //判断节点是否属于子图 - private boolean pointToParallelBranch(String nodeUuid) { - int edgeCount = 0; - for (WorkflowEdge edge : this.wfEdges) { - if (edge.getSourceNodeUuid().equals(nodeUuid) && StringUtils.isBlank(edge.getSourceHandle())) { - edgeCount = edgeCount + 1; - } - } - return edgeCount > 1; - } - - /** - * 添加节点到状态图 - * - * @param stateGraph - * @param stateGraphNodeUuid - * @throws GraphStateException - */ - private void addNodeToStateGraph(StateGraph stateGraph, String stateGraphNodeUuid) throws GraphStateException { - List> stateGraphList = stateGraphNodes.computeIfAbsent(stateGraphNodeUuid, k -> new ArrayList<>()); - boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph); - if (exist) { - log.info("state graph node exist,stateGraphNodeUuid:{}", stateGraphNodeUuid); - return; - } - log.info("addNodeToStateGraph,node uuid:{}", stateGraphNodeUuid); - WorkflowNode wfNode = getNodeByUuid(stateGraphNodeUuid); - stateGraph.addNode(stateGraphNodeUuid, node_async((state) -> runNode(wfNode, state))); - stateGraphList.add(stateGraph); - - //记录人机交互节点 - WorkflowComponent wfComponent = components.stream().filter(item -> item.getId().equals(wfNode.getWorkflowComponentId())).findFirst().orElseThrow(); - if (HUMAN_FEEDBACK.getName().equals(wfComponent.getName())) { - this.wfState.addInterruptNode(stateGraphNodeUuid); - } - } - - private void addEdgeToStateGraph(StateGraph stateGraph, String source, String target) throws GraphStateException { - String key = source + "_" + target; - List> stateGraphList = stateGraphEdges.computeIfAbsent(key, k -> new ArrayList<>()); - boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph); - if (exist) { - log.info("state graph edge exist,source:{},target:{}", source, target); - return; - } - log.info("addEdgeToStateGraph,source:{},target:{}", source, target); - stateGraph.addEdge(source, target); - stateGraphList.add(stateGraph); - } - - private WorkflowNode getNodeByUuid(String nodeUuid) { - return wfNodes.stream() - .filter(item -> item.getUuid().equals(nodeUuid)) - .findFirst() - .orElseThrow(() -> new WorkflowBaseException(ErrorEnum.A_WF_NODE_NOT_FOUND)); - } - - private void appendToNextNodes(CompileNode compileNode, CompileNode newNode) { - boolean exist = compileNode.getNextNodes().stream().anyMatch(item -> item.getId().equals(newNode.getId())); - if (!exist) { - compileNode.getNextNodes().add(newNode); - } - - } public CompiledGraph getApp() { return app; diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowGraphBuilder.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowGraphBuilder.java new file mode 100644 index 00000000..aa11b4b5 --- /dev/null +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowGraphBuilder.java @@ -0,0 +1,257 @@ +package org.ruoyi.workflow.workflow; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.bsc.langgraph4j.GraphStateException; +import org.bsc.langgraph4j.StateGraph; +import org.bsc.langgraph4j.serializer.std.ObjectStreamStateSerializer; +import org.ruoyi.common.core.exception.base.BaseException; +import org.ruoyi.workflow.entity.WorkflowComponent; +import org.ruoyi.workflow.entity.WorkflowEdge; +import org.ruoyi.workflow.entity.WorkflowNode; +import org.ruoyi.workflow.enums.ErrorEnum; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.bsc.langgraph4j.StateGraph.END; +import static org.bsc.langgraph4j.StateGraph.START; +import static org.bsc.langgraph4j.action.AsyncEdgeAction.edge_async; +import static org.bsc.langgraph4j.action.AsyncNodeAction.node_async; +import static org.ruoyi.workflow.workflow.WfComponentNameEnum.HUMAN_FEEDBACK; + +/** + * 负责构建工作流运行所依赖的状态图�? + */ +@Slf4j +public class WorkflowGraphBuilder { + + private final Map componentIndex; + private final Map nodeIndex; + private final Map> edgesBySource; + private final Map> edgesByTarget; + private final WorkflowNodeRunner nodeRunner; + private final WfState wfState; + + private final ObjectStreamStateSerializer stateSerializer = new ObjectStreamStateSerializer<>(WfNodeState::new); + private final Map>> stateGraphNodes = new HashMap<>(); + private final Map>> stateGraphEdges = new HashMap<>(); + private final Map rootToSubGraph = new HashMap<>(); + private final Map nodeToParallelBranch = new HashMap<>(); + + public WorkflowGraphBuilder( + List components, + List nodes, + List edges, + WorkflowNodeRunner nodeRunner, + WfState wfState) { + this.componentIndex = components.stream() + .collect(Collectors.toMap(WorkflowComponent::getId, Function.identity(), (origin, ignore) -> origin)); + this.nodeIndex = nodes.stream() + .collect(Collectors.toMap(WorkflowNode::getUuid, Function.identity(), (origin, ignore) -> origin)); + this.edgesBySource = edges.stream().collect(Collectors.groupingBy(WorkflowEdge::getSourceNodeUuid)); + this.edgesByTarget = edges.stream().collect(Collectors.groupingBy(WorkflowEdge::getTargetNodeUuid)); + this.nodeRunner = nodeRunner; + this.wfState = wfState; + } + + public StateGraph build(WorkflowNode startNode) throws GraphStateException { + CompileNode rootCompileNode = new CompileNode(); + rootCompileNode.setId(startNode.getUuid()); + buildCompileNode(rootCompileNode, startNode); + + StateGraph mainStateGraph = new StateGraph<>(stateSerializer); + wfState.addEdge(START, startNode.getUuid()); + buildStateGraph(null, mainStateGraph, rootCompileNode); + return mainStateGraph; + } + + private void buildCompileNode(CompileNode parentNode, WorkflowNode node) { + log.info("buildCompileNode, parentNode:{}, node:{}, title:{}", parentNode.getId(), node.getUuid(), node.getTitle()); + CompileNode newNode; + List upstreamNodeUuids = getUpstreamNodeUuids(node.getUuid()); + if (upstreamNodeUuids.isEmpty()) { + log.error("节点{}没有上游节点", node.getUuid()); + newNode = parentNode; + } else if (upstreamNodeUuids.size() == 1) { + String upstreamUuid = upstreamNodeUuids.get(0); + boolean pointToParallel = pointToParallelBranch(upstreamUuid); + if (pointToParallel) { + String rootId = node.getUuid(); + GraphCompileNode graphCompileNode = getOrCreateGraphCompileNode(rootId); + appendToNextNodes(parentNode, graphCompileNode); + newNode = graphCompileNode; + } else if (parentNode instanceof GraphCompileNode graphCompileNode) { + newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); + graphCompileNode.appendToLeaf(newNode); + } else { + newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); + appendToNextNodes(parentNode, newNode); + } + } else { + newNode = CompileNode.builder().id(node.getUuid()).conditional(false).nextNodes(new ArrayList<>()).build(); + GraphCompileNode parallelBranch = nodeToParallelBranch.get(parentNode.getId()); + appendToNextNodes(Objects.requireNonNullElse(parallelBranch, parentNode), newNode); + } + + if (newNode == null) { + log.error("节点:{}不存�?", node.getUuid()); + return; + } + for (String downstream : getDownstreamNodeUuids(node.getUuid())) { + WorkflowNode downstreamNode = nodeIndex.get(downstream); + if (downstreamNode != null) { + buildCompileNode(newNode, downstreamNode); + } + } + } + + private void buildStateGraph(CompileNode upstreamCompileNode, + StateGraph stateGraph, + CompileNode compileNode) throws GraphStateException { + log.info("buildStateGraph, upstream:{}, node:{}", upstreamCompileNode, compileNode.getId()); + String stateGraphNodeUuid = compileNode.getId(); + if (upstreamCompileNode == null) { + addNodeToStateGraph(stateGraph, stateGraphNodeUuid); + addEdgeToStateGraph(stateGraph, START, compileNode.getId()); + } else { + if (compileNode instanceof GraphCompileNode graphCompileNode) { + String stateGraphId = graphCompileNode.getId(); + CompileNode root = graphCompileNode.getRoot(); + String rootId = root.getId(); + String existSubGraphId = rootToSubGraph.get(rootId); + + if (StringUtils.isBlank(existSubGraphId)) { + StateGraph subgraph = new StateGraph<>(stateSerializer); + addNodeToStateGraph(subgraph, rootId); + addEdgeToStateGraph(subgraph, START, rootId); + for (CompileNode child : root.getNextNodes()) { + buildStateGraph(root, subgraph, child); + } + addEdgeToStateGraph(subgraph, graphCompileNode.getTail().getId(), END); + stateGraph.addNode(stateGraphId, subgraph.compile()); + rootToSubGraph.put(rootId, stateGraphId); + stateGraphNodeUuid = stateGraphId; + } else { + stateGraphNodeUuid = existSubGraphId; + } + } else { + addNodeToStateGraph(stateGraph, stateGraphNodeUuid); + } + + if (Boolean.FALSE.equals(upstreamCompileNode.getConditional())) { + addEdgeToStateGraph(stateGraph, upstreamCompileNode.getId(), stateGraphNodeUuid); + } + } + + List nextNodes = compileNode.getNextNodes(); + if (nextNodes.size() > 1) { + boolean conditional = nextNodes.stream().noneMatch(item -> item instanceof GraphCompileNode); + compileNode.setConditional(conditional); + for (CompileNode nextNode : nextNodes) { + buildStateGraph(compileNode, stateGraph, nextNode); + } + if (conditional) { + List targets = nextNodes.stream().map(CompileNode::getId).toList(); + Map mappings = new HashMap<>(); + for (String target : targets) { + mappings.put(target, target); + } + stateGraph.addConditionalEdges( + stateGraphNodeUuid, + edge_async(state -> state.data().get("next").toString()), + mappings + ); + } + } else if (nextNodes.size() == 1) { + for (CompileNode nextNode : nextNodes) { + buildStateGraph(compileNode, stateGraph, nextNode); + } + } else { + addEdgeToStateGraph(stateGraph, stateGraphNodeUuid, END); + } + } + + private GraphCompileNode getOrCreateGraphCompileNode(String rootId) { + GraphCompileNode exist = nodeToParallelBranch.get(rootId); + if (exist == null) { + GraphCompileNode graphCompileNode = new GraphCompileNode(); + graphCompileNode.setId("parallel_" + rootId); + graphCompileNode.setRoot(CompileNode.builder().id(rootId).conditional(false).nextNodes(new ArrayList<>()).build()); + nodeToParallelBranch.put(rootId, graphCompileNode); + exist = graphCompileNode; + } + return exist; + } + + private List getUpstreamNodeUuids(String nodeUuid) { + return edgesByTarget.getOrDefault(nodeUuid, List.of()) + .stream() + .map(WorkflowEdge::getSourceNodeUuid) + .toList(); + } + + private List getDownstreamNodeUuids(String nodeUuid) { + return edgesBySource.getOrDefault(nodeUuid, List.of()) + .stream() + .map(WorkflowEdge::getTargetNodeUuid) + .toList(); + } + + private boolean pointToParallelBranch(String nodeUuid) { + return edgesBySource.getOrDefault(nodeUuid, List.of()) + .stream() + .filter(edge -> StringUtils.isBlank(edge.getSourceHandle())) + .count() > 1; + } + + private void addNodeToStateGraph(StateGraph stateGraph, String stateGraphNodeUuid) throws GraphStateException { + List> stateGraphList = stateGraphNodes.computeIfAbsent(stateGraphNodeUuid, k -> new ArrayList<>()); + boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph); + if (exist) { + log.info("state graph node exist,stateGraphNodeUuid:{}", stateGraphNodeUuid); + return; + } + log.info("addNodeToStateGraph,node uuid:{}", stateGraphNodeUuid); + WorkflowNode wfNode = getNodeByUuid(stateGraphNodeUuid); + stateGraph.addNode(stateGraphNodeUuid, node_async(state -> nodeRunner.run(wfNode, state))); + stateGraphList.add(stateGraph); + + WorkflowComponent component = componentIndex.get(wfNode.getWorkflowComponentId()); + if (component == null) { + throw new BaseException(ErrorEnum.A_PARAMS_ERROR.getInfo()); + } + if (HUMAN_FEEDBACK.getName().equals(component.getName())) { + wfState.addInterruptNode(stateGraphNodeUuid); + } + } + + private void addEdgeToStateGraph(StateGraph stateGraph, String source, String target) throws GraphStateException { + String key = source + "_" + target; + List> stateGraphList = stateGraphEdges.computeIfAbsent(key, k -> new ArrayList<>()); + boolean exist = stateGraphList.stream().anyMatch(item -> item == stateGraph); + if (exist) { + log.info("state graph edge exist,source:{},target:{}", source, target); + return; + } + log.info("addEdgeToStateGraph,source:{},target:{}", source, target); + stateGraph.addEdge(source, target); + stateGraphList.add(stateGraph); + } + + private WorkflowNode getNodeByUuid(String nodeUuid) { + WorkflowNode workflowNode = nodeIndex.get(nodeUuid); + if (workflowNode == null) { + throw new BaseException(ErrorEnum.A_WF_NODE_NOT_FOUND.getInfo()); + } + return workflowNode; + } + + private void appendToNextNodes(CompileNode compileNode, CompileNode newNode) { + boolean exist = compileNode.getNextNodes().stream().anyMatch(item -> item.getId().equals(newNode.getId())); + if (!exist) { + compileNode.getNextNodes().add(newNode); + } + } +} diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowNodeRunner.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowNodeRunner.java new file mode 100644 index 00000000..324f344a --- /dev/null +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowNodeRunner.java @@ -0,0 +1,14 @@ +package org.ruoyi.workflow.workflow; + +import org.ruoyi.workflow.entity.WorkflowNode; + +import java.util.Map; + +/** + * 回调接口,负责执行业务节点并返回下游编排所需的元数据。 + */ +@FunctionalInterface +public interface WorkflowNodeRunner { + + Map run(WorkflowNode node, WfNodeState nodeState); +} diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowStarter.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowStarter.java index 5f15cc89..69e036fb 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowStarter.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/WorkflowStarter.java @@ -3,8 +3,8 @@ package org.ruoyi.workflow.workflow; import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.entity.*; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.helper.SSEEmitterHelper; import org.ruoyi.workflow.service.*; import org.springframework.context.annotation.Lazy; @@ -87,7 +87,7 @@ public class WorkflowStarter { WorkflowEngine workflowEngine = InterruptedFlow.RUNTIME_TO_GRAPH.get(runtimeUuid); if (null == workflowEngine) { log.error("工作流恢复执行时失败,runtime:{}", runtimeUuid); - throw new WorkflowBaseException(A_WF_RESUME_FAIL); + throw new BaseException(A_WF_RESUME_FAIL.getInfo()); } workflowEngine.resume(userInput); } diff --git a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/node/AbstractWfNode.java b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/node/AbstractWfNode.java index 3974349d..d6119e5c 100644 --- a/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/node/AbstractWfNode.java +++ b/ruoyi-modules-api/ruoyi-workflow-api/src/main/java/org/ruoyi/workflow/workflow/node/AbstractWfNode.java @@ -1,6 +1,5 @@ package org.ruoyi.workflow.workflow.node; -import cn.hutool.core.collection.CollUtil; import com.fasterxml.jackson.databind.node.ObjectNode; import jakarta.validation.ConstraintViolation; import lombok.Data; @@ -8,11 +7,11 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.StringUtils; +import org.ruoyi.common.core.exception.base.BaseException; import org.ruoyi.workflow.base.NodeInputConfigTypeHandler; import org.ruoyi.workflow.entity.WorkflowComponent; import org.ruoyi.workflow.entity.WorkflowNode; import org.ruoyi.workflow.enums.WfIODataTypeEnum; -import org.ruoyi.workflow.exception.WorkflowBaseException; import org.ruoyi.workflow.util.JsonUtil; import org.ruoyi.workflow.util.SpringUtil; import org.ruoyi.workflow.workflow.NodeProcessResult; @@ -185,13 +184,13 @@ public abstract class AbstractWfNode { ObjectNode configObj = JsonUtil.toBean(node.getNodeConfig(), ObjectNode.class); if (configObj.isEmpty()) { log.error("node config is empty,node uuid:{}", state.getUuid()); - throw new WorkflowBaseException(A_WF_NODE_CONFIG_NOT_FOUND); + throw new BaseException(A_WF_NODE_CONFIG_NOT_FOUND.getInfo()); } log.info("node config:{}", configObj); T nodeConfig = JsonUtil.fromJson(configObj, clazz); if (null == nodeConfig) { log.warn("找不到节点的配置,node uuid:{}", state.getUuid()); - throw new WorkflowBaseException(A_WF_NODE_CONFIG_ERROR); + throw new BaseException(A_WF_NODE_CONFIG_ERROR.getInfo()); } boolean configValid = true; try { @@ -206,7 +205,7 @@ public abstract class AbstractWfNode { } if (!configValid) { log.warn("节点配置错误,node uuid:{}", state.getUuid()); - throw new WorkflowBaseException(A_WF_NODE_CONFIG_ERROR); + throw new BaseException(A_WF_NODE_CONFIG_ERROR.getInfo()); } return nodeConfig; }