diff --git a/pom.xml b/pom.xml index 4441852f..1b60cd93 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 1.13.0-beta23 1.5.3 1.19.6 - 1.0.7 + 1.2.6 1.62.2 diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/enums/ChatModeType.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/enums/ChatModeType.java index 066d6c3a..1371b6a4 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/enums/ChatModeType.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/enums/ChatModeType.java @@ -18,7 +18,8 @@ public enum ChatModeType { PPIO("ppio", "ppio"), CUSTOM_API("custom_api", "自定义API"), MINIMAX("minimax", "MiniMax"), - XIAOMI("xiaomi", "小米MiMo"); + XIAOMI("xiaomi", "小米MiMo"), + DIFY("dify", "Dify平台"); private final String code; private final String description; diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/ChatServiceFacade.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/ChatServiceFacade.java index 108c3cba..da1b8ccf 100644 --- a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/ChatServiceFacade.java +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/ChatServiceFacade.java @@ -47,6 +47,7 @@ import org.ruoyi.common.sse.core.SseEmitterManager; import org.ruoyi.common.sse.utils.SseMessageUtils; import org.ruoyi.domain.bo.vector.QueryVectorBo; import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo; +import org.ruoyi.enums.ChatModeType; import org.ruoyi.factory.ChatServiceFactory; import org.ruoyi.mcp.service.core.ToolProviderFactory; import org.ruoyi.observability.*; @@ -97,6 +98,8 @@ public class ChatServiceFacade implements IChatService { private final ToolProviderFactory toolProviderFactory; + private final org.ruoyi.service.chat.impl.provider.DifyWorkflowService difyWorkflowService; + /** * 内存实例缓存,避免同一会话重复创建 * Key: sessionId, Value: MessageWindowChatMemory实例 @@ -163,6 +166,14 @@ public class ChatServiceFacade implements IChatService { * @return 如果需要提前返回则返回SseEmitter,否则返回null */ private SseEmitter handleSpecialChatModes(ChatRequest chatRequest) { + // 处理 Dify 工作流对话 + if (chatRequest.getEnableWorkFlow() + && chatRequest.getChatModelVo() != null + && ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode())) { + log.info("处理Dify工作流对话,会话: {}", chatRequest.getSessionId()); + return difyWorkflowService.streaming(chatRequest.getChatModelVo(), chatRequest); + } + // 处理工作流对话 if (chatRequest.getEnableWorkFlow()) { log.info("处理工作流对话,会话: {}", chatRequest.getSessionId()); @@ -430,8 +441,12 @@ public class ChatServiceFacade implements IChatService { } } + // Dify 自带 RAG 知识库检索,跳过本地向量库查询 + boolean isDifyProvider = chatRequest.getChatModelVo() != null + && ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode()); + // 从向量库查询相关历史消息(知识库内容作为上下文) - if (chatRequest.getKnowledgeId() != null) { + if (chatRequest.getKnowledgeId() != null && !isDifyProvider) { // 查询知识库信息 KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKnowledgeId())); if (knowledgeInfoVo == null) { diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyChatServiceImpl.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyChatServiceImpl.java new file mode 100644 index 00000000..4166f151 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyChatServiceImpl.java @@ -0,0 +1,43 @@ +package org.ruoyi.service.chat.impl.provider; + +import dev.langchain4j.model.chat.ChatModel; +import dev.langchain4j.model.chat.StreamingChatModel; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.ruoyi.common.chat.domain.dto.request.ChatRequest; +import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo; +import org.ruoyi.enums.ChatModeType; +import org.ruoyi.service.chat.AbstractChatService; +import org.ruoyi.service.chat.impl.provider.model.DifyStreamingChatModel; +import org.springframework.stereotype.Service; + +/** + * Dify 平台对话服务 + *

+ * 通过 dify-java-client 接入 Dify 的对话型应用 (Chat App) 和 + * 工作流编排对话应用 (Chatflow App),支持流式 SSE 响应。 + * + * @author better + */ +@Service +@Slf4j +@RequiredArgsConstructor +public class DifyChatServiceImpl implements AbstractChatService { + + private final DifyConversationService difyConversationService; + + @Override + public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) { + return new DifyStreamingChatModel(chatModelVo, chatRequest, difyConversationService); + } + + @Override + public ChatModel buildChatModel(ChatModelVo chatModelVo) { + throw new UnsupportedOperationException("Dify 不支持同步 ChatModel,请使用流式模式"); + } + + @Override + public String getProviderName() { + return ChatModeType.DIFY.getCode(); + } +} diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyConversationService.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyConversationService.java new file mode 100644 index 00000000..d8c9df29 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyConversationService.java @@ -0,0 +1,35 @@ +package org.ruoyi.service.chat.impl.provider; + +import org.springframework.stereotype.Service; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * Dify 会话映射管理 + *

+ * 维护 ruoyi sessionId 与 Dify conversation_id 的映射关系, + * 确保多轮对话上下文连续。 + * + * @author better + */ +@Service +public class DifyConversationService { + + private final ConcurrentHashMap conversationMap = new ConcurrentHashMap<>(); + + public String getConversationId(Long sessionId) { + return conversationMap.get(sessionId); + } + + public void saveMapping(Long sessionId, String difyConversationId) { + if (sessionId != null && difyConversationId != null) { + conversationMap.put(sessionId, difyConversationId); + } + } + + public void clearMapping(Long sessionId) { + if (sessionId != null) { + conversationMap.remove(sessionId); + } + } +} diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyWorkflowService.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyWorkflowService.java new file mode 100644 index 00000000..51afae73 --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/DifyWorkflowService.java @@ -0,0 +1,137 @@ +package org.ruoyi.service.chat.impl.provider; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.github.imfangs.dify.client.DifyClientFactory; +import io.github.imfangs.dify.client.DifyWorkflowClient; +import io.github.imfangs.dify.client.enums.ResponseMode; +import io.github.imfangs.dify.client.event.ErrorEvent; +import io.github.imfangs.dify.client.event.WorkflowFinishedEvent; +import io.github.imfangs.dify.client.event.WorkflowTextChunkEvent; +import io.github.imfangs.dify.client.callback.WorkflowStreamCallback; +import io.github.imfangs.dify.client.model.workflow.WorkflowRunRequest; +import lombok.extern.slf4j.Slf4j; +import org.ruoyi.common.chat.domain.dto.request.ChatRequest; +import org.ruoyi.common.chat.domain.dto.request.WorkFlowRunner; +import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo; +import org.ruoyi.common.sse.utils.SseMessageUtils; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Dify 工作流执行服务 + *

+ * 通过 DifyWorkflowClient 调用 Dify 平台上部署的工作流应用, + * 并将节点事件通过 SSE 实时推送给前端。 + * + * @author better + */ +@Service +@Slf4j +public class DifyWorkflowService { + + /** + * 流式执行 Dify 工作流 + * + * @param chatModelVo 模型配置(apiHost= Dify 地址, apiKey= Dify 密钥) + * @param chatRequest 聊天请求 + * @return SSE emitter + */ + public SseEmitter streaming(ChatModelVo chatModelVo, ChatRequest chatRequest) { + Long userId = chatRequest.getUserId(); + String tokenValue = chatRequest.getTokenValue(); + SseEmitter emitter = chatRequest.getEmitter(); + + // 构建 Dify 工作流请求参数 + Map inputs = convertInputs(chatRequest.getWorkFlowRunner()); + + WorkflowRunRequest request = WorkflowRunRequest.builder() + .inputs(inputs) + .responseMode(ResponseMode.STREAMING) + .user(String.valueOf(userId)) + .build(); + + DifyWorkflowClient client = DifyClientFactory.createWorkflowClient( + normalizeBaseUrl(chatModelVo.getApiHost()), + chatModelVo.getApiKey()); + + // 异步执行,避免阻塞请求线程 + CompletableFuture.runAsync(() -> { + try { + client.runWorkflowStream(request, new WorkflowStreamCallback() { + + @Override + public void onWorkflowTextChunk(WorkflowTextChunkEvent event) { + String text = event.getData() != null ? event.getData().getText() : null; + if (text != null) { + SseMessageUtils.sendContent(userId, text); + } + } + + @Override + public void onWorkflowFinished(WorkflowFinishedEvent event) { + // 将最终输出作为内容发送 + if (event.getData() != null && event.getData().getOutputs() != null) { + Map outputs = event.getData().getOutputs(); + for (Map.Entry entry : outputs.entrySet()) { + SseMessageUtils.sendContent(userId, + entry.getKey() + ": " + entry.getValue() + "\n"); + } + } + SseMessageUtils.sendDone(userId); + SseMessageUtils.completeConnection(userId, tokenValue); + } + + @Override + public void onError(ErrorEvent event) { + SseMessageUtils.sendError(userId, event.getMessage()); + } + + @Override + public void onException(Throwable throwable) { + log.error("Dify 工作流执行异常", throwable); + SseMessageUtils.sendError(userId, throwable.getMessage()); + SseMessageUtils.completeConnection(userId, tokenValue); + } + }); + } catch (Exception e) { + log.error("Dify 工作流执行失败", e); + SseMessageUtils.sendError(userId, e.getMessage()); + SseMessageUtils.completeConnection(userId, tokenValue); + } + }); + + return emitter; + } + + /** + * 将 WorkFlowRunner.inputs (List) 转换为 Dify 所需的 Map + */ + private Map convertInputs(WorkFlowRunner runner) { + Map result = new HashMap<>(); + if (runner == null || runner.getInputs() == null) { + return result; + } + for (ObjectNode node : runner.getInputs()) { + Iterator> fields = node.fields(); + while (fields.hasNext()) { + Map.Entry field = fields.next(); + result.put(field.getKey(), field.getValue().asText()); + } + } + return result; + } + + private String normalizeBaseUrl(String baseUrl) { + if (baseUrl == null || baseUrl.isBlank()) { + throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空"); + } + return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + } +} diff --git a/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/model/DifyStreamingChatModel.java b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/model/DifyStreamingChatModel.java new file mode 100644 index 00000000..4dbe287d --- /dev/null +++ b/ruoyi-modules/ruoyi-chat/src/main/java/org/ruoyi/service/chat/impl/provider/model/DifyStreamingChatModel.java @@ -0,0 +1,172 @@ +package org.ruoyi.service.chat.impl.provider.model; + +import dev.langchain4j.data.message.AiMessage; +import dev.langchain4j.data.message.ChatMessage; +import dev.langchain4j.data.message.UserMessage; +import dev.langchain4j.model.chat.StreamingChatModel; +import dev.langchain4j.model.chat.response.ChatResponse; +import dev.langchain4j.model.chat.response.StreamingChatResponseHandler; +import io.github.imfangs.dify.client.DifyChatClient; +import io.github.imfangs.dify.client.DifyClientFactory; +import io.github.imfangs.dify.client.enums.ResponseMode; +import io.github.imfangs.dify.client.event.ErrorEvent; +import io.github.imfangs.dify.client.event.MessageEndEvent; +import io.github.imfangs.dify.client.event.MessageEvent; +import lombok.extern.slf4j.Slf4j; +import org.ruoyi.common.chat.domain.dto.request.ChatRequest; +import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo; +import org.ruoyi.service.chat.impl.provider.DifyConversationService; + +import java.util.List; + +/** + * Dify 流式聊天模型适配器 + *

+ * 将 Dify 的回调式流式响应适配为 langchain4j 的 StreamingChatModel 接口, + * 使 ChatServiceFacade 可以像其他 provider 一样统一调用。 + * + * @author better + */ +@Slf4j +public class DifyStreamingChatModel implements StreamingChatModel { + + private final ChatModelVo chatModelVo; + private final ChatRequest chatRequest; + private final DifyConversationService conversationService; + + public DifyStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest, + DifyConversationService conversationService) { + this.chatModelVo = chatModelVo; + this.chatRequest = chatRequest; + this.conversationService = conversationService; + } + + @Override + public void chat(List messages, StreamingChatResponseHandler handler) { + // 1. 从 langchain4j 消息列表中提取最后一条用户消息作为 query + String query = extractUserQuery(messages); + + // 2. 获取 Dify conversation_id(多轮对话连续性) + String conversationId = null; + if (chatRequest.getSessionId() != null) { + conversationId = conversationService.getConversationId(chatRequest.getSessionId()); + } + + // 3. 构建 Dify 请求 + io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder() + .query(query) + .user(String.valueOf(chatRequest.getUserId())) + .responseMode(ResponseMode.STREAMING) + .conversationId(conversationId) + .autoGenerateName(true) + .build(); + + // 4. 创建 Dify 客户端并发送流式请求 + try { + DifyChatClient client = DifyClientFactory.createChatClient( + normalizeBaseUrl(chatModelVo.getApiHost()), + chatModelVo.getApiKey()); + + client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler)); + } catch (Exception e) { + log.error("Dify 流式对话调用失败", e); + handler.onError(e); + } + } + + @Override + public void chat(String userMessage, StreamingChatResponseHandler handler) { + io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder() + .query(userMessage) + .user(String.valueOf(chatRequest.getUserId())) + .responseMode(ResponseMode.STREAMING) + .conversationId(chatRequest.getSessionId() != null + ? conversationService.getConversationId(chatRequest.getSessionId()) : null) + .autoGenerateName(true) + .build(); + + try { + DifyChatClient client = DifyClientFactory.createChatClient( + normalizeBaseUrl(chatModelVo.getApiHost()), + chatModelVo.getApiKey()); + + client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler)); + } catch (Exception e) { + log.error("Dify 流式对话调用失败", e); + handler.onError(e); + } + } + + /** + * 从 langchain4j 消息列表中提取最后一条用户消息文本 + */ + private String extractUserQuery(List messages) { + for (int i = messages.size() - 1; i >= 0; i--) { + ChatMessage msg = messages.get(i); + if (msg instanceof UserMessage) { + return ((UserMessage) msg).singleText(); + } + } + return ""; + } + + private String normalizeBaseUrl(String baseUrl) { + if (baseUrl == null || baseUrl.isBlank()) { + throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空"); + } + return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl; + } + + /** + * Dify 回调适配器 + * 将 Dify ChatStreamCallback 事件转发给 langchain4j StreamingChatResponseHandler + */ + private class DifyChatStreamAdapter implements io.github.imfangs.dify.client.callback.ChatStreamCallback { + + private final StreamingChatResponseHandler handler; + private final StringBuilder fullResponse = new StringBuilder(); + + DifyChatStreamAdapter(StreamingChatResponseHandler handler) { + this.handler = handler; + } + + @Override + public void onMessage(MessageEvent event) { + String answer = event.getAnswer(); + if (answer != null) { + fullResponse.append(answer); + handler.onPartialResponse(answer); + } + // 保存 Dify conversation_id 以维持多轮对话 + if (event.getConversationId() != null && chatRequest.getSessionId() != null) { + conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId()); + } + } + + @Override + public void onMessageEnd(MessageEndEvent event) { + // 保存 conversation_id + if (event.getConversationId() != null && chatRequest.getSessionId() != null) { + conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId()); + } + + // 构建完整的 ChatResponse 交给上层处理 + AiMessage aiMessage = new AiMessage(fullResponse.toString()); + ChatResponse response = ChatResponse.builder() + .aiMessage(aiMessage) + .id(event.getMessageId()) + .build(); + handler.onCompleteResponse(response); + } + + @Override + public void onError(ErrorEvent event) { + handler.onError(new RuntimeException(event.getMessage())); + } + + @Override + public void onException(Throwable throwable) { + handler.onError(throwable); + } + } +}