mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-04-24 09:13:39 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc151e49c5 |
2
pom.xml
2
pom.xml
@@ -58,7 +58,7 @@
|
||||
<langchain4j.community.version>1.13.0-beta23</langchain4j.community.version>
|
||||
<langgraph4j.version>1.5.3</langgraph4j.version>
|
||||
<weaviate.version>1.19.6</weaviate.version>
|
||||
<dify.version>1.0.7</dify.version>
|
||||
<dify.version>1.2.6</dify.version>
|
||||
<!-- gRPC 版本 - 解决 Milvus SDK 依赖冲突 -->
|
||||
<grpc.version>1.62.2</grpc.version>
|
||||
<!-- Apache Commons Compress - 用于POI处理ZIP格式 -->
|
||||
|
||||
@@ -18,7 +18,8 @@ public enum ChatModeType {
|
||||
PPIO("ppio", "ppio"),
|
||||
CUSTOM_API("custom_api", "自定义API"),
|
||||
MINIMAX("minimax", "MiniMax"),
|
||||
XIAOMI("xiaomi", "小米MiMo");
|
||||
XIAOMI("xiaomi", "小米MiMo"),
|
||||
DIFY("dify", "Dify平台");
|
||||
private final String code;
|
||||
private final String description;
|
||||
|
||||
|
||||
@@ -47,6 +47,7 @@ import org.ruoyi.common.sse.core.SseEmitterManager;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
import org.ruoyi.domain.bo.vector.QueryVectorBo;
|
||||
import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo;
|
||||
import org.ruoyi.enums.ChatModeType;
|
||||
import org.ruoyi.factory.ChatServiceFactory;
|
||||
import org.ruoyi.mcp.service.core.ToolProviderFactory;
|
||||
import org.ruoyi.observability.*;
|
||||
@@ -97,6 +98,8 @@ public class ChatServiceFacade implements IChatService {
|
||||
|
||||
private final ToolProviderFactory toolProviderFactory;
|
||||
|
||||
private final org.ruoyi.service.chat.impl.provider.DifyWorkflowService difyWorkflowService;
|
||||
|
||||
/**
|
||||
* 内存实例缓存,避免同一会话重复创建
|
||||
* Key: sessionId, Value: MessageWindowChatMemory实例
|
||||
@@ -163,6 +166,14 @@ public class ChatServiceFacade implements IChatService {
|
||||
* @return 如果需要提前返回则返回SseEmitter,否则返回null
|
||||
*/
|
||||
private SseEmitter handleSpecialChatModes(ChatRequest chatRequest) {
|
||||
// 处理 Dify 工作流对话
|
||||
if (chatRequest.getEnableWorkFlow()
|
||||
&& chatRequest.getChatModelVo() != null
|
||||
&& ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode())) {
|
||||
log.info("处理Dify工作流对话,会话: {}", chatRequest.getSessionId());
|
||||
return difyWorkflowService.streaming(chatRequest.getChatModelVo(), chatRequest);
|
||||
}
|
||||
|
||||
// 处理工作流对话
|
||||
if (chatRequest.getEnableWorkFlow()) {
|
||||
log.info("处理工作流对话,会话: {}", chatRequest.getSessionId());
|
||||
@@ -430,8 +441,12 @@ public class ChatServiceFacade implements IChatService {
|
||||
}
|
||||
}
|
||||
|
||||
// Dify 自带 RAG 知识库检索,跳过本地向量库查询
|
||||
boolean isDifyProvider = chatRequest.getChatModelVo() != null
|
||||
&& ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode());
|
||||
|
||||
// 从向量库查询相关历史消息(知识库内容作为上下文)
|
||||
if (chatRequest.getKnowledgeId() != null) {
|
||||
if (chatRequest.getKnowledgeId() != null && !isDifyProvider) {
|
||||
// 查询知识库信息
|
||||
KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKnowledgeId()));
|
||||
if (knowledgeInfoVo == null) {
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
package org.ruoyi.service.chat.impl.provider;
|
||||
|
||||
import dev.langchain4j.model.chat.ChatModel;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.enums.ChatModeType;
|
||||
import org.ruoyi.service.chat.AbstractChatService;
|
||||
import org.ruoyi.service.chat.impl.provider.model.DifyStreamingChatModel;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Dify 平台对话服务
|
||||
* <p>
|
||||
* 通过 dify-java-client 接入 Dify 的对话型应用 (Chat App) 和
|
||||
* 工作流编排对话应用 (Chatflow App),支持流式 SSE 响应。
|
||||
*
|
||||
* @author better
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
public class DifyChatServiceImpl implements AbstractChatService {
|
||||
|
||||
private final DifyConversationService difyConversationService;
|
||||
|
||||
@Override
|
||||
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
return new DifyStreamingChatModel(chatModelVo, chatRequest, difyConversationService);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChatModel buildChatModel(ChatModelVo chatModelVo) {
|
||||
throw new UnsupportedOperationException("Dify 不支持同步 ChatModel,请使用流式模式");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProviderName() {
|
||||
return ChatModeType.DIFY.getCode();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package org.ruoyi.service.chat.impl.provider;
|
||||
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* Dify 会话映射管理
|
||||
* <p>
|
||||
* 维护 ruoyi sessionId 与 Dify conversation_id 的映射关系,
|
||||
* 确保多轮对话上下文连续。
|
||||
*
|
||||
* @author better
|
||||
*/
|
||||
@Service
|
||||
public class DifyConversationService {
|
||||
|
||||
private final ConcurrentHashMap<Long, String> conversationMap = new ConcurrentHashMap<>();
|
||||
|
||||
public String getConversationId(Long sessionId) {
|
||||
return conversationMap.get(sessionId);
|
||||
}
|
||||
|
||||
public void saveMapping(Long sessionId, String difyConversationId) {
|
||||
if (sessionId != null && difyConversationId != null) {
|
||||
conversationMap.put(sessionId, difyConversationId);
|
||||
}
|
||||
}
|
||||
|
||||
public void clearMapping(Long sessionId) {
|
||||
if (sessionId != null) {
|
||||
conversationMap.remove(sessionId);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
package org.ruoyi.service.chat.impl.provider;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import io.github.imfangs.dify.client.DifyClientFactory;
|
||||
import io.github.imfangs.dify.client.DifyWorkflowClient;
|
||||
import io.github.imfangs.dify.client.enums.ResponseMode;
|
||||
import io.github.imfangs.dify.client.event.ErrorEvent;
|
||||
import io.github.imfangs.dify.client.event.WorkflowFinishedEvent;
|
||||
import io.github.imfangs.dify.client.event.WorkflowTextChunkEvent;
|
||||
import io.github.imfangs.dify.client.callback.WorkflowStreamCallback;
|
||||
import io.github.imfangs.dify.client.model.workflow.WorkflowRunRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.dto.request.WorkFlowRunner;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* Dify 工作流执行服务
|
||||
* <p>
|
||||
* 通过 DifyWorkflowClient 调用 Dify 平台上部署的工作流应用,
|
||||
* 并将节点事件通过 SSE 实时推送给前端。
|
||||
*
|
||||
* @author better
|
||||
*/
|
||||
@Service
|
||||
@Slf4j
|
||||
public class DifyWorkflowService {
|
||||
|
||||
/**
|
||||
* 流式执行 Dify 工作流
|
||||
*
|
||||
* @param chatModelVo 模型配置(apiHost= Dify 地址, apiKey= Dify 密钥)
|
||||
* @param chatRequest 聊天请求
|
||||
* @return SSE emitter
|
||||
*/
|
||||
public SseEmitter streaming(ChatModelVo chatModelVo, ChatRequest chatRequest) {
|
||||
Long userId = chatRequest.getUserId();
|
||||
String tokenValue = chatRequest.getTokenValue();
|
||||
SseEmitter emitter = chatRequest.getEmitter();
|
||||
|
||||
// 构建 Dify 工作流请求参数
|
||||
Map<String, Object> inputs = convertInputs(chatRequest.getWorkFlowRunner());
|
||||
|
||||
WorkflowRunRequest request = WorkflowRunRequest.builder()
|
||||
.inputs(inputs)
|
||||
.responseMode(ResponseMode.STREAMING)
|
||||
.user(String.valueOf(userId))
|
||||
.build();
|
||||
|
||||
DifyWorkflowClient client = DifyClientFactory.createWorkflowClient(
|
||||
normalizeBaseUrl(chatModelVo.getApiHost()),
|
||||
chatModelVo.getApiKey());
|
||||
|
||||
// 异步执行,避免阻塞请求线程
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
client.runWorkflowStream(request, new WorkflowStreamCallback() {
|
||||
|
||||
@Override
|
||||
public void onWorkflowTextChunk(WorkflowTextChunkEvent event) {
|
||||
String text = event.getData() != null ? event.getData().getText() : null;
|
||||
if (text != null) {
|
||||
SseMessageUtils.sendContent(userId, text);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onWorkflowFinished(WorkflowFinishedEvent event) {
|
||||
// 将最终输出作为内容发送
|
||||
if (event.getData() != null && event.getData().getOutputs() != null) {
|
||||
Map<String, Object> outputs = event.getData().getOutputs();
|
||||
for (Map.Entry<String, Object> entry : outputs.entrySet()) {
|
||||
SseMessageUtils.sendContent(userId,
|
||||
entry.getKey() + ": " + entry.getValue() + "\n");
|
||||
}
|
||||
}
|
||||
SseMessageUtils.sendDone(userId);
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(ErrorEvent event) {
|
||||
SseMessageUtils.sendError(userId, event.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
log.error("Dify 工作流执行异常", throwable);
|
||||
SseMessageUtils.sendError(userId, throwable.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("Dify 工作流执行失败", e);
|
||||
SseMessageUtils.sendError(userId, e.getMessage());
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
});
|
||||
|
||||
return emitter;
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 WorkFlowRunner.inputs (List<ObjectNode>) 转换为 Dify 所需的 Map
|
||||
*/
|
||||
private Map<String, Object> convertInputs(WorkFlowRunner runner) {
|
||||
Map<String, Object> result = new HashMap<>();
|
||||
if (runner == null || runner.getInputs() == null) {
|
||||
return result;
|
||||
}
|
||||
for (ObjectNode node : runner.getInputs()) {
|
||||
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
|
||||
while (fields.hasNext()) {
|
||||
Map.Entry<String, JsonNode> field = fields.next();
|
||||
result.put(field.getKey(), field.getValue().asText());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private String normalizeBaseUrl(String baseUrl) {
|
||||
if (baseUrl == null || baseUrl.isBlank()) {
|
||||
throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空");
|
||||
}
|
||||
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,172 @@
|
||||
package org.ruoyi.service.chat.impl.provider.model;
|
||||
|
||||
import dev.langchain4j.data.message.AiMessage;
|
||||
import dev.langchain4j.data.message.ChatMessage;
|
||||
import dev.langchain4j.data.message.UserMessage;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.response.ChatResponse;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import io.github.imfangs.dify.client.DifyChatClient;
|
||||
import io.github.imfangs.dify.client.DifyClientFactory;
|
||||
import io.github.imfangs.dify.client.enums.ResponseMode;
|
||||
import io.github.imfangs.dify.client.event.ErrorEvent;
|
||||
import io.github.imfangs.dify.client.event.MessageEndEvent;
|
||||
import io.github.imfangs.dify.client.event.MessageEvent;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
|
||||
import org.ruoyi.service.chat.impl.provider.DifyConversationService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Dify 流式聊天模型适配器
|
||||
* <p>
|
||||
* 将 Dify 的回调式流式响应适配为 langchain4j 的 StreamingChatModel 接口,
|
||||
* 使 ChatServiceFacade 可以像其他 provider 一样统一调用。
|
||||
*
|
||||
* @author better
|
||||
*/
|
||||
@Slf4j
|
||||
public class DifyStreamingChatModel implements StreamingChatModel {
|
||||
|
||||
private final ChatModelVo chatModelVo;
|
||||
private final ChatRequest chatRequest;
|
||||
private final DifyConversationService conversationService;
|
||||
|
||||
public DifyStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest,
|
||||
DifyConversationService conversationService) {
|
||||
this.chatModelVo = chatModelVo;
|
||||
this.chatRequest = chatRequest;
|
||||
this.conversationService = conversationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void chat(List<ChatMessage> messages, StreamingChatResponseHandler handler) {
|
||||
// 1. 从 langchain4j 消息列表中提取最后一条用户消息作为 query
|
||||
String query = extractUserQuery(messages);
|
||||
|
||||
// 2. 获取 Dify conversation_id(多轮对话连续性)
|
||||
String conversationId = null;
|
||||
if (chatRequest.getSessionId() != null) {
|
||||
conversationId = conversationService.getConversationId(chatRequest.getSessionId());
|
||||
}
|
||||
|
||||
// 3. 构建 Dify 请求
|
||||
io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder()
|
||||
.query(query)
|
||||
.user(String.valueOf(chatRequest.getUserId()))
|
||||
.responseMode(ResponseMode.STREAMING)
|
||||
.conversationId(conversationId)
|
||||
.autoGenerateName(true)
|
||||
.build();
|
||||
|
||||
// 4. 创建 Dify 客户端并发送流式请求
|
||||
try {
|
||||
DifyChatClient client = DifyClientFactory.createChatClient(
|
||||
normalizeBaseUrl(chatModelVo.getApiHost()),
|
||||
chatModelVo.getApiKey());
|
||||
|
||||
client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler));
|
||||
} catch (Exception e) {
|
||||
log.error("Dify 流式对话调用失败", e);
|
||||
handler.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void chat(String userMessage, StreamingChatResponseHandler handler) {
|
||||
io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder()
|
||||
.query(userMessage)
|
||||
.user(String.valueOf(chatRequest.getUserId()))
|
||||
.responseMode(ResponseMode.STREAMING)
|
||||
.conversationId(chatRequest.getSessionId() != null
|
||||
? conversationService.getConversationId(chatRequest.getSessionId()) : null)
|
||||
.autoGenerateName(true)
|
||||
.build();
|
||||
|
||||
try {
|
||||
DifyChatClient client = DifyClientFactory.createChatClient(
|
||||
normalizeBaseUrl(chatModelVo.getApiHost()),
|
||||
chatModelVo.getApiKey());
|
||||
|
||||
client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler));
|
||||
} catch (Exception e) {
|
||||
log.error("Dify 流式对话调用失败", e);
|
||||
handler.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 从 langchain4j 消息列表中提取最后一条用户消息文本
|
||||
*/
|
||||
private String extractUserQuery(List<ChatMessage> messages) {
|
||||
for (int i = messages.size() - 1; i >= 0; i--) {
|
||||
ChatMessage msg = messages.get(i);
|
||||
if (msg instanceof UserMessage) {
|
||||
return ((UserMessage) msg).singleText();
|
||||
}
|
||||
}
|
||||
return "";
|
||||
}
|
||||
|
||||
private String normalizeBaseUrl(String baseUrl) {
|
||||
if (baseUrl == null || baseUrl.isBlank()) {
|
||||
throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空");
|
||||
}
|
||||
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
|
||||
}
|
||||
|
||||
/**
|
||||
* Dify 回调适配器
|
||||
* 将 Dify ChatStreamCallback 事件转发给 langchain4j StreamingChatResponseHandler
|
||||
*/
|
||||
private class DifyChatStreamAdapter implements io.github.imfangs.dify.client.callback.ChatStreamCallback {
|
||||
|
||||
private final StreamingChatResponseHandler handler;
|
||||
private final StringBuilder fullResponse = new StringBuilder();
|
||||
|
||||
DifyChatStreamAdapter(StreamingChatResponseHandler handler) {
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageEvent event) {
|
||||
String answer = event.getAnswer();
|
||||
if (answer != null) {
|
||||
fullResponse.append(answer);
|
||||
handler.onPartialResponse(answer);
|
||||
}
|
||||
// 保存 Dify conversation_id 以维持多轮对话
|
||||
if (event.getConversationId() != null && chatRequest.getSessionId() != null) {
|
||||
conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessageEnd(MessageEndEvent event) {
|
||||
// 保存 conversation_id
|
||||
if (event.getConversationId() != null && chatRequest.getSessionId() != null) {
|
||||
conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId());
|
||||
}
|
||||
|
||||
// 构建完整的 ChatResponse 交给上层处理
|
||||
AiMessage aiMessage = new AiMessage(fullResponse.toString());
|
||||
ChatResponse response = ChatResponse.builder()
|
||||
.aiMessage(aiMessage)
|
||||
.id(event.getMessageId())
|
||||
.build();
|
||||
handler.onCompleteResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(ErrorEvent event) {
|
||||
handler.onError(new RuntimeException(event.getMessage()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onException(Throwable throwable) {
|
||||
handler.onError(throwable);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user