diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java index 687f7f4..0a7a46d 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java @@ -1,9 +1,6 @@ package com.abin.mallchat.common.common.config; -import cn.hutool.core.thread.NamedThreadFactory; -import cn.hutool.core.thread.ThreadFactoryBuilder; import com.abin.mallchat.common.common.factory.MyThreadFactory; -import com.abin.mallchat.common.common.handler.GlobalUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -11,9 +8,7 @@ import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import java.util.concurrent.Executor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.*; /** * Description: 线程池配置 @@ -32,6 +27,9 @@ public class ThreadPoolConfig implements AsyncConfigurer { */ public static final String WS_EXECUTOR = "websocketExecutor"; + + public static final String AICHAT_EXECUTOR = "aichatExecutor"; + @Override public Executor getAsyncExecutor() { return mallchatExecutor(); @@ -64,5 +62,15 @@ public class ThreadPoolConfig implements AsyncConfigurer { return executor; } - + @Bean(AICHAT_EXECUTOR) + public ThreadPoolTaskExecutor chatAiExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(15); + executor.setThreadNamePrefix("aichat-executor-"); + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());//满了直接丢弃,默认为不重要消息推送 + executor.setThreadFactory(new MyThreadFactory(executor)); + return executor; + } } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/handler/AbstractChatAIHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/handler/AbstractChatAIHandler.java index 183f4e3..e644b4d 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/handler/AbstractChatAIHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/handler/AbstractChatAIHandler.java @@ -1,10 +1,8 @@ package com.abin.mallchat.custom.chatai.handler; -import cn.hutool.core.thread.NamedThreadFactory; import com.abin.mallchat.common.chat.domain.entity.Message; import com.abin.mallchat.common.chat.domain.enums.MessageTypeEnum; -import com.abin.mallchat.common.common.exception.BusinessException; -import com.abin.mallchat.common.common.handler.GlobalUncaughtExceptionHandler; +import com.abin.mallchat.common.common.config.ThreadPoolConfig; import com.abin.mallchat.custom.chat.domain.vo.request.ChatMessageReq; import com.abin.mallchat.custom.chat.domain.vo.request.msg.TextMsgReq; import com.abin.mallchat.custom.chat.service.ChatService; @@ -12,20 +10,18 @@ import com.abin.mallchat.custom.user.domain.vo.response.user.UserInfoResp; import com.abin.mallchat.custom.user.service.UserService; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.PostConstruct; import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; @Slf4j -public abstract class AbstractChatAIHandler implements DisposableBean, InitializingBean { - public static ExecutorService EXECUTOR; +public abstract class AbstractChatAIHandler { + @Autowired + @Qualifier(ThreadPoolConfig.AICHAT_EXECUTOR) + private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Autowired protected ChatService chatService; @@ -45,7 +41,7 @@ public abstract class AbstractChatAIHandler implements DisposableBean, Initializ if (!supports(message)) { return; } - EXECUTOR.execute(() -> { + threadPoolTaskExecutor.execute(() -> { String text = doChat(message); if (StringUtils.isNotBlank(text)) { answerMsg(text, message.getRoomId(), message.getFromUid()); @@ -99,30 +95,4 @@ public abstract class AbstractChatAIHandler implements DisposableBean, Initializ chatService.sendMsg(answerReq, getChatAIUserId()); } - @Override - public void afterPropertiesSet() { - EXECUTOR = new ThreadPoolExecutor( - 10, - 10, - 0L, - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(15), - new NamedThreadFactory("openAI-chat-gpt", - null, - false, - new GlobalUncaughtExceptionHandler()), - (r, executor) -> { - throw new BusinessException("别问的太快了,我的脑子不够用了"); - }); - } - - @Override - public void destroy() throws Exception { - EXECUTOR.shutdown(); - if (!EXECUTOR.awaitTermination(30, TimeUnit.SECONDS)) { //最多等30秒,处理不完就拉倒 - if (log.isErrorEnabled()) { - log.error("Timed out while waiting for executor [{}] to terminate", EXECUTOR); - } - } - } }