1 Commits
main ... dev

Author SHA1 Message Date
wangle
bc151e49c5 feat: 添加Dify平台集成支持
- 升级 dify-sdk-java 从 1.0.7 到 1.2.6
- 新增 ChatModeType.DIFY 枚举类型
- 新增 DifyChatServiceImpl、DifyConversationService、DifyWorkflowService 实现
- 新增 DifyStreamingChatModel 流式聊天模型
- 支持Dify工作流对话模式
- Dify自带RAG知识库时跳过本地向量库查询

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-23 19:19:26 +08:00
7 changed files with 406 additions and 3 deletions

View File

@@ -58,7 +58,7 @@
<langchain4j.community.version>1.13.0-beta23</langchain4j.community.version>
<langgraph4j.version>1.5.3</langgraph4j.version>
<weaviate.version>1.19.6</weaviate.version>
<dify.version>1.0.7</dify.version>
<dify.version>1.2.6</dify.version>
<!-- gRPC 版本 - 解决 Milvus SDK 依赖冲突 -->
<grpc.version>1.62.2</grpc.version>
<!-- Apache Commons Compress - 用于POI处理ZIP格式 -->

View File

@@ -18,7 +18,8 @@ public enum ChatModeType {
PPIO("ppio", "ppio"),
CUSTOM_API("custom_api", "自定义API"),
MINIMAX("minimax", "MiniMax"),
XIAOMI("xiaomi", "小米MiMo");
XIAOMI("xiaomi", "小米MiMo"),
DIFY("dify", "Dify平台");
private final String code;
private final String description;

View File

@@ -47,6 +47,7 @@ import org.ruoyi.common.sse.core.SseEmitterManager;
import org.ruoyi.common.sse.utils.SseMessageUtils;
import org.ruoyi.domain.bo.vector.QueryVectorBo;
import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo;
import org.ruoyi.enums.ChatModeType;
import org.ruoyi.factory.ChatServiceFactory;
import org.ruoyi.mcp.service.core.ToolProviderFactory;
import org.ruoyi.observability.*;
@@ -97,6 +98,8 @@ public class ChatServiceFacade implements IChatService {
private final ToolProviderFactory toolProviderFactory;
private final org.ruoyi.service.chat.impl.provider.DifyWorkflowService difyWorkflowService;
/**
* 内存实例缓存,避免同一会话重复创建
* Key: sessionId, Value: MessageWindowChatMemory实例
@@ -163,6 +166,14 @@ public class ChatServiceFacade implements IChatService {
* @return 如果需要提前返回则返回SseEmitter否则返回null
*/
private SseEmitter handleSpecialChatModes(ChatRequest chatRequest) {
// 处理 Dify 工作流对话
if (chatRequest.getEnableWorkFlow()
&& chatRequest.getChatModelVo() != null
&& ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode())) {
log.info("处理Dify工作流对话,会话: {}", chatRequest.getSessionId());
return difyWorkflowService.streaming(chatRequest.getChatModelVo(), chatRequest);
}
// 处理工作流对话
if (chatRequest.getEnableWorkFlow()) {
log.info("处理工作流对话,会话: {}", chatRequest.getSessionId());
@@ -430,8 +441,12 @@ public class ChatServiceFacade implements IChatService {
}
}
// Dify 自带 RAG 知识库检索,跳过本地向量库查询
boolean isDifyProvider = chatRequest.getChatModelVo() != null
&& ChatModeType.DIFY.getCode().equals(chatRequest.getChatModelVo().getProviderCode());
// 从向量库查询相关历史消息(知识库内容作为上下文)
if (chatRequest.getKnowledgeId() != null) {
if (chatRequest.getKnowledgeId() != null && !isDifyProvider) {
// 查询知识库信息
KnowledgeInfoVo knowledgeInfoVo = knowledgeInfoService.queryById(Long.valueOf(chatRequest.getKnowledgeId()));
if (knowledgeInfoVo == null) {

View File

@@ -0,0 +1,43 @@
package org.ruoyi.service.chat.impl.provider;
import dev.langchain4j.model.chat.ChatModel;
import dev.langchain4j.model.chat.StreamingChatModel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
import org.ruoyi.enums.ChatModeType;
import org.ruoyi.service.chat.AbstractChatService;
import org.ruoyi.service.chat.impl.provider.model.DifyStreamingChatModel;
import org.springframework.stereotype.Service;
/**
* Dify 平台对话服务
* <p>
* 通过 dify-java-client 接入 Dify 的对话型应用 (Chat App) 和
* 工作流编排对话应用 (Chatflow App),支持流式 SSE 响应。
*
* @author better
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DifyChatServiceImpl implements AbstractChatService {
private final DifyConversationService difyConversationService;
@Override
public StreamingChatModel buildStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest) {
return new DifyStreamingChatModel(chatModelVo, chatRequest, difyConversationService);
}
@Override
public ChatModel buildChatModel(ChatModelVo chatModelVo) {
throw new UnsupportedOperationException("Dify 不支持同步 ChatModel请使用流式模式");
}
@Override
public String getProviderName() {
return ChatModeType.DIFY.getCode();
}
}

View File

@@ -0,0 +1,35 @@
package org.ruoyi.service.chat.impl.provider;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
/**
* Dify 会话映射管理
* <p>
* 维护 ruoyi sessionId 与 Dify conversation_id 的映射关系,
* 确保多轮对话上下文连续。
*
* @author better
*/
@Service
public class DifyConversationService {
private final ConcurrentHashMap<Long, String> conversationMap = new ConcurrentHashMap<>();
public String getConversationId(Long sessionId) {
return conversationMap.get(sessionId);
}
public void saveMapping(Long sessionId, String difyConversationId) {
if (sessionId != null && difyConversationId != null) {
conversationMap.put(sessionId, difyConversationId);
}
}
public void clearMapping(Long sessionId) {
if (sessionId != null) {
conversationMap.remove(sessionId);
}
}
}

View File

@@ -0,0 +1,137 @@
package org.ruoyi.service.chat.impl.provider;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.github.imfangs.dify.client.DifyClientFactory;
import io.github.imfangs.dify.client.DifyWorkflowClient;
import io.github.imfangs.dify.client.enums.ResponseMode;
import io.github.imfangs.dify.client.event.ErrorEvent;
import io.github.imfangs.dify.client.event.WorkflowFinishedEvent;
import io.github.imfangs.dify.client.event.WorkflowTextChunkEvent;
import io.github.imfangs.dify.client.callback.WorkflowStreamCallback;
import io.github.imfangs.dify.client.model.workflow.WorkflowRunRequest;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
import org.ruoyi.common.chat.domain.dto.request.WorkFlowRunner;
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
import org.ruoyi.common.sse.utils.SseMessageUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* Dify 工作流执行服务
* <p>
* 通过 DifyWorkflowClient 调用 Dify 平台上部署的工作流应用,
* 并将节点事件通过 SSE 实时推送给前端。
*
* @author better
*/
@Service
@Slf4j
public class DifyWorkflowService {
/**
* 流式执行 Dify 工作流
*
* @param chatModelVo 模型配置apiHost= Dify 地址, apiKey= Dify 密钥)
* @param chatRequest 聊天请求
* @return SSE emitter
*/
public SseEmitter streaming(ChatModelVo chatModelVo, ChatRequest chatRequest) {
Long userId = chatRequest.getUserId();
String tokenValue = chatRequest.getTokenValue();
SseEmitter emitter = chatRequest.getEmitter();
// 构建 Dify 工作流请求参数
Map<String, Object> inputs = convertInputs(chatRequest.getWorkFlowRunner());
WorkflowRunRequest request = WorkflowRunRequest.builder()
.inputs(inputs)
.responseMode(ResponseMode.STREAMING)
.user(String.valueOf(userId))
.build();
DifyWorkflowClient client = DifyClientFactory.createWorkflowClient(
normalizeBaseUrl(chatModelVo.getApiHost()),
chatModelVo.getApiKey());
// 异步执行,避免阻塞请求线程
CompletableFuture.runAsync(() -> {
try {
client.runWorkflowStream(request, new WorkflowStreamCallback() {
@Override
public void onWorkflowTextChunk(WorkflowTextChunkEvent event) {
String text = event.getData() != null ? event.getData().getText() : null;
if (text != null) {
SseMessageUtils.sendContent(userId, text);
}
}
@Override
public void onWorkflowFinished(WorkflowFinishedEvent event) {
// 将最终输出作为内容发送
if (event.getData() != null && event.getData().getOutputs() != null) {
Map<String, Object> outputs = event.getData().getOutputs();
for (Map.Entry<String, Object> entry : outputs.entrySet()) {
SseMessageUtils.sendContent(userId,
entry.getKey() + ": " + entry.getValue() + "\n");
}
}
SseMessageUtils.sendDone(userId);
SseMessageUtils.completeConnection(userId, tokenValue);
}
@Override
public void onError(ErrorEvent event) {
SseMessageUtils.sendError(userId, event.getMessage());
}
@Override
public void onException(Throwable throwable) {
log.error("Dify 工作流执行异常", throwable);
SseMessageUtils.sendError(userId, throwable.getMessage());
SseMessageUtils.completeConnection(userId, tokenValue);
}
});
} catch (Exception e) {
log.error("Dify 工作流执行失败", e);
SseMessageUtils.sendError(userId, e.getMessage());
SseMessageUtils.completeConnection(userId, tokenValue);
}
});
return emitter;
}
/**
* 将 WorkFlowRunner.inputs (List<ObjectNode>) 转换为 Dify 所需的 Map
*/
private Map<String, Object> convertInputs(WorkFlowRunner runner) {
Map<String, Object> result = new HashMap<>();
if (runner == null || runner.getInputs() == null) {
return result;
}
for (ObjectNode node : runner.getInputs()) {
Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
while (fields.hasNext()) {
Map.Entry<String, JsonNode> field = fields.next();
result.put(field.getKey(), field.getValue().asText());
}
}
return result;
}
private String normalizeBaseUrl(String baseUrl) {
if (baseUrl == null || baseUrl.isBlank()) {
throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空");
}
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
}
}

View File

@@ -0,0 +1,172 @@
package org.ruoyi.service.chat.impl.provider.model;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.data.message.ChatMessage;
import dev.langchain4j.data.message.UserMessage;
import dev.langchain4j.model.chat.StreamingChatModel;
import dev.langchain4j.model.chat.response.ChatResponse;
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
import io.github.imfangs.dify.client.DifyChatClient;
import io.github.imfangs.dify.client.DifyClientFactory;
import io.github.imfangs.dify.client.enums.ResponseMode;
import io.github.imfangs.dify.client.event.ErrorEvent;
import io.github.imfangs.dify.client.event.MessageEndEvent;
import io.github.imfangs.dify.client.event.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
import org.ruoyi.common.chat.domain.vo.chat.ChatModelVo;
import org.ruoyi.service.chat.impl.provider.DifyConversationService;
import java.util.List;
/**
* Dify 流式聊天模型适配器
* <p>
* 将 Dify 的回调式流式响应适配为 langchain4j 的 StreamingChatModel 接口,
* 使 ChatServiceFacade 可以像其他 provider 一样统一调用。
*
* @author better
*/
@Slf4j
public class DifyStreamingChatModel implements StreamingChatModel {
private final ChatModelVo chatModelVo;
private final ChatRequest chatRequest;
private final DifyConversationService conversationService;
public DifyStreamingChatModel(ChatModelVo chatModelVo, ChatRequest chatRequest,
DifyConversationService conversationService) {
this.chatModelVo = chatModelVo;
this.chatRequest = chatRequest;
this.conversationService = conversationService;
}
@Override
public void chat(List<ChatMessage> messages, StreamingChatResponseHandler handler) {
// 1. 从 langchain4j 消息列表中提取最后一条用户消息作为 query
String query = extractUserQuery(messages);
// 2. 获取 Dify conversation_id多轮对话连续性
String conversationId = null;
if (chatRequest.getSessionId() != null) {
conversationId = conversationService.getConversationId(chatRequest.getSessionId());
}
// 3. 构建 Dify 请求
io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder()
.query(query)
.user(String.valueOf(chatRequest.getUserId()))
.responseMode(ResponseMode.STREAMING)
.conversationId(conversationId)
.autoGenerateName(true)
.build();
// 4. 创建 Dify 客户端并发送流式请求
try {
DifyChatClient client = DifyClientFactory.createChatClient(
normalizeBaseUrl(chatModelVo.getApiHost()),
chatModelVo.getApiKey());
client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler));
} catch (Exception e) {
log.error("Dify 流式对话调用失败", e);
handler.onError(e);
}
}
@Override
public void chat(String userMessage, StreamingChatResponseHandler handler) {
io.github.imfangs.dify.client.model.chat.ChatMessage difyMessage = io.github.imfangs.dify.client.model.chat.ChatMessage.builder()
.query(userMessage)
.user(String.valueOf(chatRequest.getUserId()))
.responseMode(ResponseMode.STREAMING)
.conversationId(chatRequest.getSessionId() != null
? conversationService.getConversationId(chatRequest.getSessionId()) : null)
.autoGenerateName(true)
.build();
try {
DifyChatClient client = DifyClientFactory.createChatClient(
normalizeBaseUrl(chatModelVo.getApiHost()),
chatModelVo.getApiKey());
client.sendChatMessageStream(difyMessage, new DifyChatStreamAdapter(handler));
} catch (Exception e) {
log.error("Dify 流式对话调用失败", e);
handler.onError(e);
}
}
/**
* 从 langchain4j 消息列表中提取最后一条用户消息文本
*/
private String extractUserQuery(List<ChatMessage> messages) {
for (int i = messages.size() - 1; i >= 0; i--) {
ChatMessage msg = messages.get(i);
if (msg instanceof UserMessage) {
return ((UserMessage) msg).singleText();
}
}
return "";
}
private String normalizeBaseUrl(String baseUrl) {
if (baseUrl == null || baseUrl.isBlank()) {
throw new IllegalArgumentException("Dify API 地址(apiHost)不能为空");
}
return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
}
/**
* Dify 回调适配器
* 将 Dify ChatStreamCallback 事件转发给 langchain4j StreamingChatResponseHandler
*/
private class DifyChatStreamAdapter implements io.github.imfangs.dify.client.callback.ChatStreamCallback {
private final StreamingChatResponseHandler handler;
private final StringBuilder fullResponse = new StringBuilder();
DifyChatStreamAdapter(StreamingChatResponseHandler handler) {
this.handler = handler;
}
@Override
public void onMessage(MessageEvent event) {
String answer = event.getAnswer();
if (answer != null) {
fullResponse.append(answer);
handler.onPartialResponse(answer);
}
// 保存 Dify conversation_id 以维持多轮对话
if (event.getConversationId() != null && chatRequest.getSessionId() != null) {
conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId());
}
}
@Override
public void onMessageEnd(MessageEndEvent event) {
// 保存 conversation_id
if (event.getConversationId() != null && chatRequest.getSessionId() != null) {
conversationService.saveMapping(chatRequest.getSessionId(), event.getConversationId());
}
// 构建完整的 ChatResponse 交给上层处理
AiMessage aiMessage = new AiMessage(fullResponse.toString());
ChatResponse response = ChatResponse.builder()
.aiMessage(aiMessage)
.id(event.getMessageId())
.build();
handler.onCompleteResponse(response);
}
@Override
public void onError(ErrorEvent event) {
handler.onError(new RuntimeException(event.getMessage()));
}
@Override
public void onException(Throwable throwable) {
handler.onError(throwable);
}
}
}