feat: 修改目前实现类使用统一重试降级逻辑;

This commit is contained in:
likunlong
2025-08-19 16:51:51 +08:00
parent 4434d8346c
commit 1638b9dd75
7 changed files with 67 additions and 16 deletions

View File

@@ -14,6 +14,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Objects;
import org.ruoyi.chat.support.RetryNotifier;
import org.ruoyi.chat.util.SSEUtil;
@Slf4j
@Component
@@ -21,12 +23,18 @@ import java.util.Objects;
public class FastGPTSSEEventSourceListener extends EventSourceListener {
private SseEmitter emitter;
private Long sessionId;
@Autowired(required = false)
public FastGPTSSEEventSourceListener(SseEmitter emitter) {
this.emitter = emitter;
}
public FastGPTSSEEventSourceListener(SseEmitter emitter, Long sessionId) {
this.emitter = emitter;
this.sessionId = sessionId;
}
@Override
public void onOpen(EventSource eventSource, Response response) {
log.info("FastGPT sse连接成功");
@@ -40,6 +48,9 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
if ("flowResponses".equals(type)){
emitter.send(data);
emitter.complete();
if (sessionId != null) {
RetryNotifier.clear(sessionId);
}
} else {
emitter.send(data);
}
@@ -57,13 +68,26 @@ public class FastGPTSSEEventSourceListener extends EventSourceListener {
@SneakyThrows
public void onFailure(EventSource eventSource, Throwable t, Response response) {
if (Objects.isNull(response)) {
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, t != null ? t.getMessage() : "SSE连接失败");
RetryNotifier.notifyFailure(sessionId);
}
return;
}
ResponseBody body = response.body();
if (Objects.nonNull(body)) {
log.error("FastGPT sse连接异常data{},异常:{}", body.string(), t);
String msg = body.string();
log.error("FastGPT sse连接异常data{},异常:{}", msg, t);
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, msg);
RetryNotifier.notifyFailure(sessionId);
}
} else {
log.error("FastGPT sse连接异常data{},异常:{}", response, t);
if (sessionId != null) {
SSEUtil.sendErrorEvent(emitter, String.valueOf(response));
RetryNotifier.notifyFailure(sessionId);
}
}
eventSource.cancel();
}

View File

@@ -20,6 +20,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.ruoyi.chat.support.RetryNotifier;
/**
* 扣子聊天管理
@@ -53,19 +54,25 @@ public class CozeServiceImpl implements IChatService {
Flowable<ChatEvent> resp = coze.chat().stream(req);
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(() -> {
resp.blockingForEach(
event -> {
if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {
emitter.send(event.getMessage().getContent());
log.info("coze: {}", event.getMessage().getContent());
try {
resp.blockingForEach(
event -> {
if (ChatEventType.CONVERSATION_MESSAGE_DELTA.equals(event.getEvent())) {
emitter.send(event.getMessage().getContent());
log.info("coze: {}", event.getMessage().getContent());
}
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
emitter.complete();
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
RetryNotifier.clear(chatRequest.getSessionId());
}
}
if (ChatEventType.CONVERSATION_CHAT_COMPLETED.equals(event.getEvent())) {
emitter.complete();
log.info("Token usage: {}", event.getChat().getUsage().getTokenCount());
}
}
);
coze.shutdownExecutor();
);
} catch (Exception ex) {
RetryNotifier.notifyFailure(chatRequest.getSessionId());
} finally {
coze.shutdownExecutor();
}
});

View File

@@ -27,6 +27,7 @@ import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.Objects;
import org.ruoyi.chat.support.RetryNotifier;
/**
* dify 聊天管理
@@ -112,20 +113,24 @@ public class DifyServiceImpl implements IChatService {
chatRequestResponse.setSessionId(chatRequest.getSessionId());
chatRequestResponse.setPrompt(respMessage.toString());
chatCostService.deductToken(chatRequestResponse);
RetryNotifier.clear(chatRequest.getSessionId());
}
@Override
public void onError(ErrorEvent event) {
System.err.println("错误: " + event.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
@Override
public void onException(Throwable throwable) {
System.err.println("异常: " + throwable.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
});
} catch (Exception e) {
log.error("dify请求失败{}", e.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
return emitter;

View File

@@ -33,7 +33,7 @@ public class FastGPTServiceImpl implements IChatService {
ChatModelVo chatModelVo = chatModelService.selectModelByName(chatRequest.getModel());
OpenAiStreamClient openAiStreamClient = ChatConfig.createOpenAiStreamClient(chatModelVo.getApiHost(), chatModelVo.getApiKey());
List<Message> messages = chatRequest.getMessages();
FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter);
FastGPTSSEEventSourceListener listener = new FastGPTSSEEventSourceListener(emitter, chatRequest.getSessionId());
FastGPTChatCompletion completion = FastGPTChatCompletion
.builder()
.messages(messages)
@@ -41,7 +41,12 @@ public class FastGPTServiceImpl implements IChatService {
.detail(true)
.stream(true)
.build();
openAiStreamClient.streamChatCompletion(completion, listener);
try {
openAiStreamClient.streamChatCompletion(completion, listener);
} catch (Exception ex) {
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
throw ex;
}
return emitter;
}

View File

@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.ruoyi.chat.support.RetryNotifier;
/**
@@ -66,12 +67,15 @@ public class OllamaServiceImpl implements IChatService {
emitter.send(substr);
} catch (IOException e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
};
api.chat(requestModel, streamHandler);
emitter.complete();
RetryNotifier.clear(chatRequest.getSessionId());
} catch (Exception e) {
SSEUtil.sendErrorEvent(emitter, e.getMessage());
RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
});

View File

@@ -51,15 +51,18 @@ public class QianWenAiChatServiceImpl implements IChatService {
public void onCompleteResponse(ChatResponse completeResponse) {
emitter.complete();
log.info("消息结束完整消息ID: {}", completeResponse);
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
}
@Override
public void onError(Throwable error) {
error.printStackTrace();
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
});
} catch (Exception e) {
log.error("千问请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
return emitter;

View File

@@ -51,14 +51,16 @@ public class ZhipuAiChatServiceImpl implements IChatService {
@SneakyThrows
@Override
public void onError(Throwable error) {
// System.out.println(error.getMessage());
// 透传错误并触发重试
emitter.send(error.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
@Override
public void onCompleteResponse(ChatResponse response) {
emitter.complete();
log.info("消息结束完整消息ID: {}", response.aiMessage());
org.ruoyi.chat.support.RetryNotifier.clear(chatRequest.getSessionId());
}
};
@@ -71,6 +73,7 @@ public class ZhipuAiChatServiceImpl implements IChatService {
model.chat(chatRequest.getPrompt(), handler);
} catch (Exception e) {
log.error("智谱清言请求失败:{}", e.getMessage());
org.ruoyi.chat.support.RetryNotifier.notifyFailure(chatRequest.getSessionId());
}
return emitter;