mirror of
https://gitcode.com/ageerle/ruoyi-ai.git
synced 2026-04-15 12:53:42 +00:00
feat: 发布3.0版本,新增文档处理能力和演示模式
- 升级langchain4j版本至1.13.0 - 新增docx/pdf/xlsx文档处理技能模块 - 添加演示模式配置和切面拦截 - 优化聊天服务和可观测性监听器 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -103,6 +103,21 @@
|
||||
<version>${langchain4j.community.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- LangChain4j Skills - 技能模块 -->
|
||||
<dependency>
|
||||
<groupId>dev.langchain4j</groupId>
|
||||
<artifactId>langchain4j-skills</artifactId>
|
||||
<version>${langchain4j.community.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>dev.langchain4j</groupId>
|
||||
<artifactId>langchain4j-experimental-skills-shell</artifactId>
|
||||
<version>${langchain4j.community.version}</version>
|
||||
</dependency>
|
||||
|
||||
|
||||
<dependency>
|
||||
<groupId>org.testcontainers</groupId>
|
||||
<artifactId>weaviate</artifactId>
|
||||
@@ -152,6 +167,13 @@
|
||||
<artifactId>mysql-connector-j</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- 测试依赖 -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package org.ruoyi.agent;
|
||||
|
||||
import dev.langchain4j.agentic.Agent;
|
||||
import dev.langchain4j.service.SystemMessage;
|
||||
import dev.langchain4j.service.UserMessage;
|
||||
import dev.langchain4j.service.V;
|
||||
|
||||
/**
|
||||
* 技能管理 Agent
|
||||
* 管理 docx、pdf、xlsx 等文档处理技能
|
||||
*
|
||||
* <p>可用技能:
|
||||
* <ul>
|
||||
* <li>docx - Word 文档创建、编辑和分析</li>
|
||||
* <li>pdf - PDF 文档处理、提取文本和表格</li>
|
||||
* <li>xlsx - Excel 电子表格创建、编辑和分析</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2026/04/10
|
||||
*/
|
||||
public interface SkillsAgent {
|
||||
|
||||
@SystemMessage("""
|
||||
你是一个文档处理技能助手,能够使用 activate_skill 工具激活特定技能来处理各种文档任务。
|
||||
使用指南:
|
||||
1. 根据用户请求判断需要哪个技能
|
||||
2. 使用 activate_skill("skill-name") 激活对应技能
|
||||
3. 按照技能指令执行任务
|
||||
4. 如果需要参考文件,使用 read_skill_resource 读取
|
||||
""")
|
||||
@UserMessage("{{query}}")
|
||||
@Agent("文档处理技能助手,支持 Word、PDF、Excel 文档的创建、编辑和分析")
|
||||
String process(@V("query") String query);
|
||||
}
|
||||
@@ -6,33 +6,26 @@ import dev.langchain4j.service.UserMessage;
|
||||
import dev.langchain4j.service.V;
|
||||
|
||||
/**
|
||||
* Web Search Agent
|
||||
* A web search assistant that answers natural language questions by searching the internet
|
||||
* and returning relevant information from web pages.
|
||||
* 浏览器工具 Agent
|
||||
* 能够操作浏览器相关工具:网络搜索、网页抓取、浏览器自动化等
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/04/10
|
||||
*/
|
||||
public interface WebSearchAgent {
|
||||
|
||||
@SystemMessage("""
|
||||
You are a web search assistant. Answer questions by searching and retrieving web content.
|
||||
你是一个系统工具助手,能够使用工具来帮助用户获取信息和操作浏览器。
|
||||
|
||||
Available tools:
|
||||
1. bing_search: Search the internet with keywords
|
||||
- query (required): search keywords
|
||||
- count (optional): number of results, default 10, max 50
|
||||
- offset (optional): pagination offset, default 0
|
||||
Returns: title, link, and summary for each result
|
||||
|
||||
2. crawl_webpage: Extract text content from a web page
|
||||
- url (required): web page URL
|
||||
Returns: cleaned page title and main content
|
||||
|
||||
Instructions:
|
||||
- Always cite sources in your answers
|
||||
- Only use the two tools listed above
|
||||
【最重要原则】
|
||||
除非用户明确要求使用浏览器查询信息,否则不要主动调用任何搜索或浏览器工具。
|
||||
使用指南:
|
||||
- 搜索信息时使用 bing_search
|
||||
- 需要详细网页内容时使用 crawl_webpage
|
||||
- 需要交互操作(登录、点击、填写表单)时使用 Playwright 工具
|
||||
- 在回答中注明信息来源
|
||||
""")
|
||||
@UserMessage("""
|
||||
Answer the following question by searching the web: {{query}}
|
||||
""")
|
||||
@Agent("Web search assistant using Bing search and web scraping to find and retrieve information")
|
||||
@UserMessage("{{query}}")
|
||||
@Agent("浏览器工具助手,支持网络搜索、网页抓取和浏览器自动化操作")
|
||||
String search(@V("query") String query);
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.validation.Valid;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.chat.domain.dto.request.AgentChatRequest;
|
||||
import org.ruoyi.common.chat.domain.dto.request.ChatRequest;
|
||||
import org.ruoyi.service.chat.impl.ChatServiceFacade;
|
||||
import org.springframework.stereotype.Controller;
|
||||
|
||||
@@ -125,21 +125,21 @@ public class MyAgentListener implements dev.langchain4j.agentic.observability.Ag
|
||||
}
|
||||
|
||||
// ==================== 工具执行生命周期 ====================
|
||||
|
||||
@Override
|
||||
public void beforeToolExecution(BeforeToolExecution beforeToolExecution) {
|
||||
var toolRequest = beforeToolExecution.request();
|
||||
log.info("【工具执行前】工具请求ID: {}", toolRequest.id());
|
||||
log.info("【工具执行前】工具名称: {}", toolRequest.name());
|
||||
log.info("【工具执行前】工具参数: {}", toolRequest.arguments());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterToolExecution(ToolExecution toolExecution) {
|
||||
var toolRequest = toolExecution.request();
|
||||
log.info("【工具执行后】工具请求ID: {}", toolRequest.id());
|
||||
log.info("【工具执行后】工具名称: {}", toolRequest.name());
|
||||
log.info("【工具执行后】工具执行结果: {}", toolExecution.result());
|
||||
log.info("【工具执行后】工具执行是否失败: {}", toolExecution.hasFailed());
|
||||
}
|
||||
//
|
||||
// @Override
|
||||
// public void beforeToolExecution(BeforeToolExecution beforeToolExecution) {
|
||||
// var toolRequest = beforeToolExecution.request();
|
||||
// log.info("【工具执行前】工具请求ID: {}", toolRequest.id());
|
||||
// log.info("【工具执行前】工具名称: {}", toolRequest.name());
|
||||
// log.info("【工具执行前】工具参数: {}", toolRequest.arguments());
|
||||
// }
|
||||
//
|
||||
// @Override
|
||||
// public void afterToolExecution(ToolExecution toolExecution) {
|
||||
// var toolRequest = toolExecution.request();
|
||||
// log.info("【工具执行后】工具请求ID: {}", toolRequest.id());
|
||||
// log.info("【工具执行后】工具名称: {}", toolRequest.name());
|
||||
// log.info("【工具执行后】工具执行结果: {}", toolExecution.result());
|
||||
// log.info("【工具执行后】工具执行是否失败: {}", toolExecution.hasFailed());
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -1,23 +1,37 @@
|
||||
package org.ruoyi.observability;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import dev.langchain4j.invocation.InvocationContext;
|
||||
import dev.langchain4j.mcp.client.McpCallContext;
|
||||
import dev.langchain4j.mcp.client.McpClientListener;
|
||||
import dev.langchain4j.mcp.client.McpGetPromptResult;
|
||||
import dev.langchain4j.mcp.client.McpReadResourceResult;
|
||||
import dev.langchain4j.mcp.protocol.McpClientMessage;
|
||||
import dev.langchain4j.mcp.protocol.*;
|
||||
import dev.langchain4j.service.tool.ToolExecutionResult;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.common.sse.dto.SseEventDto;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 自定义的 McpClientListener 的监听器。
|
||||
* 监听 MCP 客户端相关的所有可观测性事件,包括:
|
||||
* MCP 客户端监听器
|
||||
* <p>
|
||||
* 监听 MCP 工具执行事件,并通过 SSE 推送到前端
|
||||
* <p>
|
||||
* <b>SSE 推送格式:</b>
|
||||
* <pre>
|
||||
* {
|
||||
* "event": "mcp",
|
||||
* "content": "{\"name\":\"工具名称\",\"status\":\"pending|success|error\",\"result\":\"执行结果\"}"
|
||||
* }
|
||||
* </pre>
|
||||
* <b>前端区分方式:</b>
|
||||
* <ul>
|
||||
* <li>MCP 工具执行的开始/成功/错误事件</li>
|
||||
* <li>MCP 资源读取的开始/成功/错误事件</li>
|
||||
* <li>MCP 提示词获取的开始/成功/错误事件</li>
|
||||
* <li>对话内容:event="content"</li>
|
||||
* <li>MCP 事件:event="mcp"</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author evo
|
||||
@@ -25,112 +39,135 @@ import java.util.Map;
|
||||
@Slf4j
|
||||
public class MyMcpClientListener implements McpClientListener {
|
||||
|
||||
// ==================== 工具执行 ====================
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
|
||||
private final Long userId;
|
||||
|
||||
public MyMcpClientListener(Long userId) {
|
||||
this.userId = userId;
|
||||
}
|
||||
|
||||
public MyMcpClientListener() {
|
||||
this.userId = null;
|
||||
}
|
||||
|
||||
// ==================== 工具执行 ====================
|
||||
@Override
|
||||
public void beforeExecuteTool(McpCallContext context) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
McpClientRequest message = (McpClientRequest) context.message();
|
||||
McpClientParams params = message.getParams();
|
||||
if (params instanceof McpCallToolParams callToolParams) {
|
||||
String name = callToolParams.getName();
|
||||
log.info("工具调用之前:{}",name);
|
||||
pushMcpEvent(name, "pending", null);
|
||||
}
|
||||
|
||||
log.info("【MCP工具执行前】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP工具执行前】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP工具执行前】MCP方法: {}", message.method);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterExecuteTool(McpCallContext context, ToolExecutionResult result, Map<String, Object> rawResult) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.info("【MCP工具执行后】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP工具执行后】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP工具执行后】MCP方法: {}", message.method);
|
||||
log.info("【MCP工具执行后】工具执行结果: {}", result);
|
||||
log.info("【MCP工具执行后】原始结果: {}", rawResult);
|
||||
McpClientRequest message = (McpClientRequest) context.message();
|
||||
McpClientParams params = message.getParams();
|
||||
if (params instanceof McpCallToolParams callToolParams) {
|
||||
String name = callToolParams.getName();
|
||||
String resultText = result != null ? result.toString() : "";
|
||||
log.info("工具调用之后:{},返回结果{}",name,result);
|
||||
pushMcpEvent(name, "success", truncate(resultText, 500));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onExecuteToolError(McpCallContext context, Throwable error) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.error("【MCP工具执行错误】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.error("【MCP工具执行错误】MCP消息ID: {}", message.getId());
|
||||
log.error("【MCP工具执行错误】MCP方法: {}", message.method);
|
||||
log.error("【MCP工具执行错误】错误类型: {}", error.getClass().getName());
|
||||
log.error("【MCP工具执行错误】错误信息: {}", error.getMessage(), error);
|
||||
String toolName = getMethodName(context);
|
||||
log.error("【MCP工具执行错误】工具: {}, 错误: {}", toolName, error.getMessage());
|
||||
pushMcpEvent(toolName, "error", error.getMessage());
|
||||
}
|
||||
|
||||
// ==================== 资源读取 ====================
|
||||
|
||||
@Override
|
||||
public void beforeResourceGet(McpCallContext context) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.info("【MCP资源读取前】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP资源读取前】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP资源读取前】MCP方法: {}", message.method);
|
||||
String name = getMethodName(context);
|
||||
log.info("【MCP资源读取前】资源: {}", name);
|
||||
pushMcpEvent(name, "pending", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterResourceGet(McpCallContext context, McpReadResourceResult result, Map<String, Object> rawResult) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.info("【MCP资源读取后】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP资源读取后】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP资源读取后】MCP方法: {}", message.method);
|
||||
log.info("【MCP资源读取后】资源内容数量: {}", result.contents() != null ? result.contents().size() : 0);
|
||||
log.info("【MCP资源读取后】原始结果: {}", rawResult);
|
||||
String name = getMethodName(context);
|
||||
int count = result.contents() != null ? result.contents().size() : 0;
|
||||
log.info("【MCP资源读取后】资源: {}, 数量: {}", name, count);
|
||||
pushMcpEvent(name, "success", "读取 " + count + " 条资源");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResourceGetError(McpCallContext context, Throwable error) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.error("【MCP资源读取错误】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.error("【MCP资源读取错误】MCP消息ID: {}", message.getId());
|
||||
log.error("【MCP资源读取错误】MCP方法: {}", message.method);
|
||||
log.error("【MCP资源读取错误】错误类型: {}", error.getClass().getName());
|
||||
log.error("【MCP资源读取错误】错误信息: {}", error.getMessage(), error);
|
||||
String name = getMethodName(context);
|
||||
log.error("【MCP资源读取错误】资源: {}, 错误: {}", name, error.getMessage());
|
||||
pushMcpEvent(name, "error", error.getMessage());
|
||||
}
|
||||
|
||||
// ==================== 提示词获取 ====================
|
||||
|
||||
@Override
|
||||
public void beforePromptGet(McpCallContext context) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.info("【MCP提示词获取前】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP提示词获取前】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP提示词获取前】MCP方法: {}", message.method);
|
||||
String name = getMethodName(context);
|
||||
log.info("【MCP提示词获取前】提示词: {}", name);
|
||||
pushMcpEvent(name, "pending", null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterPromptGet(McpCallContext context, McpGetPromptResult result, Map<String, Object> rawResult) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
|
||||
log.info("【MCP提示词获取后】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.info("【MCP提示词获取后】MCP消息ID: {}", message.getId());
|
||||
log.info("【MCP提示词获取后】MCP方法: {}", message.method);
|
||||
log.info("【MCP提示词获取后】提示词描述: {}", result.description());
|
||||
log.info("【MCP提示词获取后】提示词消息数量: {}", result.messages() != null ? result.messages().size() : 0);
|
||||
log.info("【MCP提示词获取后】原始结果: {}", rawResult);
|
||||
String name = getMethodName(context);
|
||||
int count = result.messages() != null ? result.messages().size() : 0;
|
||||
log.info("【MCP提示词获取后】提示词: {}, 消息数: {}", name, count);
|
||||
pushMcpEvent(name, "success", "获取 " + count + " 条消息");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPromptGetError(McpCallContext context, Throwable error) {
|
||||
InvocationContext invocationContext = context.invocationContext();
|
||||
McpClientMessage message = context.message();
|
||||
String name = getMethodName(context);
|
||||
log.error("【MCP提示词获取错误】提示词: {}, 错误: {}", name, error.getMessage());
|
||||
pushMcpEvent(name, "error", error.getMessage());
|
||||
}
|
||||
|
||||
log.error("【MCP提示词获取错误】调用唯一标识符: {}", invocationContext.invocationId());
|
||||
log.error("【MCP提示词获取错误】MCP消息ID: {}", message.getId());
|
||||
log.error("【MCP提示词获取错误】MCP方法: {}", message.method);
|
||||
log.error("【MCP提示词获取错误】错误类型: {}", error.getClass().getName());
|
||||
log.error("【MCP提示词获取错误】错误信息: {}", error.getMessage(), error);
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
private String getMethodName(McpCallContext context) {
|
||||
try {
|
||||
McpClientMessage message = context.message();
|
||||
return message.method != null ? message.method.toString() : "unknown";
|
||||
} catch (Exception e) {
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 推送 MCP 事件到前端
|
||||
*/
|
||||
private void pushMcpEvent(String name, String status, String result) {
|
||||
if (userId == null) {
|
||||
log.warn("userId 为空,无法推送 MCP 事件");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Map<String, Object> content = new HashMap<>();
|
||||
content.put("name", name);
|
||||
content.put("status", status);
|
||||
content.put("result", result);
|
||||
|
||||
String json = OBJECT_MAPPER.writeValueAsString(content);
|
||||
SseMessageUtils.sendEvent(userId, SseEventDto.builder()
|
||||
.event("mcp")
|
||||
.content(json)
|
||||
.build());
|
||||
} catch (JsonProcessingException e) {
|
||||
log.error("序列化 MCP 事件失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private String truncate(String str, int maxLen) {
|
||||
if (str == null) return null;
|
||||
return str.length() > maxLen ? str.substring(0, maxLen) + "..." : str;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
package org.ruoyi.observability;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* 跨线程事件总线
|
||||
*
|
||||
* 写入端(异步线程):StreamingOutputWrapper / SupervisorStreamListener
|
||||
* 读取端(SSE 线程):ChatServiceFacade.drain
|
||||
*
|
||||
* 调用链路:
|
||||
* SSE请求 -> 创建 OutputChannel
|
||||
* -> Supervisor.invoke() [同步阻塞调用子Agent]
|
||||
* ├── SupervisorStreamListener -> channel.send()
|
||||
* └── searchAgent.search()
|
||||
* └── StreamingOutputWrapper -> channel.send() [每个token]
|
||||
* -> channel.complete()
|
||||
* drain线程 -> channel.drain() -> SSE实时推送
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/04/10
|
||||
*/
|
||||
public class OutputChannel {
|
||||
|
||||
private static final String DONE = "__DONE__";
|
||||
private static final Map<String, OutputChannel> REGISTRY = new ConcurrentHashMap<>();
|
||||
|
||||
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(4096);
|
||||
private final AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
private final CountDownLatch completed = new CountDownLatch(1);
|
||||
|
||||
/**
|
||||
* 创建并注册到全局注册表
|
||||
*/
|
||||
public static OutputChannel create(String requestId) {
|
||||
OutputChannel ch = new OutputChannel();
|
||||
REGISTRY.put(requestId, ch);
|
||||
return ch;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从全局注册表移除
|
||||
*/
|
||||
public static void remove(String requestId) {
|
||||
REGISTRY.remove(requestId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 从全局注册表获取
|
||||
*/
|
||||
public static OutputChannel get(String requestId) {
|
||||
return REGISTRY.get(requestId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入:线程安全,非阻塞,队列满时丢弃
|
||||
*/
|
||||
public void send(String text) {
|
||||
if (text == null || text.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (!queue.offer(text, 100, TimeUnit.MILLISECONDS)) {
|
||||
System.err.println("[OutputChannel] 队列满,丢弃消息: " + truncate(text, 100));
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记完成
|
||||
*/
|
||||
public void complete() {
|
||||
queue.offer(DONE);
|
||||
completed.countDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记错误完成
|
||||
*/
|
||||
public void completeWithError(Throwable t) {
|
||||
error.set(t);
|
||||
queue.offer("\n[错误] 致命错误: " + t.getMessage());
|
||||
queue.offer(DONE);
|
||||
completed.countDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* 读取:阻塞迭代,配合 SSE 使用
|
||||
*/
|
||||
public void drain(Consumer<String> emitter) throws InterruptedException {
|
||||
while (true) {
|
||||
String msg = queue.poll(200, TimeUnit.MILLISECONDS);
|
||||
if (msg != null) {
|
||||
if (DONE.equals(msg)) {
|
||||
break;
|
||||
}
|
||||
emitter.accept(msg);
|
||||
} else {
|
||||
if (completed.getCount() == 0 && queue.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Throwable t = error.get();
|
||||
if (t != null && !(t instanceof InterruptedException)) {
|
||||
throw new RuntimeException("Agent 执行出错", t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否已完成
|
||||
*/
|
||||
public boolean isCompleted() {
|
||||
return completed.getCount() == 0;
|
||||
}
|
||||
|
||||
private String truncate(String s, int maxLen) {
|
||||
if (s == null) {
|
||||
return "null";
|
||||
}
|
||||
return s.length() > maxLen ? s.substring(0, maxLen) + "..." : s;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,213 @@
|
||||
package org.ruoyi.observability;
|
||||
|
||||
import dev.langchain4j.model.chat.ChatModel;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.listener.ChatModelListener;
|
||||
import dev.langchain4j.model.chat.request.ChatRequest;
|
||||
import dev.langchain4j.model.chat.request.ChatRequestParameters;
|
||||
import dev.langchain4j.model.chat.response.ChatResponse;
|
||||
import dev.langchain4j.model.chat.response.PartialThinking;
|
||||
import dev.langchain4j.model.chat.response.PartialThinkingContext;
|
||||
import dev.langchain4j.model.chat.response.PartialToolCall;
|
||||
import dev.langchain4j.model.chat.response.PartialToolCallContext;
|
||||
import dev.langchain4j.model.chat.response.PartialResponse;
|
||||
import dev.langchain4j.model.chat.response.PartialResponseContext;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import dev.langchain4j.model.chat.Capability;
|
||||
import dev.langchain4j.model.ModelProvider;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* 包装 StreamingChatModel,同时实现 ChatModel 接口。
|
||||
*
|
||||
* 当 AI Service 方法返回 String 时,LangChain4j 使用 ChatModel.chat()
|
||||
* 当返回 TokenStream 时,使用 StreamingChatModel.chat()
|
||||
*
|
||||
* 此包装器同时实现两个接口,将同步调用转换为流式调用并收集结果,
|
||||
* 同时拦截每个 token 推送到 OutputChannel。
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/04/10
|
||||
*/
|
||||
@Slf4j
|
||||
public class StreamingOutputWrapper implements StreamingChatModel, ChatModel {
|
||||
|
||||
private final StreamingChatModel streamingDelegate;
|
||||
private final OutputChannel channel;
|
||||
|
||||
/**
|
||||
* 包装 StreamingChatModel
|
||||
*/
|
||||
public StreamingOutputWrapper(StreamingChatModel delegate, OutputChannel channel) {
|
||||
this.streamingDelegate = delegate;
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
// ==================== 解决接口默认方法冲突 ====================
|
||||
|
||||
@Override
|
||||
public Set<Capability> supportedCapabilities() {
|
||||
return streamingDelegate.supportedCapabilities();
|
||||
}
|
||||
|
||||
// ==================== ChatModel 接口实现(同步调用) ====================
|
||||
|
||||
@Override
|
||||
public ChatResponse chat(ChatRequest request) {
|
||||
log.info("【StreamingOutputWrapper】chat() 被调用,开始流式处理");
|
||||
// 用于收集完整响应
|
||||
AtomicReference<ChatResponse> responseRef = new AtomicReference<>();
|
||||
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||
|
||||
// 调用流式模型,拦截每个 token
|
||||
streamingDelegate.chat(request, new StreamingChatResponseHandler() {
|
||||
@Override
|
||||
public void onPartialResponse(String token) {
|
||||
// 推送到 channel
|
||||
channel.send(token);
|
||||
log.debug("【流式Token】{}", token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialResponse(PartialResponse pr, PartialResponseContext ctx) {
|
||||
channel.send(pr.text());
|
||||
log.debug("【流式PartialResponse】{}", pr.text());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialThinking(PartialThinking thinking) {
|
||||
channel.send("[思考] " + thinking.text());
|
||||
log.debug("【流式思考】{}", thinking.text());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialThinking(PartialThinking thinking, PartialThinkingContext ctx) {
|
||||
channel.send("[思考] " + thinking.text());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialToolCall(PartialToolCall toolCall) {
|
||||
// channel.send("[工具参数生成中] " + toolCall);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialToolCall(PartialToolCall toolCall, PartialToolCallContext ctx) {
|
||||
// channel.send("[工具参数生成中] " + toolCall);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse response) {
|
||||
responseRef.set(response);
|
||||
if (response.metadata() != null && response.metadata().tokenUsage() != null) {
|
||||
var usage = response.metadata().tokenUsage();
|
||||
// channel.send("\n[Token统计] input=" + usage.inputTokenCount()
|
||||
// + " output=" + usage.outputTokenCount());
|
||||
}
|
||||
log.info("【StreamingOutputWrapper】流式处理完成");
|
||||
future.complete(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
channel.send("\n[错误] " + error.getMessage());
|
||||
channel.completeWithError(error);
|
||||
future.completeExceptionally(error);
|
||||
log.error("【StreamingOutputWrapper】流式处理出错", error);
|
||||
}
|
||||
});
|
||||
|
||||
// 等待流式完成
|
||||
future.join();
|
||||
|
||||
// 返回收集的响应
|
||||
return responseRef.get();
|
||||
}
|
||||
|
||||
// ==================== StreamingChatModel 接口实现(流式调用) ====================
|
||||
|
||||
@Override
|
||||
public void chat(ChatRequest request, StreamingChatResponseHandler handler) {
|
||||
StreamingChatResponseHandler wrapped = wrapHandler(handler);
|
||||
streamingDelegate.chat(request, wrapped);
|
||||
}
|
||||
|
||||
private StreamingChatResponseHandler wrapHandler(StreamingChatResponseHandler original) {
|
||||
return new StreamingChatResponseHandler() {
|
||||
|
||||
@Override
|
||||
public void onPartialResponse(String token) {
|
||||
channel.send(token);
|
||||
original.onPartialResponse(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialResponse(PartialResponse pr, PartialResponseContext ctx) {
|
||||
channel.send(pr.text());
|
||||
original.onPartialResponse(pr, ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialThinking(PartialThinking thinking) {
|
||||
channel.send("[思考] " + thinking.text());
|
||||
original.onPartialThinking(thinking);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialThinking(PartialThinking thinking, PartialThinkingContext ctx) {
|
||||
channel.send("[思考] " + thinking.text());
|
||||
original.onPartialThinking(thinking, ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialToolCall(PartialToolCall toolCall) {
|
||||
//channel.send("[工具参数生成中] " + toolCall);
|
||||
original.onPartialToolCall(toolCall);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPartialToolCall(PartialToolCall toolCall, PartialToolCallContext ctx) {
|
||||
//channel.send("[工具参数生成中] " + toolCall);
|
||||
original.onPartialToolCall(toolCall, ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse response) {
|
||||
if (response.metadata() != null && response.metadata().tokenUsage() != null) {
|
||||
var usage = response.metadata().tokenUsage();
|
||||
// channel.send("\n[Token统计] input=" + usage.inputTokenCount()
|
||||
// + " output=" + usage.outputTokenCount());
|
||||
}
|
||||
original.onCompleteResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
channel.send("\n[错误] " + error.getMessage());
|
||||
channel.completeWithError(error);
|
||||
original.onError(error);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// ==================== 共用接口方法 ====================
|
||||
|
||||
@Override
|
||||
public ChatRequestParameters defaultRequestParameters() {
|
||||
return streamingDelegate.defaultRequestParameters();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ChatModelListener> listeners() {
|
||||
return streamingDelegate.listeners();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ModelProvider provider() {
|
||||
return streamingDelegate.provider();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,120 @@
|
||||
package org.ruoyi.observability;
|
||||
|
||||
import dev.langchain4j.agentic.observability.AgentInvocationError;
|
||||
import dev.langchain4j.agentic.observability.AgentRequest;
|
||||
import dev.langchain4j.agentic.observability.AgentResponse;
|
||||
import dev.langchain4j.agentic.planner.AgentInstance;
|
||||
import dev.langchain4j.agentic.scope.AgenticScope;
|
||||
import dev.langchain4j.service.tool.BeforeToolExecution;
|
||||
import dev.langchain4j.service.tool.ToolExecution;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Supervisor 流式监听器
|
||||
*
|
||||
* 捕获 Agent 生命周期事件、工具执行前后事件,推送到 OutputChannel
|
||||
* inheritedBySubagents() = true -> 注册在 Supervisor 上,自动继承到所有子 Agent
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/04/10
|
||||
*/
|
||||
@Slf4j
|
||||
public class SupervisorStreamListener implements dev.langchain4j.agentic.observability.AgentListener {
|
||||
|
||||
private final OutputChannel channel;
|
||||
|
||||
/**
|
||||
* 用于在 AgenticScope 中存储 userId 的 key
|
||||
*/
|
||||
public static final String USER_ID_KEY = "userId";
|
||||
|
||||
public SupervisorStreamListener(OutputChannel channel) {
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
// ==================== Agent 调用生命周期 ====================
|
||||
|
||||
@Override
|
||||
public void beforeAgentInvocation(AgentRequest agentRequest) {
|
||||
AgentInstance agent = agentRequest.agent();
|
||||
AgenticScope scope = agentRequest.agenticScope();
|
||||
Map<String, Object> inputs = agentRequest.inputs();
|
||||
// 只记录日志,不推送输入信息(避免干扰流式输出)
|
||||
log.info("[Agent开始] {} 输入: {}", agent.name(), inputs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterAgentInvocation(AgentResponse agentResponse) {
|
||||
AgentInstance agent = agentResponse.agent();
|
||||
Map<String, Object> inputs = agentResponse.inputs();
|
||||
Object output = agentResponse.output();
|
||||
String outputStr = output != null ? output.toString() : "";
|
||||
|
||||
// 只记录日志,不推送输出信息
|
||||
// 流式输出由 StreamingOutputWrapper 处理
|
||||
// 当无子Agent被调用时,由 ChatServiceFacade 用 plannerModel 生成回复
|
||||
log.info("[Agent完成] {} 输出长度: {}", agent.name(), outputStr.length());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAgentInvocationError(AgentInvocationError error) {
|
||||
AgentInstance agent = error.agent();
|
||||
Map<String, Object> inputs = error.inputs();
|
||||
Throwable throwable = error.error();
|
||||
|
||||
channel.send("\n[Agent错误] " + agent.name()
|
||||
+ " 异常: " + throwable.getMessage());
|
||||
log.error("[Agent错误] {} 异常: {}", agent.name(), throwable.getMessage(), throwable);
|
||||
}
|
||||
|
||||
// ==================== AgenticScope 生命周期 ====================
|
||||
|
||||
@Override
|
||||
public void afterAgenticScopeCreated(AgenticScope agenticScope) {
|
||||
log.info("[AgenticScope创建] memoryId: {}", agenticScope.memoryId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeAgenticScopeDestroyed(AgenticScope agenticScope) {
|
||||
log.info("[AgenticScope销毁] memoryId: {}", agenticScope.memoryId());
|
||||
}
|
||||
|
||||
// ==================== 工具执行生命周期 ====================
|
||||
|
||||
// @Override
|
||||
// public void beforeToolExecution(BeforeToolExecution beforeToolExecution) {
|
||||
// var toolRequest = beforeToolExecution.request();
|
||||
//// channel.send("\n[工具即将执行] " + toolRequest.name()
|
||||
//// + " 参数: " + truncate(toolRequest.arguments(), 150));
|
||||
// log.info("[工具即将执行] {} 参数: {}", toolRequest.name(), toolRequest.arguments());
|
||||
// }
|
||||
|
||||
// @Override
|
||||
// public void afterToolExecution(ToolExecution toolExecution) {
|
||||
// var toolRequest = toolExecution.request();
|
||||
//// channel.send("\n[工具执行完成] " + toolRequest.name()
|
||||
//// + " 结果: " + truncate(String.valueOf(toolExecution.result()), 300));
|
||||
// log.info("[工具执行完成] {} 结果: {}", toolRequest.name(), toolExecution.result());
|
||||
// }
|
||||
|
||||
// ==================== 继承机制 ====================
|
||||
|
||||
/**
|
||||
* 返回 true,让此监听器自动继承给所有子 Agent
|
||||
*/
|
||||
@Override
|
||||
public boolean inheritedBySubagents() {
|
||||
return true;
|
||||
}
|
||||
|
||||
// ==================== 辅助方法 ====================
|
||||
|
||||
private String truncate(String s, int maxLen) {
|
||||
if (s == null) {
|
||||
return "null";
|
||||
}
|
||||
return s.length() > maxLen ? s.substring(0, maxLen) + "..." : s;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
package org.ruoyi.service.chat.impl;
|
||||
|
||||
import cn.dev33.satoken.stp.StpUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import dev.langchain4j.agentic.AgenticServices;
|
||||
import dev.langchain4j.agentic.supervisor.SupervisorAgent;
|
||||
import dev.langchain4j.agentic.supervisor.SupervisorResponseStrategy;
|
||||
@@ -14,15 +13,19 @@ import dev.langchain4j.mcp.client.McpClient;
|
||||
import dev.langchain4j.mcp.client.transport.McpTransport;
|
||||
import dev.langchain4j.mcp.client.transport.stdio.StdioMcpTransport;
|
||||
import dev.langchain4j.memory.chat.MessageWindowChatMemory;
|
||||
import dev.langchain4j.model.chat.ChatModel;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.response.ChatResponse;
|
||||
import dev.langchain4j.model.chat.response.StreamingChatResponseHandler;
|
||||
import dev.langchain4j.model.openai.OpenAiChatModel;
|
||||
import dev.langchain4j.service.tool.ToolProvider;
|
||||
import dev.langchain4j.skills.shell.ShellSkills;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.SneakyThrows;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.ruoyi.agent.ChartGenerationAgent;
|
||||
import org.ruoyi.agent.EchartsAgent;
|
||||
import org.ruoyi.agent.SkillsAgent;
|
||||
import org.ruoyi.agent.SqlAgent;
|
||||
import org.ruoyi.agent.WebSearchAgent;
|
||||
import org.ruoyi.agent.tool.ExecuteSqlQueryTool;
|
||||
@@ -38,6 +41,7 @@ import org.ruoyi.common.chat.service.chat.IChatModelService;
|
||||
import org.ruoyi.common.chat.service.chat.IChatService;
|
||||
import org.ruoyi.common.chat.service.workFlow.IWorkFlowStarterService;
|
||||
import org.ruoyi.common.core.utils.ObjectUtils;
|
||||
import org.ruoyi.common.core.utils.StringUtils;
|
||||
import org.ruoyi.common.satoken.utils.LoginHelper;
|
||||
import org.ruoyi.common.sse.core.SseEmitterManager;
|
||||
import org.ruoyi.common.sse.utils.SseMessageUtils;
|
||||
@@ -45,9 +49,7 @@ import org.ruoyi.domain.bo.vector.QueryVectorBo;
|
||||
import org.ruoyi.domain.vo.knowledge.KnowledgeInfoVo;
|
||||
import org.ruoyi.factory.ChatServiceFactory;
|
||||
import org.ruoyi.mcp.service.core.ToolProviderFactory;
|
||||
import org.ruoyi.observability.MyAgentListener;
|
||||
import org.ruoyi.observability.MyChatModelListener;
|
||||
import org.ruoyi.observability.MyMcpClientListener;
|
||||
import org.ruoyi.observability.*;
|
||||
import org.ruoyi.service.chat.AbstractChatService;
|
||||
import org.ruoyi.service.chat.IChatMessageService;
|
||||
import org.ruoyi.service.chat.impl.memory.PersistentChatMemoryStore;
|
||||
@@ -59,6 +61,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@@ -124,10 +127,19 @@ public class ChatServiceFacade implements IChatService {
|
||||
// 2. 构建上下文消息列表
|
||||
List<ChatMessage> contextMessages = buildContextMessages(chatRequest);
|
||||
|
||||
chatRequest.setEmitter(emitter);
|
||||
chatRequest.setUserId(userId);
|
||||
chatRequest.setTokenValue(tokenValue);
|
||||
chatRequest.setChatModelVo(chatModelVo);
|
||||
chatRequest.setContextMessages(contextMessages);
|
||||
|
||||
// 保存用户消息
|
||||
chatMessageService.saveChatMessage(userId, chatRequest.getSessionId(), chatRequest.getContent(), RoleType.USER.getName(), chatRequest.getModel());
|
||||
|
||||
// 3. 处理特殊聊天模式(工作流、人机交互恢复、思考模式)
|
||||
SseEmitter specialResult = handleSpecialChatModes(chatRequest, contextMessages, chatModelVo, emitter, userId, tokenValue);
|
||||
if (specialResult != null) {
|
||||
return specialResult;
|
||||
SseEmitter sseEmitter = handleSpecialChatModes(chatRequest);
|
||||
if (sseEmitter != null) {
|
||||
return sseEmitter;
|
||||
}
|
||||
|
||||
// 4. 路由服务提供商
|
||||
@@ -135,11 +147,8 @@ public class ChatServiceFacade implements IChatService {
|
||||
log.info("路由到服务提供商: {}, 模型: {}", providerCode, chatRequest.getModel());
|
||||
AbstractChatService chatService = chatServiceFactory.getOriginalService(providerCode);
|
||||
|
||||
|
||||
StreamingChatResponseHandler handler = createResponseHandler(userId, tokenValue,chatRequest);
|
||||
|
||||
// 保存用户消息
|
||||
chatMessageService.saveChatMessage(userId, chatRequest.getSessionId(), chatRequest.getContent(), RoleType.USER.getName(), chatRequest.getModel());
|
||||
|
||||
// 5. 发起对话
|
||||
StreamingChatModel streamingChatModel = chatService.buildStreamingChatModel(chatModelVo, chatRequest);
|
||||
@@ -151,16 +160,9 @@ public class ChatServiceFacade implements IChatService {
|
||||
* 处理特殊聊天模式(工作流、人机交互恢复、思考模式)
|
||||
*
|
||||
* @param chatRequest 聊天请求
|
||||
* @param contextMessages 上下文消息列表(可能被修改)
|
||||
* @param chatModelVo 聊天模型配置
|
||||
* @param emitter SSE发射器
|
||||
* @param userId 用户ID
|
||||
* @param tokenValue 会话令牌
|
||||
* @return 如果需要提前返回则返回SseEmitter,否则返回null
|
||||
*/
|
||||
private SseEmitter handleSpecialChatModes(ChatRequest chatRequest, List<ChatMessage> contextMessages,
|
||||
ChatModelVo chatModelVo, SseEmitter emitter,
|
||||
Long userId, String tokenValue) {
|
||||
private SseEmitter handleSpecialChatModes(ChatRequest chatRequest) {
|
||||
// 处理工作流对话
|
||||
if (chatRequest.getEnableWorkFlow()) {
|
||||
log.info("处理工作流对话,会话: {}", chatRequest.getSessionId());
|
||||
@@ -169,7 +171,6 @@ public class ChatServiceFacade implements IChatService {
|
||||
if (ObjectUtils.isEmpty(runner)) {
|
||||
log.warn("工作流参数为空");
|
||||
}
|
||||
|
||||
return workFlowStarterService.streaming(
|
||||
ThreadContext.getCurrentUser(),
|
||||
runner.getUuid(),
|
||||
@@ -181,25 +182,22 @@ public class ChatServiceFacade implements IChatService {
|
||||
// 处理人机交互恢复
|
||||
if (chatRequest.getIsResume()) {
|
||||
log.info("处理人机交互恢复");
|
||||
|
||||
ReSumeRunner reSumeRunner = chatRequest.getReSumeRunner();
|
||||
if (ObjectUtils.isEmpty(reSumeRunner)) {
|
||||
log.warn("人机交互恢复参数为空");
|
||||
return emitter;
|
||||
}
|
||||
|
||||
workFlowStarterService.resumeFlow(
|
||||
reSumeRunner.getRuntimeUuid(),
|
||||
reSumeRunner.getFeedbackContent(),
|
||||
emitter
|
||||
chatRequest.getEmitter()
|
||||
);
|
||||
|
||||
return emitter;
|
||||
}
|
||||
return chatRequest.getEmitter();
|
||||
|
||||
}
|
||||
// 处理思考模式
|
||||
if (chatRequest.getEnableThinking()) {
|
||||
handleThinkingMode(chatRequest, contextMessages, chatModelVo, userId);
|
||||
return handleThinkingMode(chatRequest);
|
||||
}
|
||||
|
||||
return null;
|
||||
@@ -209,80 +207,132 @@ public class ChatServiceFacade implements IChatService {
|
||||
* 处理思考模式
|
||||
*
|
||||
* @param chatRequest 聊天请求
|
||||
* @param contextMessages 上下文消息列表
|
||||
* @param chatModelVo 聊天模型配置
|
||||
* @param userId 用户ID
|
||||
|
||||
*/
|
||||
private void handleThinkingMode(ChatRequest chatRequest, List<ChatMessage> contextMessages,
|
||||
ChatModelVo chatModelVo, Long userId) {
|
||||
// 步骤1: 配置MCP传输层 - 连接到bing-cn-mcp服务器
|
||||
McpTransport transport = new StdioMcpTransport.Builder()
|
||||
private SseEmitter handleThinkingMode(ChatRequest chatRequest) {
|
||||
// 配置监督者模型
|
||||
OpenAiChatModel plannerModel = OpenAiChatModel.builder()
|
||||
.baseUrl(chatRequest.getChatModelVo().getApiHost())
|
||||
.apiKey(chatRequest.getChatModelVo().getApiKey())
|
||||
.modelName(chatRequest.getChatModelVo().getModelName())
|
||||
.build();
|
||||
|
||||
// Bing 搜索 MCP 客户端
|
||||
McpTransport bingTransport = new StdioMcpTransport.Builder()
|
||||
.command(List.of("C:\\Program Files\\nodejs\\npx.cmd", "-y", "bing-cn-mcp"))
|
||||
.logEvents(true)
|
||||
.build();
|
||||
|
||||
McpClient mcpClient = new DefaultMcpClient.Builder()
|
||||
.transport(transport)
|
||||
.listener(new MyMcpClientListener())
|
||||
Long userId = chatRequest.getUserId();
|
||||
McpClient bingMcpClient = new DefaultMcpClient.Builder()
|
||||
.transport(bingTransport)
|
||||
.listener(new MyMcpClientListener(userId))
|
||||
.build();
|
||||
|
||||
ToolProvider toolProvider = McpToolProvider.builder()
|
||||
.mcpClients(List.of(mcpClient))
|
||||
.build();
|
||||
|
||||
// 配置echarts MCP
|
||||
McpTransport transport1 = new StdioMcpTransport.Builder()
|
||||
.command(List.of("C:\\Program Files\\nodejs\\npx.cmd", "-y", "mcp-echarts"))
|
||||
// Playwright MCP 客户端 - 浏览器自动化工具
|
||||
McpTransport playwrightTransport = new StdioMcpTransport.Builder()
|
||||
.command(List.of("C:\\Program Files\\nodejs\\npx.cmd", "-y", "@playwright/mcp@latest"))
|
||||
.logEvents(true)
|
||||
.build();
|
||||
|
||||
McpClient mcpClient1 = new DefaultMcpClient.Builder()
|
||||
.transport(transport1)
|
||||
.listener(new MyMcpClientListener())
|
||||
McpClient playwrightMcpClient = new DefaultMcpClient.Builder()
|
||||
.transport(playwrightTransport)
|
||||
.listener(new MyMcpClientListener(userId))
|
||||
.build();
|
||||
|
||||
ToolProvider toolProvider1 = McpToolProvider.builder()
|
||||
.mcpClients(List.of(mcpClient1))
|
||||
// Filesystem MCP 客户端 - 文件管理工具
|
||||
// 允许 AI 读取、写入、搜索文件(基于当前项目根目录)
|
||||
String userDir = System.getProperty("user.dir");
|
||||
McpTransport filesystemTransport = new StdioMcpTransport.Builder()
|
||||
.command(List.of("C:\\Program Files\\nodejs\\npx.cmd", "-y",
|
||||
"@modelcontextprotocol/server-filesystem", userDir))
|
||||
.logEvents(true)
|
||||
|
||||
.build();
|
||||
|
||||
// 配置模型
|
||||
OpenAiChatModel plannerModel = OpenAiChatModel.builder()
|
||||
.baseUrl(chatModelVo.getApiHost())
|
||||
.apiKey(chatModelVo.getApiKey())
|
||||
.listeners(List.of(new MyChatModelListener()))
|
||||
.modelName(chatModelVo.getModelName())
|
||||
McpClient filesystemMcpClient = new DefaultMcpClient.Builder()
|
||||
.transport(filesystemTransport)
|
||||
.listener(new MyMcpClientListener(userId))
|
||||
.build();
|
||||
|
||||
// 构建各Agent
|
||||
// 合并三个 MCP 客户端的工具
|
||||
ToolProvider toolProvider = McpToolProvider.builder()
|
||||
// bingMcpClient,
|
||||
.mcpClients(List.of(playwrightMcpClient, filesystemMcpClient))
|
||||
.build();
|
||||
|
||||
// ========== LangChain4j Skills 基本用法 ==========
|
||||
// 通过 SKILL.md 文件定义,LLM 按需通过 activate_skill 工具加载
|
||||
// 加载 Skills - 使用相对路径,基于项目根目录
|
||||
java.nio.file.Path skillsPath = java.nio.file.Path.of(userDir, "ruoyi-admin/src/main/resources/skills");
|
||||
List<dev.langchain4j.skills.FileSystemSkill> skillsList = dev.langchain4j.skills.FileSystemSkillLoader
|
||||
.loadSkills(skillsPath)
|
||||
;
|
||||
|
||||
ShellSkills skills = ShellSkills.from(skillsList);
|
||||
|
||||
// 构建子 Agent
|
||||
WebSearchAgent searchAgent = AgenticServices.agentBuilder(WebSearchAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.toolProvider(toolProvider)
|
||||
.listener(new MyAgentListener())
|
||||
.build();
|
||||
|
||||
// 构建子 Agent 2: SkillsAgent - 负责文档处理技能(docx、pdf、xlsx)
|
||||
// 独立管理 Skills 工具
|
||||
SkillsAgent skillsAgent = AgenticServices.agentBuilder(SkillsAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.systemMessage("You have access to the following skills:\n" + skills.formatAvailableSkills()
|
||||
+ "\nWhen the user's request relates to one of these skills, activate it first using the `activate_skill` tool before proceeding.")
|
||||
.toolProvider(skills.toolProvider())
|
||||
.build();
|
||||
|
||||
// 构建子 Agent 3: SqlAgent - 负责数据库查询
|
||||
SqlAgent sqlAgent = AgenticServices.agentBuilder(SqlAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.listener(new MyAgentListener())
|
||||
.tools(new QueryAllTablesTool(), new QueryTableSchemaTool(), new ExecuteSqlQueryTool())
|
||||
.build();
|
||||
|
||||
WebSearchAgent searchAgent = AgenticServices.agentBuilder(WebSearchAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.listener(new MyAgentListener())
|
||||
.toolProvider(toolProvider)
|
||||
.build();
|
||||
|
||||
// 构建子 Agent 4: ChartGenerationAgent - 负责图表生成
|
||||
ChartGenerationAgent chartGenerationAgent = AgenticServices.agentBuilder(ChartGenerationAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.listener(new MyAgentListener())
|
||||
.toolProvider(toolProvider1)
|
||||
.build();
|
||||
|
||||
// 构建监督者Agent
|
||||
// 构建子 Agent 5: EchartsAgent - 负责数据可视化(结合 SQL 查询生成 Echarts 图表)
|
||||
EchartsAgent echartsAgent = AgenticServices.agentBuilder(EchartsAgent.class)
|
||||
.chatModel(plannerModel)
|
||||
.tools(new QueryAllTablesTool(), new QueryTableSchemaTool(), new ExecuteSqlQueryTool())
|
||||
.listener(new MyAgentListener())
|
||||
.build();
|
||||
|
||||
// 构建监督者 Agent - 管理多个子 Agent
|
||||
SupervisorAgent supervisor = AgenticServices.supervisorBuilder()
|
||||
.chatModel(plannerModel)
|
||||
.listener(new MyAgentListener())
|
||||
.subAgents(sqlAgent, searchAgent, chartGenerationAgent)
|
||||
//.listener(new SupervisorStreamListener(null))
|
||||
.subAgents(skillsAgent,searchAgent, sqlAgent, chartGenerationAgent, echartsAgent)
|
||||
// 加入历史上下文 - 使用 ChatMemoryProvider 提供持久化的聊天内存
|
||||
//.chatMemoryProvider(memoryId -> createChatMemory(chatRequest.getSessionId()))
|
||||
.responseStrategy(SupervisorResponseStrategy.LAST)
|
||||
.build();
|
||||
|
||||
// 调用 supervisor
|
||||
String invoke = supervisor.invoke(chatRequest.getContent());
|
||||
log.info("supervisor.invoke() 返回: {}", invoke);
|
||||
String tokenValue = chatRequest.getTokenValue();
|
||||
|
||||
// 异步执行 supervisor,避免阻塞 HTTP 请求线程导致 SSE 事件被缓冲
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
String result = supervisor.invoke(chatRequest.getContent());
|
||||
SseMessageUtils.sendContent(userId, result);
|
||||
SseMessageUtils.sendDone(userId);
|
||||
} catch (Exception e) {
|
||||
log.error("Supervisor 执行失败", e);
|
||||
SseMessageUtils.sendError(userId, e.getMessage());
|
||||
} finally {
|
||||
SseMessageUtils.completeConnection(userId, tokenValue);
|
||||
}
|
||||
});
|
||||
return chatRequest.getEmitter();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -549,7 +599,5 @@ public class ChatServiceFacade implements IChatService {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,313 @@
|
||||
package org.ruoyi.agent;
|
||||
|
||||
import dev.langchain4j.agentic.AgenticServices;
|
||||
import dev.langchain4j.agentic.supervisor.SupervisorAgent;
|
||||
import dev.langchain4j.agentic.supervisor.SupervisorResponseStrategy;
|
||||
import dev.langchain4j.model.chat.ChatModel;
|
||||
import dev.langchain4j.model.chat.StreamingChatModel;
|
||||
import dev.langchain4j.model.chat.listener.ChatModelErrorContext;
|
||||
import dev.langchain4j.model.chat.listener.ChatModelListener;
|
||||
import dev.langchain4j.model.chat.listener.ChatModelRequestContext;
|
||||
import dev.langchain4j.model.chat.listener.ChatModelResponseContext;
|
||||
import dev.langchain4j.model.chat.response.ChatResponse;
|
||||
import dev.langchain4j.model.openai.OpenAiChatModel;
|
||||
import dev.langchain4j.model.openai.OpenAiStreamingChatModel;
|
||||
import dev.langchain4j.service.SystemMessage;
|
||||
import dev.langchain4j.service.UserMessage;
|
||||
import dev.langchain4j.service.V;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.ruoyi.observability.OutputChannel;
|
||||
import org.ruoyi.observability.StreamingOutputWrapper;
|
||||
import org.ruoyi.observability.SupervisorStreamListener;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 子 Agent 流式输出集成测试
|
||||
*
|
||||
* 测试内容:
|
||||
* 1. 观察单个 Agent 的流式输出
|
||||
* 2. 观察 Supervisor 调用子 Agent 的流式输出
|
||||
* 3. 验证 AgentListener 事件回调
|
||||
* 4. 验证 StreamingOutputWrapper 的 token 拦截
|
||||
*
|
||||
* 注意:运行测试前需要配置正确的 API Key
|
||||
* 可以通过环境变量或直接修改配置区域
|
||||
*
|
||||
* @author ageerle@163.com
|
||||
* @date 2025/04/10
|
||||
*/
|
||||
@Disabled("需要配置 API Key 后手动启用")
|
||||
public class StreamingAgentIntegrationTest {
|
||||
|
||||
// ==================== 配置区域 ====================
|
||||
private static final String BASE_URL = "https://api.ppio.com/openai";
|
||||
private static final String API_KEY = System.getenv("PPIO_API_KEY") != null
|
||||
? System.getenv("PPIO_API_KEY")
|
||||
: "xx"; // 默认 Key
|
||||
private static final String MODEL_NAME = "deepseek/deepseek-v3.2";
|
||||
|
||||
private StreamingChatModel streamingModel;
|
||||
private OpenAiChatModel syncModel;
|
||||
|
||||
// ==================== Agent 接口定义 ====================
|
||||
|
||||
public interface MathAgent {
|
||||
@SystemMessage("你是一个数学计算助手,帮助用户解决数学问题。直接给出计算结果和简要解释。")
|
||||
@UserMessage("计算:{{query}}")
|
||||
@dev.langchain4j.agentic.Agent("数学计算助手")
|
||||
String calculate(@V("query") String query);
|
||||
}
|
||||
|
||||
public interface TextAgent {
|
||||
@SystemMessage("你是一个文本分析助手,帮助用户分析文本内容。给出简洁的分析结果。")
|
||||
@UserMessage("分析以下文本:{{text}}")
|
||||
@dev.langchain4j.agentic.Agent("文本分析助手")
|
||||
String analyze(@V("text") String text);
|
||||
}
|
||||
|
||||
public interface WeatherAgent {
|
||||
@SystemMessage("你是一个天气助手。根据用户提供的信息给出天气相关的回答。")
|
||||
@UserMessage("回答问题:{{query}}")
|
||||
@dev.langchain4j.agentic.Agent("天气助手")
|
||||
String answer(@V("query") String query);
|
||||
}
|
||||
|
||||
// ==================== 初始化 ====================
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
// streamingModel = OpenAiStreamingChatModel.builder()
|
||||
// .baseUrl(BASE_URL)
|
||||
// .apiKey(API_KEY)
|
||||
// .modelName(MODEL_NAME)
|
||||
// .build();
|
||||
|
||||
streamingModel = OpenAiStreamingChatModel.builder()
|
||||
.baseUrl(BASE_URL)
|
||||
.apiKey(API_KEY)
|
||||
.listeners(List.of(new ChatModelListener() {
|
||||
@Override
|
||||
public void onRequest(ChatModelRequestContext ctx) {
|
||||
// 请求发送前
|
||||
}
|
||||
@Override
|
||||
public void onResponse(ChatModelResponseContext ctx) {
|
||||
// 响应完成后
|
||||
}
|
||||
@Override
|
||||
public void onError(ChatModelErrorContext ctx) {
|
||||
// 错误时
|
||||
}
|
||||
}))
|
||||
.build();
|
||||
|
||||
|
||||
syncModel = OpenAiChatModel.builder()
|
||||
.baseUrl(BASE_URL)
|
||||
.apiKey(API_KEY)
|
||||
.modelName(MODEL_NAME)
|
||||
.build();
|
||||
}
|
||||
|
||||
// ==================== 测试方法 ====================
|
||||
|
||||
@Test
|
||||
@DisplayName("测试1: 基础流式输出 - 单个 Agent")
|
||||
void testBasicStreamingAgent() throws Exception {
|
||||
System.out.println("\n=== 测试1: 基础流式输出 - 单个 Agent ===\n");
|
||||
|
||||
// 创建事件总线
|
||||
String requestId = UUID.randomUUID().toString();
|
||||
OutputChannel channel = OutputChannel.create(requestId);
|
||||
CountDownLatch completed = new CountDownLatch(1);
|
||||
|
||||
// 包装模型以捕获流式输出
|
||||
ChatModel wrappedModel = new StreamingOutputWrapper(streamingModel, channel);
|
||||
|
||||
// 构建 Agent
|
||||
MathAgent mathAgent = AgenticServices.agentBuilder(MathAgent.class)
|
||||
.chatModel(wrappedModel)
|
||||
.build();
|
||||
|
||||
// 异步执行
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
System.out.println(">>> 调用 MathAgent.calculate()...");
|
||||
String result = mathAgent.calculate("计算 123 * 456 + 789 的值");
|
||||
System.out.println("\n>>> 最终结果: " + result);
|
||||
} catch (Exception e) {
|
||||
System.err.println(">>> 异常: " + e.getMessage());
|
||||
channel.completeWithError(e);
|
||||
} finally {
|
||||
channel.complete();
|
||||
completed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// drain 推送
|
||||
channel.drain(text -> {
|
||||
System.out.print(text);
|
||||
System.out.flush();
|
||||
});
|
||||
|
||||
completed.await(30, TimeUnit.SECONDS);
|
||||
OutputChannel.remove(requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("测试2: Supervisor 模式 - 单个子 Agent 流式输出")
|
||||
void testSupervisorWithSingleSubAgent() throws Exception {
|
||||
System.out.println("\n=== 测试2: Supervisor 模式 - 单个子 Agent ===\n");
|
||||
|
||||
String requestId = UUID.randomUUID().toString();
|
||||
OutputChannel channel = OutputChannel.create(requestId);
|
||||
CountDownLatch completed = new CountDownLatch(1);
|
||||
|
||||
// 包装模型
|
||||
|
||||
// 子 Agent
|
||||
MathAgent mathAgent = AgenticServices.agentBuilder(MathAgent.class)
|
||||
.streamingChatModel(streamingModel)
|
||||
.build();
|
||||
|
||||
|
||||
// Supervisor(注册监听器)
|
||||
SupervisorAgent supervisor = AgenticServices.supervisorBuilder()
|
||||
.chatModel(syncModel)
|
||||
//.listener(new SupervisorStreamListener(channel))
|
||||
.subAgents(mathAgent)
|
||||
.responseStrategy(SupervisorResponseStrategy.LAST)
|
||||
.build();
|
||||
|
||||
// 异步执行
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
System.out.println(">>> Supervisor.invoke() 开始...");
|
||||
String result = supervisor.invoke("帮我计算 999 除以 3 等于多少");
|
||||
System.out.println("\n>>> Supervisor 结果: " + result);
|
||||
} catch (Exception e) {
|
||||
System.err.println(">>> 异常: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
// channel.completeWithError(e);
|
||||
} finally {
|
||||
// channel.complete();
|
||||
completed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// drain 推送
|
||||
channel.drain(text -> {
|
||||
System.out.print(text);
|
||||
System.out.flush();
|
||||
});
|
||||
|
||||
completed.await(60, TimeUnit.SECONDS);
|
||||
OutputChannel.remove(requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("测试3: Supervisor 模式 - 多个子 Agent 流式输出")
|
||||
void testSupervisorWithMultipleSubAgents() throws Exception {
|
||||
System.out.println("\n=== 测试3: Supervisor 模式 - 多个子 Agent ===\n");
|
||||
|
||||
String requestId = UUID.randomUUID().toString();
|
||||
OutputChannel channel = OutputChannel.create(requestId);
|
||||
CountDownLatch completed = new CountDownLatch(1);
|
||||
|
||||
// 包装模型
|
||||
ChatModel wrappedModel = new StreamingOutputWrapper(streamingModel, channel);
|
||||
|
||||
|
||||
// 子 Agent
|
||||
MathAgent mathAgent = AgenticServices.agentBuilder(MathAgent.class)
|
||||
.chatModel(wrappedModel)
|
||||
.build();
|
||||
|
||||
TextAgent textAgent = AgenticServices.agentBuilder(TextAgent.class)
|
||||
.chatModel(wrappedModel)
|
||||
.build();
|
||||
|
||||
WeatherAgent weatherAgent = AgenticServices.agentBuilder(WeatherAgent.class)
|
||||
.chatModel(wrappedModel)
|
||||
.build();
|
||||
|
||||
// Supervisor
|
||||
SupervisorAgent supervisor = AgenticServices.supervisorBuilder()
|
||||
.chatModel(syncModel)
|
||||
.listener(new SupervisorStreamListener(channel))
|
||||
.subAgents(mathAgent, textAgent, weatherAgent)
|
||||
.responseStrategy(SupervisorResponseStrategy.LAST)
|
||||
.build();
|
||||
|
||||
// 异步执行 - 提一个会触发多个 Agent 的问题
|
||||
CompletableFuture.runAsync(() -> {
|
||||
try {
|
||||
System.out.println(">>> Supervisor.invoke() 开始...");
|
||||
String result = supervisor.invoke(
|
||||
"请帮我做两件事:1. 计算 50 * 20 的结果;2. 分析 '人工智能正在改变世界' 这句话的含义"
|
||||
);
|
||||
System.out.println("\n>>> Supervisor 结果: " + result);
|
||||
} catch (Exception e) {
|
||||
System.err.println(">>> 异常: " + e.getMessage());
|
||||
e.printStackTrace();
|
||||
channel.completeWithError(e);
|
||||
} finally {
|
||||
channel.complete();
|
||||
completed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// drain 推送 - 实时观察流式输出
|
||||
channel.drain(text -> {
|
||||
System.out.print("观察流式输出:"+text);
|
||||
System.out.flush();
|
||||
});
|
||||
|
||||
completed.await(90, TimeUnit.SECONDS);
|
||||
OutputChannel.remove(requestId);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("测试4: 直接观察 StreamingChatModel 的流式响应")
|
||||
void testDirectStreamingChatModel() throws Exception {
|
||||
System.out.println("\n=== 测试4: 直接观察 StreamingChatModel ===\n");
|
||||
|
||||
StringBuilder buffer = new StringBuilder();
|
||||
CountDownLatch completed = new CountDownLatch(1);
|
||||
|
||||
streamingModel.chat("你好,请自我介绍", new dev.langchain4j.model.chat.response.StreamingChatResponseHandler() {
|
||||
@Override
|
||||
public void onPartialResponse(String partialResponse) {
|
||||
buffer.append(partialResponse);
|
||||
System.out.print(partialResponse);
|
||||
System.out.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleteResponse(ChatResponse completeResponse) {
|
||||
System.out.println("\n\n[完成] 总Token数: " +
|
||||
(completeResponse.metadata() != null && completeResponse.metadata().tokenUsage() != null
|
||||
? completeResponse.metadata().tokenUsage().totalTokenCount()
|
||||
: "无"));
|
||||
completed.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {
|
||||
System.err.println("[错误] " + error.getMessage());
|
||||
completed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
completed.await(30, TimeUnit.SECONDS);
|
||||
System.out.println("完整响应内容: " + buffer.toString());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user