mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-04-13 20:11:29 +00:00
AI工作流优化
This commit is contained in:
@@ -26,7 +26,7 @@
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-core</artifactId>
|
||||
<artifactId>ruoyi-common-chat</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
@@ -34,13 +34,6 @@
|
||||
<artifactId>ruoyi-common-web</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-mybatis</artifactId>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.ruoyi</groupId>
|
||||
<artifactId>ruoyi-common-satoken</artifactId>
|
||||
|
||||
@@ -4,6 +4,7 @@ import org.ruoyi.workflow.entity.WorkflowComponent;
|
||||
import org.ruoyi.workflow.entity.WorkflowNode;
|
||||
import org.ruoyi.workflow.workflow.node.AbstractWfNode;
|
||||
import org.ruoyi.workflow.workflow.node.EndNode;
|
||||
import org.ruoyi.workflow.workflow.node.HumanFeedbackNode;
|
||||
import org.ruoyi.workflow.workflow.node.answer.LLMAnswerNode;
|
||||
import org.ruoyi.workflow.workflow.node.httpRequest.HttpRequestNode;
|
||||
import org.ruoyi.workflow.workflow.node.keywordExtractor.KeywordExtractorNode;
|
||||
@@ -25,6 +26,7 @@ public class WfNodeFactory {
|
||||
case MAIL_SEND -> wfNode = new MailSendNode(wfComponent, nodeDefinition, wfState, nodeState);
|
||||
case HTTP_REQUEST -> wfNode = new HttpRequestNode(wfComponent, nodeDefinition, wfState, nodeState);
|
||||
case SWITCHER -> wfNode = new SwitcherNode(wfComponent, nodeDefinition, wfState, nodeState);
|
||||
case HUMAN_FEEDBACK -> wfNode = new HumanFeedbackNode(wfComponent, nodeDefinition, wfState, nodeState);
|
||||
default -> {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import org.ruoyi.workflow.entity.User;
|
||||
import org.ruoyi.workflow.entity.WorkflowNode;
|
||||
import org.ruoyi.workflow.workflow.data.NodeIOData;
|
||||
import org.ruoyi.workflow.workflow.node.AbstractWfNode;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -24,6 +25,9 @@ public class WfState {
|
||||
private String uuid;
|
||||
private User user;
|
||||
private String processingNodeUuid;
|
||||
private Long userId;
|
||||
private String tokenValue;
|
||||
private SseEmitter sseEmitter;
|
||||
|
||||
//Source node uuid => target node uuid list
|
||||
private Map<String, List<String>> edges = new HashMap<>();
|
||||
@@ -55,10 +59,13 @@ public class WfState {
|
||||
*/
|
||||
private Set<String> interruptNodes = new HashSet<>();
|
||||
|
||||
public WfState(User user, List<NodeIOData> input, String uuid) {
|
||||
public WfState(User user, List<NodeIOData> input, String uuid, Long userId, String tokenValue, SseEmitter sseEmitter) {
|
||||
this.input = input;
|
||||
this.user = user;
|
||||
this.uuid = uuid;
|
||||
this.userId = userId;
|
||||
this.tokenValue = tokenValue;
|
||||
this.sseEmitter = sseEmitter;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -68,7 +68,7 @@ public class WorkflowEngine {
|
||||
this.workflowRuntimeNodeService = workflowRuntimeNodeService;
|
||||
}
|
||||
|
||||
public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
|
||||
public void run(User user, List<ObjectNode> userInputs, SseEmitter sseEmitter, Long userId, String tokenValue) {
|
||||
this.user = user;
|
||||
this.sseEmitter = sseEmitter;
|
||||
log.info("WorkflowEngine run,userId:{},workflowUuid:{},userInputs:{}", user.getId(), workflow.getUuid(), userInputs);
|
||||
@@ -86,7 +86,7 @@ public class WorkflowEngine {
|
||||
Pair<WorkflowNode, Set<WorkflowNode>> startAndEnds = findStartAndEndNode();
|
||||
WorkflowNode startNode = startAndEnds.getLeft();
|
||||
List<NodeIOData> wfInputs = getAndCheckUserInput(userInputs, startNode);
|
||||
this.wfState = new WfState(user, wfInputs, runtimeUuid);
|
||||
this.wfState = new WfState(user, wfInputs, runtimeUuid,userId, tokenValue, sseEmitter);
|
||||
workflowRuntimeService.updateInput(this.wfRuntimeResp.getId(), wfState);
|
||||
|
||||
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package org.ruoyi.workflow.workflow;
|
||||
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.core.exception.base.BaseException;
|
||||
import org.ruoyi.common.satoken.utils.LoginHelper;
|
||||
import org.ruoyi.common.sse.core.SseEmitterManager;
|
||||
import org.ruoyi.workflow.entity.*;
|
||||
import org.ruoyi.workflow.helper.SSEEmitterHelper;
|
||||
import org.ruoyi.workflow.service.*;
|
||||
@@ -46,9 +49,17 @@ public class WorkflowStarter {
|
||||
@Resource
|
||||
private SSEEmitterHelper sseEmitterHelper;
|
||||
|
||||
@Resource
|
||||
private SseEmitterManager sseEmitterManager;
|
||||
|
||||
|
||||
public SseEmitter streaming(User user, String workflowUuid, List<ObjectNode> userInputs) {
|
||||
SseEmitter sseEmitter = new SseEmitter(SSE_TIMEOUT);
|
||||
// 获取用户ID
|
||||
Long userId = LoginHelper.getUserId();
|
||||
// 获取登录Token
|
||||
String tokenValue = StpUtil.getTokenValue();
|
||||
// 根据用户ID和Token连接SSE对象
|
||||
SseEmitter sseEmitter = sseEmitterManager.connect(userId, tokenValue);
|
||||
if (!sseEmitterHelper.checkOrComplete(user, sseEmitter)) {
|
||||
return sseEmitter;
|
||||
}
|
||||
@@ -60,12 +71,12 @@ public class WorkflowStarter {
|
||||
sseEmitterHelper.sendErrorAndComplete(user.getId(), sseEmitter, A_WF_DISABLED.getInfo());
|
||||
return sseEmitter;
|
||||
}
|
||||
self.asyncRun(user, workflow, userInputs, sseEmitter);
|
||||
self.asyncRun(user, workflow, userInputs, sseEmitter, userId, tokenValue);
|
||||
return sseEmitter;
|
||||
}
|
||||
|
||||
@Async
|
||||
public void asyncRun(User user, Workflow workflow, List<ObjectNode> userInputs, SseEmitter sseEmitter) {
|
||||
public void asyncRun(User user, Workflow workflow, List<ObjectNode> userInputs, SseEmitter sseEmitter, Long userId, String tokenValue) {
|
||||
log.info("WorkflowEngine run,userId:{},workflowUuid:{},userInputs:{}", user.getId(), workflow.getUuid(), userInputs);
|
||||
List<WorkflowComponent> components = workflowComponentService.getAllEnable();
|
||||
List<WorkflowNode> nodes = workflowNodeService.lambdaQuery()
|
||||
@@ -79,7 +90,7 @@ public class WorkflowStarter {
|
||||
WorkflowEngine workflowEngine = new WorkflowEngine(workflow,
|
||||
sseEmitterHelper, components, nodes, edges,
|
||||
workflowRuntimeService, workflowRuntimeNodeService);
|
||||
workflowEngine.run(user, userInputs, sseEmitter);
|
||||
workflowEngine.run(user, userInputs, sseEmitter, userId, tokenValue);
|
||||
}
|
||||
|
||||
@Async
|
||||
|
||||
@@ -3,10 +3,19 @@ package org.ruoyi.workflow.workflow;
|
||||
import cn.hutool.core.collection.CollStreamUtil;
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import dev.langchain4j.data.message.ChatMessage;
|
||||
import dev.langchain4j.data.message.SystemMessage;
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.bsc.langgraph4j.langchain4j.generators.StreamingChatGenerator;
|
||||
import org.bsc.langgraph4j.state.AgentState;
|
||||
import org.ruoyi.common.chat.Service.IChatModelService;
|
||||
import org.ruoyi.common.chat.Service.IChatService;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.chat.factory.ChatServiceFactory;
|
||||
import org.ruoyi.workflow.base.NodeInputConfigTypeHandler;
|
||||
import org.ruoyi.workflow.entity.WorkflowNode;
|
||||
import org.ruoyi.workflow.enums.WfIODataTypeEnum;
|
||||
@@ -15,11 +24,9 @@ import org.ruoyi.workflow.workflow.data.NodeIOData;
|
||||
import org.ruoyi.workflow.workflow.data.NodeIODataContent;
|
||||
import org.ruoyi.workflow.workflow.def.WfNodeParamRef;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import static org.ruoyi.workflow.cosntant.AdiConstant.WorkflowConstant.DEFAULT_OUTPUT_PARAM_NAME;
|
||||
|
||||
@@ -27,6 +34,12 @@ import static org.ruoyi.workflow.cosntant.AdiConstant.WorkflowConstant.DEFAULT_O
|
||||
@Component
|
||||
public class WorkflowUtil {
|
||||
|
||||
@Resource
|
||||
private ChatServiceFactory chatServiceFactory;
|
||||
|
||||
@Resource
|
||||
private IChatModelService chatModelService;
|
||||
|
||||
public static String renderTemplate(String template, List<NodeIOData> values) {
|
||||
// 🔒 关键修复:如果 template 为 null,直接返回 null 或空字符串
|
||||
if (template == null) {
|
||||
@@ -74,8 +87,8 @@ public class WorkflowUtil {
|
||||
|
||||
public static String getHumanFeedbackTip(String nodeUuid, List<WorkflowNode> wfNodes) {
|
||||
WorkflowNode wfNode = wfNodes.stream()
|
||||
.filter(item -> item.getUuid().equals(nodeUuid))
|
||||
.findFirst().orElse(null);
|
||||
.filter(item -> item.getUuid().equals(nodeUuid))
|
||||
.findFirst().orElse(null);
|
||||
if (null == wfNode) {
|
||||
return "";
|
||||
}
|
||||
@@ -88,73 +101,82 @@ public class WorkflowUtil {
|
||||
return String.valueOf(tip);
|
||||
}
|
||||
|
||||
public void streamingInvokeLLM(WfState wfState, WfNodeState state, WorkflowNode node, String category,
|
||||
String modelName, List<UserMessage> systemMessage) {
|
||||
log.info("stream invoke, category: {}, modelName: {}", category, modelName);
|
||||
public void streamingInvokeLLM(WfState wfState, WfNodeState state, WorkflowNode node, String modelName,
|
||||
List<SystemMessage> systemMessage) {
|
||||
log.info("stream invoke, modelName: {}", modelName);
|
||||
|
||||
// 根据模型名称查询模型信息
|
||||
ChatModelVo chatModelVo = chatModelService.selectModelByName(modelName);
|
||||
if (chatModelVo == null) {
|
||||
throw new IllegalArgumentException("模型不存在: " + modelName);
|
||||
}
|
||||
|
||||
// 根据模型名称找到模型实体
|
||||
String modelVoCategory = chatModelVo.getCategory();
|
||||
// 根据 category 获取对应的 ChatService(不使用计费代理,工作流场景单独计费)
|
||||
//IChatService chatService = chatServiceFactory.getOriginalService(category);
|
||||
IChatService chatService = chatServiceFactory.getOriginalService(modelVoCategory);
|
||||
|
||||
StreamingChatGenerator<AgentState> streamingGenerator = StreamingChatGenerator.builder()
|
||||
.mapResult(response -> {
|
||||
String responseTxt = response.aiMessage().text();
|
||||
log.info("llm response:{}", responseTxt);
|
||||
.mapResult(response -> {
|
||||
String responseTxt = response.aiMessage().text();
|
||||
log.info("llm response:{}", responseTxt);
|
||||
|
||||
// 传递所有输入数据 + 添加 LLM 输出
|
||||
wfState.getNodeStateByNodeUuid(node.getUuid()).ifPresent(item -> {
|
||||
List<NodeIOData> outputs = new ArrayList<>(item.getInputs());
|
||||
NodeIOData output = NodeIOData.createByText(DEFAULT_OUTPUT_PARAM_NAME, "", responseTxt);
|
||||
outputs.add(output);
|
||||
item.setOutputs(outputs);
|
||||
});
|
||||
// 传递所有输入数据 + 添加 LLM 输出
|
||||
wfState.getNodeStateByNodeUuid(node.getUuid()).ifPresent(item -> {
|
||||
List<NodeIOData> outputs = new ArrayList<>(item.getInputs());
|
||||
NodeIOData output = NodeIOData.createByText(DEFAULT_OUTPUT_PARAM_NAME, "", responseTxt);
|
||||
outputs.add(output);
|
||||
item.setOutputs(outputs);
|
||||
});
|
||||
|
||||
return Map.of("completeResult", response.aiMessage().text());
|
||||
})
|
||||
.startingNode(node.getUuid())
|
||||
.startingState(state)
|
||||
.build();
|
||||
return Map.of("completeResult", response.aiMessage().text());
|
||||
})
|
||||
.startingNode(node.getUuid())
|
||||
.startingState(state)
|
||||
.build();
|
||||
|
||||
Long userId = wfState.getUserId();
|
||||
String tokenValue = wfState.getTokenValue();
|
||||
SseEmitter sseEmitter = wfState.getSseEmitter();
|
||||
|
||||
// 构建 ruoyi-ai 的 ChatRequest
|
||||
// List<Message> messages = new ArrayList<>();
|
||||
//
|
||||
// addUserMessage(node, state.getInputs(), messages);
|
||||
//
|
||||
// addSystemMessage(systemMessage, messages);
|
||||
//
|
||||
// ChatRequest chatRequest = new ChatRequest();
|
||||
// chatRequest.setModel(modelName);
|
||||
// chatRequest.setMessages(messages);
|
||||
List<ChatMessage> chatMessages = new ArrayList<>();
|
||||
addUserMessage(node, state.getInputs(), chatMessages);
|
||||
chatMessages.addAll(systemMessage);
|
||||
|
||||
// 定义模型调用对象
|
||||
ChatRequest chatRequest = new ChatRequest();
|
||||
// 目前工作流深度思考成员变量只能写死
|
||||
chatRequest.setEnableThinking(false);
|
||||
chatRequest.setModel(modelName);
|
||||
chatRequest.setChatMessages(chatMessages);
|
||||
|
||||
// 使用工作流专用方法
|
||||
StreamingChatResponseHandler handler = streamingGenerator.handler();
|
||||
chatService.chat(chatModelVo, chatRequest, sseEmitter, userId, tokenValue, handler);
|
||||
wfState.getNodeToStreamingGenerator().put(node.getUuid(), streamingGenerator);
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加用户信息
|
||||
*
|
||||
* @param node
|
||||
* @param messages
|
||||
* @param node 节点
|
||||
* @param userMessage 用户信息
|
||||
*/
|
||||
private void addUserMessage(WorkflowNode node, List<NodeIOData> userMessage, List<UserMessage> messages) {
|
||||
private void addUserMessage(WorkflowNode node, List<NodeIOData> userMessage, List<ChatMessage> messages) {
|
||||
if (CollUtil.isEmpty(userMessage)) {
|
||||
return;
|
||||
}
|
||||
|
||||
WfNodeInputConfig nodeInputConfig = NodeInputConfigTypeHandler.fillNodeInputConfig(node.getInputConfig());
|
||||
|
||||
List<WfNodeParamRef> refInputs = nodeInputConfig.getRefInputs();
|
||||
|
||||
Set<String> nameSet = CollStreamUtil.toSet(refInputs, WfNodeParamRef::getName);
|
||||
|
||||
userMessage.stream().filter(item -> nameSet.contains(item.getName()))
|
||||
.map(item -> getMessage("user", item.getContent().getValue().toString())).forEach(messages::add);
|
||||
|
||||
if (CollUtil.isNotEmpty(messages)) {
|
||||
return;
|
||||
// 构建消息列表
|
||||
List<UserMessage> messageList = buildMessageList(userMessage, nameSet);
|
||||
// 如果没有找到匹配的消息,尝试使用input字段
|
||||
if (CollUtil.isEmpty(messageList)) {
|
||||
messageList = buildMessageList(userMessage, Set.of("input"));
|
||||
}
|
||||
|
||||
userMessage.stream().filter(item -> "input".equals(item.getName()))
|
||||
.map(item -> getMessage("user", item.getContent().getValue().toString())).forEach(messages::add);
|
||||
messages.addAll(messageList);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -170,19 +192,13 @@ public class WorkflowUtil {
|
||||
}
|
||||
|
||||
/**
|
||||
* 添加系统信息
|
||||
*
|
||||
* @param systemMessage
|
||||
* @param messages
|
||||
* 构建消息列表
|
||||
*/
|
||||
private void addSystemMessage(List<UserMessage> systemMessage, List<UserMessage> messages) {
|
||||
log.info("addSystemMessage received: {}", systemMessage); // 🔥 加这一行
|
||||
|
||||
if (CollUtil.isEmpty(systemMessage)) {
|
||||
return;
|
||||
}
|
||||
systemMessage.stream()
|
||||
.map(userMsg -> getMessage("system", userMsg.singleText()))
|
||||
.forEach(messages::add);
|
||||
private List<UserMessage> buildMessageList(List<NodeIOData> userMessage, Set<String> nameSet) {
|
||||
return userMessage.stream()
|
||||
.filter(item -> item != null && item.getName() != null)
|
||||
// 兼容默认输出参数的人机交互
|
||||
.filter(item -> nameSet.contains(item.getName()))
|
||||
.map(item -> getMessage("user", item.getContent().getValue().toString())).toList();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,14 +124,6 @@ public abstract class AbstractWfNode {
|
||||
log.info("↓↓↓↓↓ node process start,name:{},uuid:{}", node.getTitle(), node.getUuid());
|
||||
state.setProcessStatus(NODE_PROCESS_STATUS_DOING);
|
||||
initInput();
|
||||
//HumanFeedback的情况
|
||||
Object humanFeedbackState = state.data().get(HUMAN_FEEDBACK_KEY);
|
||||
if (null != humanFeedbackState) {
|
||||
String userInput = humanFeedbackState.toString();
|
||||
if (StringUtils.isNotBlank(userInput)) {
|
||||
state.getInputs().add(NodeIOData.createByText(HUMAN_FEEDBACK_KEY, "default", userInput));
|
||||
}
|
||||
}
|
||||
if (null != inputConsumer) {
|
||||
inputConsumer.accept(state);
|
||||
}
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
package org.ruoyi.workflow.workflow.node;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.ruoyi.workflow.entity.WorkflowComponent;
|
||||
import org.ruoyi.workflow.entity.WorkflowNode;
|
||||
import org.ruoyi.workflow.workflow.NodeProcessResult;
|
||||
import org.ruoyi.workflow.workflow.WfNodeState;
|
||||
import org.ruoyi.workflow.workflow.WfState;
|
||||
import org.ruoyi.workflow.workflow.data.NodeIOData;
|
||||
|
||||
import static org.ruoyi.workflow.cosntant.AdiConstant.WorkflowConstant.*;
|
||||
|
||||
/**
|
||||
* 人机交互节点实现类
|
||||
*/
|
||||
@Slf4j
|
||||
public class HumanFeedbackNode extends AbstractWfNode {
|
||||
|
||||
public HumanFeedbackNode(WorkflowComponent component, WorkflowNode nodeDefinition, WfState wfState, WfNodeState nodeState) {
|
||||
super(component, nodeDefinition, wfState, nodeState);
|
||||
}
|
||||
|
||||
// 人机交互节点的处理逻辑
|
||||
@Override
|
||||
public NodeProcessResult onProcess() {
|
||||
log.info("Processing HumanFeedback node: {}", node.getTitle());
|
||||
// 从状态中获取用户输入数据
|
||||
Object humanFeedbackState = state.data().get(HUMAN_FEEDBACK_KEY);
|
||||
if (null != humanFeedbackState) {
|
||||
String userInput = humanFeedbackState.toString();
|
||||
if (StringUtils.isNotBlank(userInput)) {
|
||||
// 用户已提供输入,将用户输入添加到节点输入和输出中
|
||||
NodeIOData feedbackData = NodeIOData.createByText("output", "default", userInput);
|
||||
// 添加到输出列表,这样后续节点可以使用
|
||||
state.getOutputs().add(feedbackData);
|
||||
// 设置为成功状态
|
||||
state.setProcessStatus(NODE_PROCESS_STATUS_SUCCESS);
|
||||
log.info("Human feedback processed for node: {}, content: {}", node.getTitle(), userInput);
|
||||
} else {
|
||||
// 用户输入为空,设置等待状态
|
||||
state.setProcessStatus(NODE_PROCESS_STATUS_DOING);
|
||||
log.info("Human feedback is empty for node: {}", node.getTitle());
|
||||
}
|
||||
} else {
|
||||
// 没有用户输入,这可能是正常情况(等待用户输入)
|
||||
// 但为了确保流程可以继续,我们仍然标记为成功
|
||||
state.setProcessStatus(NODE_PROCESS_STATUS_SUCCESS);
|
||||
log.info("No human feedback found for node: {}, continuing workflow", node.getTitle());
|
||||
}
|
||||
return new NodeProcessResult();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
package org.ruoyi.workflow.workflow.node.answer;
|
||||
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import dev.langchain4j.data.message.SystemMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.ruoyi.workflow.entity.WorkflowComponent;
|
||||
@@ -44,9 +44,9 @@ public class LLMAnswerNode extends AbstractWfNode {
|
||||
// 调用LLM
|
||||
WorkflowUtil workflowUtil = SpringUtil.getBean(WorkflowUtil.class);
|
||||
String modelName = nodeConfigObj.getModelName();
|
||||
String category = nodeConfigObj.getCategory();
|
||||
List<UserMessage> systemMessage = List.of(UserMessage.from(prompt));
|
||||
workflowUtil.streamingInvokeLLM(wfState, state, node, category, modelName, systemMessage);
|
||||
// 转换系统信息结构
|
||||
List<SystemMessage> systemMessage = List.of(new SystemMessage(prompt));
|
||||
workflowUtil.streamingInvokeLLM(wfState, state, node, modelName, systemMessage);
|
||||
return new NodeProcessResult();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package org.ruoyi.workflow.workflow.node.keywordExtractor;
|
||||
|
||||
import dev.langchain4j.data.message.SystemMessage;
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -65,12 +66,9 @@ public class KeywordExtractorNode extends AbstractWfNode {
|
||||
// 调用 LLM 进行关键词提取
|
||||
WorkflowUtil workflowUtil = SpringUtil.getBean(WorkflowUtil.class);
|
||||
String modelName = config.getModelName();
|
||||
String category = config.getCategory();
|
||||
List<UserMessage> systemMessage = List.of(UserMessage.from(prompt));
|
||||
|
||||
List<SystemMessage> systemMessage = List.of(new SystemMessage(prompt));
|
||||
// 使用流式调用
|
||||
workflowUtil.streamingInvokeLLM(wfState, state, node, category, modelName, systemMessage);
|
||||
|
||||
workflowUtil.streamingInvokeLLM(wfState, state, node, modelName, systemMessage);
|
||||
return new NodeProcessResult();
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package org.ruoyi.workflow.workflow.node.knowledgeRetrieval;
|
||||
|
||||
import dev.langchain4j.data.message.SystemMessage;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.ruoyi.workflow.entity.WorkflowComponent;
|
||||
@@ -150,18 +151,15 @@ public class KnowledgeRetrievalNode extends AbstractWfNode {
|
||||
|
||||
// 使用WorkflowUtil调用LLM(流式)
|
||||
WorkflowUtil workflowUtil = SpringUtil.getBean(WorkflowUtil.class);
|
||||
List<dev.langchain4j.data.message.UserMessage> systemMessage =
|
||||
List.of(dev.langchain4j.data.message.UserMessage.from(prompt));
|
||||
List<SystemMessage> systemMessage = List.of(new SystemMessage(prompt));
|
||||
|
||||
// 调用流式LLM
|
||||
String category = StringUtils.isNotBlank(config.getCategory()) ? config.getCategory() : "llm";
|
||||
String modelName = StringUtils.isNotBlank(config.getModelName()) ? config.getModelName() : "deepseek-chat";
|
||||
|
||||
workflowUtil.streamingInvokeLLM(
|
||||
wfState,
|
||||
tempState,
|
||||
tempNode,
|
||||
category,
|
||||
modelName,
|
||||
systemMessage
|
||||
);
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
package org.ruoyi.workflow.workflow.node.switcher;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.ruoyi.common.core.utils.SpringUtils;
|
||||
import org.ruoyi.workflow.entity.WorkflowComponent;
|
||||
import org.ruoyi.workflow.entity.WorkflowNode;
|
||||
import org.ruoyi.workflow.service.WorkflowNodeService;
|
||||
import org.ruoyi.workflow.workflow.NodeProcessResult;
|
||||
import org.ruoyi.workflow.workflow.WfNodeState;
|
||||
import org.ruoyi.workflow.workflow.WfState;
|
||||
@@ -12,6 +18,8 @@ import org.ruoyi.workflow.workflow.node.AbstractWfNode;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* 条件分支节点
|
||||
@@ -24,6 +32,8 @@ public class SwitcherNode extends AbstractWfNode {
|
||||
super(wfComponent, nodeDef, wfState, nodeState);
|
||||
}
|
||||
|
||||
private static final WorkflowNodeService workflowNodeService = SpringUtils.getBean(WorkflowNodeService.class);
|
||||
|
||||
@Override
|
||||
public NodeProcessResult onProcess() {
|
||||
try {
|
||||
@@ -233,7 +243,7 @@ public class SwitcherNode extends AbstractWfNode {
|
||||
*/
|
||||
private String getValueFromInputs(String nodeUuid, String paramName, List<NodeIOData> inputs) {
|
||||
log.debug("从节点UUID '{}' 搜索参数 '{}'", nodeUuid, paramName);
|
||||
|
||||
|
||||
String result = null;
|
||||
|
||||
// 首先尝试从当前输入中查找
|
||||
@@ -244,7 +254,7 @@ public class SwitcherNode extends AbstractWfNode {
|
||||
result = input.valueToString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (result != null) {
|
||||
log.info("在当前输入中找到参数 '{}': '{}'", paramName, result);
|
||||
return result;
|
||||
@@ -260,7 +270,10 @@ public class SwitcherNode extends AbstractWfNode {
|
||||
result = output.valueToString();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 根据UUID查询对应节点是否存在Param(替换成Input)
|
||||
result = findParamValueInNode(nodeUuid, paramName, inputs, result);
|
||||
|
||||
if (result != null) {
|
||||
log.info("在节点 '{}' 的输出中找到参数 '{}': '{}'", nodeUuid, paramName, result);
|
||||
return result;
|
||||
@@ -282,6 +295,47 @@ public class SwitcherNode extends AbstractWfNode {
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据节点UUID和参数名查找对应的输入值
|
||||
* 意义:修复开始节点参数名错误的问题
|
||||
*
|
||||
* @param nodeUuid 节点的唯一标识符(UUID)
|
||||
* @param paramName 需要查找的参数名称
|
||||
* @param inputs 输入数据列表,用于匹配参数值
|
||||
* @param result 默认返回结果,若未找到匹配项则返回该值
|
||||
* @return 返回查找到的参数值,若未找到则返回默认结果
|
||||
*/
|
||||
private String findParamValueInNode(String nodeUuid, String paramName, List<NodeIOData> inputs, String result) {
|
||||
// 查询工作流节点信息
|
||||
WorkflowNode workflowNode = workflowNodeService.lambdaQuery().eq(WorkflowNode::getUuid, nodeUuid).one();
|
||||
if (ObjectUtils.isNotEmpty(workflowNode)){
|
||||
// 获取节点的输入配置
|
||||
String inputConfig = workflowNode.getInputConfig();
|
||||
log.info("节点 '{}' 的输入配置: {}", nodeUuid, inputConfig);
|
||||
if (StringUtils.isNotBlank(inputConfig)){
|
||||
// 解析输入配置为JSON对象
|
||||
JSONObject configJson = JSON.parseObject(inputConfig);
|
||||
// 获取 user_inputs 数组
|
||||
JSONArray userInputs = configJson.getJSONArray("user_inputs");
|
||||
if (userInputs != null && !userInputs.isEmpty()) {
|
||||
// 在 user_inputs 中查找匹配的参数名,并获取对应值
|
||||
Optional<String> valueOpt = userInputs.stream()
|
||||
.filter(JSONObject.class::isInstance)
|
||||
.map(JSONObject.class::cast)
|
||||
.filter(obj -> paramName.equals(obj.getString("name")))
|
||||
.map(matchedObj -> getValueFromInputs(nodeUuid, "input", inputs))
|
||||
.filter(Objects::nonNull)
|
||||
.findFirst();
|
||||
// 若找到匹配值,则更新结果
|
||||
if (valueOpt.isPresent()) {
|
||||
result = valueOpt.get();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据运算符评估条件
|
||||
*/
|
||||
|
||||
Reference in New Issue
Block a user