feat:增加可观测性监听器 调整思考输出监听日志

This commit is contained in:
evo
2026-04-01 23:11:54 +08:00
parent ef99c540bb
commit 3cfb185dde
2 changed files with 62 additions and 6 deletions

View File

@@ -10,6 +10,7 @@ import dev.langchain4j.service.tool.ToolExecution;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* 自定义的 AgentListener 的监听器。
@@ -26,6 +27,13 @@ import java.util.Map;
@Slf4j
public class MyAgentListener implements dev.langchain4j.agentic.observability.AgentListener {
/** 最终捕获到的思考结果(主 Agent 完成后写入,供外部获取) */
private final AtomicReference<String> sharedOutputRef = new AtomicReference<>();
public String getCapturedResult() {
return sharedOutputRef.get();
}
// ==================== Agent 调用生命周期 ====================
@Override
@@ -72,12 +80,19 @@ public class MyAgentListener implements dev.langchain4j.agentic.observability.Ag
AgentInstance agent = agentResponse.agent();
Map<String, Object> inputs = agentResponse.inputs();
Object output = agentResponse.output();
String outputStr = output != null ? output.toString() : "";
log.info("【Agent调用后】Agent名称: {}", agent.name());
log.info("【Agent调用后】Agent ID: {}", agent.agentId());
log.info("【Agent调用后】Agent输入参数: {}", inputs);
log.info("【Agent调用后】Agent输出结果: {}", output);
log.info("【Agent调用后】是否为叶子节点: {}", agent.leaf());
// 捕获主 Agent 的最终输出,供外部获取
if ("invoke".equals(agent.agentId()) && !outputStr.isEmpty()) {
sharedOutputRef.set(outputStr);
log.info("【Agent调用后】已捕获主Agent输出: {}", outputStr);
}
}
@Override

View File

@@ -122,7 +122,7 @@ public class ChatServiceFacade implements IChatService {
List<ChatMessage> contextMessages = buildContextMessages(chatRequest);
// 3. 处理特殊聊天模式(工作流、人机交互恢复、思考模式)
SseEmitter specialResult = handleSpecialChatModes(chatRequest, contextMessages, chatModelVo, emitter);
SseEmitter specialResult = handleSpecialChatModes(chatRequest, contextMessages, chatModelVo, emitter, userId, tokenValue);
if (specialResult != null) {
return specialResult;
}
@@ -151,10 +151,13 @@ public class ChatServiceFacade implements IChatService {
* @param contextMessages 上下文消息列表(可能被修改)
* @param chatModelVo 聊天模型配置
* @param emitter SSE发射器
* @param userId 用户ID
* @param tokenValue 会话令牌
* @return 如果需要提前返回则返回SseEmitter否则返回null
*/
private SseEmitter handleSpecialChatModes(ChatRequest chatRequest, List<ChatMessage> contextMessages,
ChatModelVo chatModelVo, SseEmitter emitter) {
ChatModelVo chatModelVo, SseEmitter emitter,
Long userId, String tokenValue) {
// 处理工作流对话
if (chatRequest.getEnableWorkFlow()) {
log.info("处理工作流对话,会话: {}", chatRequest.getSessionId());
@@ -193,7 +196,16 @@ public class ChatServiceFacade implements IChatService {
// 处理思考模式
if (chatRequest.getEnableThinking()) {
handleThinkingMode(chatRequest, contextMessages, chatModelVo);
String thinkingResult = handleThinkingMode(chatRequest, contextMessages, chatModelVo, userId, tokenValue);
// 思考模式产生了有效结果,通过 SSE 发送给前端后结束
if (thinkingResult != null && !thinkingResult.isBlank()) {
SseMessageUtils.sendDone(userId);
SseMessageUtils.completeConnection(userId, tokenValue);
log.info("思考模式完成,结果已发送: {}", thinkingResult);
return emitter;
}
// 思考结果为空,继续走普通聊天流程
log.warn("思考模式未产生有效结果,继续普通聊天");
}
return null;
@@ -205,8 +217,12 @@ public class ChatServiceFacade implements IChatService {
* @param chatRequest 聊天请求
* @param contextMessages 上下文消息列表
* @param chatModelVo 聊天模型配置
* @param userId 用户ID
* @param tokenValue 会话令牌
* @return 思考结果字符串,如果无结果则返回空字符串
*/
private void handleThinkingMode(ChatRequest chatRequest, List<ChatMessage> contextMessages, ChatModelVo chatModelVo) {
private String handleThinkingMode(ChatRequest chatRequest, List<ChatMessage> contextMessages,
ChatModelVo chatModelVo, Long userId, String tokenValue) {
// 步骤1: 配置MCP传输层 - 连接到bing-cn-mcp服务器
McpTransport transport = new StdioMcpTransport.Builder()
.command(List.of("C:\\Program Files\\nodejs\\npx.cmd", "-y", "bing-cn-mcp"))
@@ -263,13 +279,38 @@ public class ChatServiceFacade implements IChatService {
// 构建监督者Agent
SupervisorAgent supervisor = AgenticServices.supervisorBuilder()
.chatModel(plannerModel)
.subAgents(sqlAgent, chartGenerationAgent)
.subAgents(sqlAgent, searchAgent, chartGenerationAgent)
.responseStrategy(SupervisorResponseStrategy.LAST)
.listener(new MyAgentListener())
.build();
// 调用 supervisor
String invoke = supervisor.invoke(chatRequest.getContent());
contextMessages.add(AiMessage.from(invoke));
log.info("【思考模式】supervisor.invoke() 返回: {}", invoke);
// 如果有有效结果,通过 SSE 发送给前端并保存到数据库
if (invoke != null && !invoke.isBlank()) {
try {
// 通过 SSE 实时发送思考结果
SseMessageUtils.sendContent(userId, invoke);
log.info("【思考模式】结果已发送至SSE: {}", invoke);
// 保存用户消息
chatMessageService.saveChatMessage(userId, chatRequest.getSessionId(),
chatRequest.getContent(), RoleType.USER.getName(), chatRequest.getModel());
// 保存助手思考结果消息
chatMessageService.saveChatMessage(userId, chatRequest.getSessionId(),
invoke, RoleType.ASSISTANT.getName(), chatRequest.getModel());
// 将思考结果添加到上下文,供后续流程使用(如果需要)
contextMessages.add(AiMessage.from(invoke));
} catch (Exception e) {
log.error("【思考模式】发送结果或保存消息失败: {}", e.getMessage(), e);
}
}
return invoke != null ? invoke : "";
}
/**