diff --git a/docs/version/2023-08-13.sql b/docs/version/2023-08-13.sql new file mode 100644 index 0000000..437309d --- /dev/null +++ b/docs/version/2023-08-13.sql @@ -0,0 +1,15 @@ +###本地消息表 +DROP TABLE IF EXISTS `secure_invoke_record`; +CREATE TABLE `secure_invoke_record` ( + `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'id', + `secure_invoke_json` json NOT NULL COMMENT '请求快照参数json', + `status` tinyint(8) NOT NULL COMMENT '状态 1待执行 2已失败', + `next_retry_time` datetime(3) NOT NULL COMMENT '下一次重试的时间', + `retry_times` int(11) NOT NULL COMMENT '已经重试的次数', + `max_retry_times` int(11) NOT NULL COMMENT '最大重试次数', + `fail_reason` text COMMENT '执行失败的堆栈', + `create_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) COMMENT '创建时间', + `update_time` datetime(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3) COMMENT '修改时间', + PRIMARY KEY (`id`) USING BTREE, + KEY `idx_next_retry_time` (`next_retry_time`) USING BTREE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表'; \ No newline at end of file diff --git a/mallchat-common/pom.xml b/mallchat-common/pom.xml index 4268fc2..ba3a524 100644 --- a/mallchat-common/pom.xml +++ b/mallchat-common/pom.xml @@ -13,117 +13,12 @@ - org.springframework.boot - spring-boot-starter + com.abin.mallchat + mallchat-common-starter - org.projectlombok - lombok - - - cn.hutool - hutool-all - - - - org.jsoup - jsoup - - - - org.mybatis - mybatis - - - com.baomidou - mybatis-plus-boot-starter - - - - com.baomidou - mybatis-plus-generator - - - mybatis-plus-extension - com.baomidou - - - - - org.apache.velocity - velocity-engine-core - 2.0 - - - - mysql - mysql-connector-java - - - - io.jsonwebtoken - jjwt - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-aop - - - - io.netty - netty-all - - - com.github.binarywang - weixin-java-mp - - - com.github.xiaoymin - - knife4j-spring-boot-starter - 2.0.9 - - - org.springframework.boot - spring-boot-starter-validation - - - com.auth0 - java-jwt - 3.19.0 - - - com.github.ben-manes.caffeine - caffeine - - - org.redisson - redisson-spring-boot-starter - - - io.minio - minio - - - - junit - junit - ${junit.version} - test - - - org.springframework - spring-test - 5.3.19 - test - - - org.springframework.boot - spring-boot-test + com.abin.mallchat + mallchat-transaction diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/chat/dao/MessageDao.java b/mallchat-common/src/main/java/com/abin/mallchat/common/chat/dao/MessageDao.java index 480edd6..3ea1ebc 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/chat/dao/MessageDao.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/chat/dao/MessageDao.java @@ -23,10 +23,11 @@ import java.util.Objects; @Service public class MessageDao extends ServiceImpl { - public CursorPageBaseResp getCursorPage(Long roomId, CursorPageBaseReq request) { + public CursorPageBaseResp getCursorPage(Long roomId, CursorPageBaseReq request, Long lastMsgId) { return CursorUtils.getCursorPageByMysql(this, request, wrapper -> { wrapper.eq(Message::getRoomId, roomId); wrapper.eq(Message::getStatus, MessageStatusEnum.NORMAL.getStatus()); + wrapper.le(Objects.nonNull(lastMsgId), Message::getId, lastMsgId); }, Message::getId); } diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java index 0a7a46d..5461c73 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/config/ThreadPoolConfig.java @@ -1,6 +1,7 @@ package com.abin.mallchat.common.common.config; import com.abin.mallchat.common.common.factory.MyThreadFactory; +import com.abin.mallchat.transaction.annotation.SecureInvokeConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; @@ -8,7 +9,8 @@ import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import java.util.concurrent.*; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; /** * Description: 线程池配置 @@ -17,7 +19,7 @@ import java.util.concurrent.*; */ @Configuration @EnableAsync -public class ThreadPoolConfig implements AsyncConfigurer { +public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer { /** * 项目共用线程池 */ @@ -35,6 +37,11 @@ public class ThreadPoolConfig implements AsyncConfigurer { return mallchatExecutor(); } + @Override + public Executor getSecureInvokeExecutor() { + return mallchatExecutor(); + } + @Bean(MALLCHAT_EXECUTOR) @Primary public ThreadPoolTaskExecutor mallchatExecutor() { diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java new file mode 100644 index 0000000..5403bcf --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java @@ -0,0 +1,19 @@ +package com.abin.mallchat.common.common.constant; + +/** + * @author zhongzb create on 2021/06/10 + */ +public interface MQConstant { + + /** + * 消息发送mq + */ + String SEND_MSG_TOPIC = "chat_send_msg"; + String SEND_MSG_GROUP = "chat_send_msg_group"; + + /** + * push用户 + */ + String PUSH_TOPIC = "websocket_push"; + String PUSH_GROUP = "websocket_push_group"; +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/MsgSendMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/MsgSendMessageDTO.java new file mode 100644 index 0000000..1b2b165 --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/MsgSendMessageDTO.java @@ -0,0 +1,19 @@ +package com.abin.mallchat.common.common.domain.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * Description: + * Author: abin + * Date: 2023-08-12 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class MsgSendMessageDTO implements Serializable { + private Long msgId; +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/PushMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/PushMessageDTO.java new file mode 100644 index 0000000..06eeb90 --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/PushMessageDTO.java @@ -0,0 +1,46 @@ +package com.abin.mallchat.common.common.domain.dto; + +import com.abin.mallchat.common.user.domain.enums.WSBaseResp; +import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * Description: 推送给用户的消息对象 + * Author: abin + * Date: 2023-08-12 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class PushMessageDTO implements Serializable { + /** + * 推送的ws消息 + */ + private WSBaseResp wsBaseMsg; + /** + * 推送的uid + */ + private Long uid; + + /** + * 推送类型 1个人 2全员 + * + * @see com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum + */ + private Integer pushType; + + public PushMessageDTO(Long uid, WSBaseResp wsBaseMsg) { + this.uid = uid; + this.wsBaseMsg = wsBaseMsg; + this.pushType = WSPushTypeEnum.USER.getType(); + } + + public PushMessageDTO(WSBaseResp wsBaseMsg) { + this.wsBaseMsg = wsBaseMsg; + this.pushType = WSPushTypeEnum.ALL.getType(); + } +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/event/WSPushEvent.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/event/WSPushEvent.java deleted file mode 100644 index 197f217..0000000 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/event/WSPushEvent.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.abin.mallchat.common.common.event; - -import com.abin.mallchat.common.user.domain.enums.WSBaseResp; -import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; -import lombok.Getter; -import org.springframework.context.ApplicationEvent; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -@Getter -public class WSPushEvent extends ApplicationEvent { - /** - * 推送的ws消息 - */ - private final WSBaseResp wsBaseMsg; - /** - * 推送的uid - */ - private final List uidList; - - /** - * 推送类型 1个人 2全员 - * - * @see com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum - */ - private final Integer pushType; - - public WSPushEvent(Object source, Long uid, WSBaseResp wsBaseMsg) { - super(source); - this.uidList = Collections.singletonList(uid); - this.wsBaseMsg = wsBaseMsg; - this.pushType = WSPushTypeEnum.USER.getType(); - } - - public WSPushEvent(Object source, List uidList, WSBaseResp wsBaseMsg) { - super(source); - this.uidList = uidList; - this.wsBaseMsg = wsBaseMsg; - this.pushType = WSPushTypeEnum.USER.getType(); - } - - public WSPushEvent(Object source, WSBaseResp wsBaseMsg) { - super(source); - this.uidList = new ArrayList<>(); - this.wsBaseMsg = wsBaseMsg; - this.pushType = WSPushTypeEnum.ALL.getType(); - } -} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/service/LockService.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/service/LockService.java index d32872e..dc25062 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/service/LockService.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/service/LockService.java @@ -9,8 +9,6 @@ import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -55,15 +53,4 @@ public class LockService { */ T get() throws Throwable; } - - - public static void main(String[] args) { - List sensitiveList = Arrays.asList("abcd", "abcbba", "adabca"); - String text = "abcdefg"; - for (String s : sensitiveList) { - boolean hit = text.contains(s); - System.out.println(hit); - } - - } } \ No newline at end of file diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CursorUtils.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CursorUtils.java index 1cecfb2..dbe07cd 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CursorUtils.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CursorUtils.java @@ -47,22 +47,23 @@ public class CursorUtils { } public static CursorPageBaseResp getCursorPageByMysql(IService mapper, CursorPageBaseReq request, Consumer> initWrapper, SFunction cursorColumn) { + Class cursorType = LambdaUtils.getReturnType(cursorColumn); LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); initWrapper.accept(wrapper); if (StrUtil.isNotBlank(request.getCursor())) { - wrapper.lt(cursorColumn, request.getCursor()); + wrapper.lt(cursorColumn, parseCursor(request.getCursor(), cursorType)); } wrapper.orderByDesc(cursorColumn); Page page = mapper.page(request.plusPage(), wrapper); String cursor = Optional.ofNullable(CollectionUtil.getLast(page.getRecords())) .map(cursorColumn) - .map(CursorUtils::parseCursor) + .map(CursorUtils::toCursor) .orElse(null); Boolean isLast = page.getRecords().size() != request.getPageSize(); return new CursorPageBaseResp<>(cursor, isLast, page.getRecords()); } - private static String parseCursor(Object o) { + private static String toCursor(Object o) { if (o instanceof Date) { return String.valueOf(((Date) o).getTime()); } else { @@ -70,4 +71,11 @@ public class CursorUtils { } } + private static Object parseCursor(String cursor, Class cursorClass) { + if (Date.class.isAssignableFrom(cursorClass)) { + return new Date(Long.parseLong(cursor)); + } else { + return cursor; + } + } } diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/LambdaUtils.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/LambdaUtils.java new file mode 100644 index 0000000..1d7bd70 --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/LambdaUtils.java @@ -0,0 +1,111 @@ +package com.abin.mallchat.common.common.utils; + +import cn.hutool.core.map.WeakConcurrentMap; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import com.baomidou.mybatisplus.core.toolkit.support.ColumnCache; +import com.baomidou.mybatisplus.core.toolkit.support.SFunction; +import lombok.SneakyThrows; +import org.apache.ibatis.reflection.property.PropertyNamer; + +import java.io.Serializable; +import java.lang.invoke.SerializedLambda; +import java.lang.ref.WeakReference; +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Description: + * Author: abin + * Date: 2023-08-02 + */ +public class LambdaUtils { + /** + * 字段映射 + */ + private static final Map> COLUMN_CACHE_MAP = new ConcurrentHashMap<>(); + + /** + * SerializedLambda 反序列化缓存 + */ + private static final Map> FUNC_CACHE = new ConcurrentHashMap<>(); + + private static Pattern RETURN_TYPE_PATTERN = Pattern.compile("\\(.*\\)L(.*);"); + private static Pattern PARAMETER_TYPE_PATTERN = Pattern.compile("\\((.*)\\).*"); + private static final WeakConcurrentMap cache = new WeakConcurrentMap<>(); + + /** + * 获取Lambda表达式返回类型 + */ + public static Class getReturnType(Serializable serializable) { + String expr = _resolve(serializable).getInstantiatedMethodType(); + Matcher matcher = RETURN_TYPE_PATTERN.matcher(expr); + if (!matcher.find() || matcher.groupCount() != 1) { + throw new RuntimeException("获取Lambda信息失败"); + } + String className = matcher.group(1).replace("/", "."); + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + throw new RuntimeException("无法加载类", e); + } + } + + @SneakyThrows + public static Class getReturnType(SFunction func) { + com.baomidou.mybatisplus.core.toolkit.support.SerializedLambda lambda = com.baomidou.mybatisplus.core.toolkit.LambdaUtils.resolve(func); + Class aClass = lambda.getInstantiatedType(); + String fieldName = PropertyNamer.methodToProperty(lambda.getImplMethodName()); + Field field = aClass.getDeclaredField(fieldName); + field.setAccessible(true); + return field.getType(); + } + + /** + * 获取Lambda表达式的参数类型 + */ + public static List> getParameterTypes(Serializable serializable) { + String expr = _resolve(serializable).getInstantiatedMethodType(); + Matcher matcher = PARAMETER_TYPE_PATTERN.matcher(expr); + if (!matcher.find() || matcher.groupCount() != 1) { + throw new RuntimeException("获取Lambda信息失败"); + } + expr = matcher.group(1); + + return Arrays.stream(expr.split(";")) + .filter(StrUtil::isNotBlank) + .map(s -> s.replace("L", "").replace("/", ".")) + .map(s -> { + try { + return Class.forName(s); + } catch (ClassNotFoundException e) { + throw new RuntimeException("无法加载类", e); + } + }) + .collect(Collectors.toList()); + } + + /** + * 解析lambda表达式,加了缓存。 + * 该缓存可能会在任意不定的时间被清除。 + * + *

+ * 通过反射调用实现序列化接口函数对象的writeReplace方法,从而拿到{@link SerializedLambda}
+ * 该对象中包含了lambda表达式的所有信息。 + *

+ * + * @param func 需要解析的 lambda 对象 + * @return 返回解析后的结果 + */ + private static SerializedLambda _resolve(Serializable func) { + return cache.computeIfAbsent(func.getClass().getName(), (key) + -> ReflectUtil.invoke(func, "writeReplace")); + } + +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java index 1676f94..9565336 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java @@ -1,6 +1,7 @@ package com.abin.mallchat.common.common.utils; import cn.hutool.extra.spring.SpringUtil; +import com.abin.mallchat.utils.JsonUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/user/service/impl/PushService.java b/mallchat-common/src/main/java/com/abin/mallchat/common/user/service/impl/PushService.java new file mode 100644 index 0000000..0eed34e --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/user/service/impl/PushService.java @@ -0,0 +1,35 @@ +package com.abin.mallchat.common.user.service.impl; + +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.domain.dto.PushMessageDTO; +import com.abin.mallchat.common.user.domain.enums.WSBaseResp; +import com.abin.mallchat.transaction.service.MQProducer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +/** + * Description: + * Author: abin + * Date: 2023-08-12 + */ +@Service +public class PushService { + @Autowired + private MQProducer mqProducer; + + public void sendPushMsg(WSBaseResp msg, List uidList) { + uidList.parallelStream().forEach(uid -> { + mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(uid, msg)); + }); + } + + public void sendPushMsg(WSBaseResp msg, Long uid) { + mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(uid, msg)); + } + + public void sendPushMsg(WSBaseResp msg) { + mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(msg)); + } +} diff --git a/mallchat-common/src/main/resources/application-pro.properties b/mallchat-common/src/main/resources/application-pro.properties index 0081b74..50e6546 100644 --- a/mallchat-common/src/main/resources/application-pro.properties +++ b/mallchat-common/src/main/resources/application-pro.properties @@ -26,6 +26,10 @@ oss.endpoint=http://localhost:9000 oss.access-key=BEZ213 oss.secret-key=Ii4vCMIXuFfds1EZ8e7RXI2342342kV oss.bucketName=default +##################rocketmq################## +rocketmq.name-server=127.0.0.1:9876 +rocketmq.access-key=root +rocketmq.secret-key=123456 ##################gpt配置################## mallchat.chatgpt.use=false mallchat.chatgpt.uid=10001 diff --git a/mallchat-common/src/main/resources/application-test.properties b/mallchat-common/src/main/resources/application-test.properties index dabc77d..d35be3a 100644 --- a/mallchat-common/src/main/resources/application-test.properties +++ b/mallchat-common/src/main/resources/application-test.properties @@ -26,6 +26,10 @@ oss.endpoint=http://localhost:9000 oss.access-key=BEZ213 oss.secret-key=Ii4vCMIXuFe241dsfEZ8e7RXI2342342kV oss.bucketName=default +##################rocketmq################## +rocketmq.name-server=127.0.0.1:9876 +rocketmq.access-key=root +rocketmq.secret-key=123456 ##################gpt配置################## mallchat.chatgpt.use=false mallchat.chatgpt.uid=10001 diff --git a/mallchat-common/src/main/resources/application.yml b/mallchat-common/src/main/resources/application.yml index ae8591f..ee78bd5 100644 --- a/mallchat-common/src/main/resources/application.yml +++ b/mallchat-common/src/main/resources/application.yml @@ -3,7 +3,6 @@ logging: org.springframework.web: INFO com.github.binarywang.demo.wx.mp: DEBUG me.chanjar.weixin: DEBUG - mybatis-plus: mapper-locations: classpath:mapper/**/*.xml configuration: @@ -64,3 +63,25 @@ chatai: url: ${mallchat.chatglm2.url} minute: 3 # 每个用户每3分钟可以请求一次 AIUserId: ${mallchat.chatglm2.uid} +rocketmq: + name-server: ${rocketmq.name-server} + # 默认的消息组 + producer: + group: chatGroup + send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 + compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B + max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B + retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 + retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 + retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false + access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: ${rocketmq.secret-key} # Secret Key + enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 + customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 + # Consumer 配置项 + consumer: + access-key: ${rocketmq.access-key} # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 + secret-key: ${rocketmq.secret-key} # Secret Key + listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, > 。默认情况下,不配置表示监听。 + erbadagang-consumer-group: + topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费 diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/consumer/MsgSendConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/consumer/MsgSendConsumer.java new file mode 100644 index 0000000..d1e0805 --- /dev/null +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/consumer/MsgSendConsumer.java @@ -0,0 +1,102 @@ +package com.abin.mallchat.custom.chat.consumer; + +import com.abin.mallchat.common.chat.dao.ContactDao; +import com.abin.mallchat.common.chat.dao.MessageDao; +import com.abin.mallchat.common.chat.dao.RoomDao; +import com.abin.mallchat.common.chat.dao.RoomFriendDao; +import com.abin.mallchat.common.chat.domain.entity.Message; +import com.abin.mallchat.common.chat.domain.entity.Room; +import com.abin.mallchat.common.chat.domain.entity.RoomFriend; +import com.abin.mallchat.common.chat.domain.enums.RoomTypeEnum; +import com.abin.mallchat.common.chat.domain.vo.response.ChatMessageResp; +import com.abin.mallchat.common.chat.service.cache.GroupMemberCache; +import com.abin.mallchat.common.chat.service.cache.HotRoomCache; +import com.abin.mallchat.common.chat.service.cache.RoomCache; +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.domain.dto.MsgSendMessageDTO; +import com.abin.mallchat.common.user.service.cache.UserCache; +import com.abin.mallchat.common.user.service.impl.PushService; +import com.abin.mallchat.custom.chat.service.ChatService; +import com.abin.mallchat.custom.chat.service.WeChatMsgOperationService; +import com.abin.mallchat.custom.chatai.service.IChatAIService; +import com.abin.mallchat.custom.user.service.WebSocketService; +import com.abin.mallchat.custom.user.service.adapter.WSAdapter; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +/** + * Description: 发送消息更新房间收信箱,并同步给房间成员信箱 + * Author: abin + * Date: 2023-08-12 + */ +@RocketMQMessageListener(consumerGroup = MQConstant.SEND_MSG_GROUP, topic = MQConstant.SEND_MSG_TOPIC) +@Component +public class MsgSendConsumer implements RocketMQListener { + @Autowired + private WebSocketService webSocketService; + @Autowired + private ChatService chatService; + @Autowired + private MessageDao messageDao; + @Autowired + private IChatAIService openAIService; + @Autowired + WeChatMsgOperationService weChatMsgOperationService; + @Autowired + private RoomCache roomCache; + @Autowired + private RoomDao roomDao; + @Autowired + private GroupMemberCache groupMemberCache; + @Autowired + private UserCache userCache; + @Autowired + private RoomFriendDao roomFriendDao; + @Autowired + private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private ContactDao contactDao; + @Autowired + private HotRoomCache hotRoomCache; + @Autowired + private PushService pushService; + + @Override + public void onMessage(MsgSendMessageDTO dto) { + Message message = messageDao.getById(dto.getMsgId()); + Room room = roomCache.get(message.getRoomId()); + ChatMessageResp msgResp = chatService.getMsgResp(message, null); + //所有房间更新房间最新消息 + roomDao.refreshActiveTime(room.getId(), message.getId(), message.getCreateTime()); + roomCache.delete(room.getId()); + if (room.isHotRoom()) {//热门群聊推送所有在线的人 + //更新热门群聊时间-redis + hotRoomCache.refreshActiveTime(room.getId(), message.getCreateTime()); + //推送所有人 + pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp)); + } else { + List memberUidList = new ArrayList<>(); + if (Objects.equals(room.getType(), RoomTypeEnum.GROUP.getType())) {//普通群聊推送所有群成员 + memberUidList = groupMemberCache.getMemberUidList(room.getId()); + } else if (Objects.equals(room.getType(), RoomTypeEnum.FRIEND.getType())) {//单聊对象 + //对单人推送 + RoomFriend roomFriend = roomFriendDao.getByRoomId(room.getId()); + memberUidList = Arrays.asList(roomFriend.getUid1(), roomFriend.getUid2()); + } + //更新所有群成员的会话时间 + contactDao.refreshOrCreateActiveTime(room.getId(), memberUidList, message.getId(), message.getCreateTime()); + //推送房间成员 + pushService.sendPushMsg(WSAdapter.buildMsgSend(msgResp), memberUidList); + } + } + + +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/controller/ChatController.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/controller/ChatController.java index 8a12690..1737471 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/controller/ChatController.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/controller/ChatController.java @@ -46,7 +46,7 @@ public class ChatController { private UserCache userCache; @GetMapping("/public/member/page") - @ApiOperation("群成员列表") + @ApiOperation("群成员列表(废弃)") @Deprecated @FrequencyControl(time = 120, count = 20, target = FrequencyControl.Target.IP) public ApiResult> getMemberPage(@Valid MemberReq request) { @@ -56,7 +56,7 @@ public class ChatController { } @GetMapping("/member/list") - @ApiOperation("房间内的所有群成员列表-@专用") + @ApiOperation("房间内的所有群成员列表-@专用(废弃)") @Deprecated public ApiResult> getMemberList(@Valid ChatMessageMemberReq chatMessageMemberReq) { return ApiResult.success(chatService.getMemberList(chatMessageMemberReq)); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/ChatServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/ChatServiceImpl.java index 2ba83f6..bd1780e 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/ChatServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/ChatServiceImpl.java @@ -41,6 +41,7 @@ import com.abin.mallchat.custom.chat.service.strategy.mark.MsgMarkFactory; import com.abin.mallchat.custom.chat.service.strategy.msg.AbstractMsgHandler; import com.abin.mallchat.custom.chat.service.strategy.msg.MsgHandlerFactory; import com.abin.mallchat.custom.chat.service.strategy.msg.RecallMsgHandler; +import com.abin.mallchat.transaction.service.MQProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -93,6 +94,8 @@ public class ChatServiceImpl implements ChatService { private GroupMemberDao groupMemberDao; @Autowired private RoomGroupCache roomGroupCache; + @Autowired + private MQProducer mqProducer; /** * 发送消息 @@ -149,7 +152,7 @@ public class ChatServiceImpl implements ChatService { List resultList = new ArrayList<>();//最终列表 Boolean isLast = Boolean.FALSE; if (activeStatusEnum == ChatActiveStatusEnum.ONLINE) {//在线列表 - CursorPageBaseResp cursorPage = userDao.getCursorPage(memberUidList, request, ChatActiveStatusEnum.ONLINE); + CursorPageBaseResp cursorPage = userDao.getCursorPage(memberUidList, new CursorPageBaseReq(request.getPageSize(), timeCursor), ChatActiveStatusEnum.ONLINE); resultList.addAll(MemberAdapter.buildMember(cursorPage.getList()));//添加在线列表 if (cursorPage.getIsLast()) {//如果是最后一页,从离线列表再补点数据 activeStatusEnum = ChatActiveStatusEnum.OFFLINE; @@ -160,7 +163,7 @@ public class ChatServiceImpl implements ChatService { timeCursor = cursorPage.getCursor(); isLast = cursorPage.getIsLast(); } else if (activeStatusEnum == ChatActiveStatusEnum.OFFLINE) {//离线列表 - CursorPageBaseResp cursorPage = userDao.getCursorPage(memberUidList, request, ChatActiveStatusEnum.OFFLINE); + CursorPageBaseResp cursorPage = userDao.getCursorPage(memberUidList, new CursorPageBaseReq(request.getPageSize(), timeCursor), ChatActiveStatusEnum.OFFLINE); resultList.addAll(MemberAdapter.buildMember(cursorPage.getList()));//添加离线线列表 timeCursor = cursorPage.getCursor(); isLast = cursorPage.getIsLast(); @@ -171,13 +174,26 @@ public class ChatServiceImpl implements ChatService { @Override public CursorPageBaseResp getMsgPage(ChatMessagePageReq request, Long receiveUid) { - CursorPageBaseResp cursorPage = messageDao.getCursorPage(request.getRoomId(), request); + //用最后一条消息id,来限制被踢出的人能看见的最大一条消息 + Long lastMsgId = getLastMsgId(request.getRoomId(), receiveUid); + CursorPageBaseResp cursorPage = messageDao.getCursorPage(request.getRoomId(), request, lastMsgId); if (cursorPage.isEmpty()) { return CursorPageBaseResp.empty(); } return CursorPageBaseResp.init(cursorPage, getMsgRespBatch(cursorPage.getList(), receiveUid)); } + private Long getLastMsgId(Long roomId, Long receiveUid) { + Room room = roomCache.get(roomId); + AssertUtil.isNotEmpty(room, "房间号有误"); + if (room.isHotRoom()) { + return null; + } + AssertUtil.isNotEmpty(receiveUid, "请先登录"); + Contact contact = contactDao.get(receiveUid, roomId); + return contact.getLastMsgId(); + } + @Override public ChatMemberStatisticResp getMemberStatistic() { System.out.println(Thread.currentThread().getName()); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/RoomAppServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/RoomAppServiceImpl.java index 51d05e2..b1201eb 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/RoomAppServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chat/service/impl/RoomAppServiceImpl.java @@ -17,7 +17,6 @@ import com.abin.mallchat.common.common.annotation.RedissonLock; import com.abin.mallchat.common.common.domain.vo.request.CursorPageBaseReq; import com.abin.mallchat.common.common.domain.vo.response.CursorPageBaseResp; import com.abin.mallchat.common.common.event.GroupMemberAddEvent; -import com.abin.mallchat.common.common.event.WSPushEvent; import com.abin.mallchat.common.common.utils.AssertUtil; import com.abin.mallchat.common.user.dao.UserDao; import com.abin.mallchat.common.user.domain.entity.User; @@ -28,6 +27,7 @@ import com.abin.mallchat.common.user.domain.vo.response.ws.WSMemberChange; import com.abin.mallchat.common.user.service.IRoleService; import com.abin.mallchat.common.user.service.cache.UserCache; import com.abin.mallchat.common.user.service.cache.UserInfoCache; +import com.abin.mallchat.common.user.service.impl.PushService; import com.abin.mallchat.custom.chat.domain.vo.enums.GroupRoleAPPEnum; import com.abin.mallchat.custom.chat.domain.vo.request.*; import com.abin.mallchat.custom.chat.domain.vo.response.ChatMemberListResp; @@ -90,18 +90,20 @@ public class RoomAppServiceImpl implements RoomAppService { private RoomService roomService; @Autowired private GroupMemberCache groupMemberCache; + @Autowired + private PushService pushService; @Override public CursorPageBaseResp getContactPage(CursorPageBaseReq request, Long uid) { //查出用户要展示的会话列表 CursorPageBaseResp page; if (Objects.nonNull(uid)) { - Double hotStart = getCursorOrNull(request.getCursor()); - Double hotEnd; + Double hotEnd = getCursorOrNull(request.getCursor()); + Double hotStart; //用户基础会话 CursorPageBaseResp contactPage = contactDao.getContactPage(uid, request); List baseRoomIds = contactPage.getList().stream().map(Contact::getRoomId).collect(Collectors.toList()); - hotEnd = getCursorOrNull(contactPage.getCursor()); + hotStart = getCursorOrNull(contactPage.getCursor()); //热门房间 Set> typedTuples = hotRoomCache.getRoomRange(hotStart, hotEnd); List hotRoomIds = typedTuples.stream().map(ZSetOperations.TypedTuple::getValue).filter(Objects::nonNull).map(Long::parseLong).collect(Collectors.toList()); @@ -193,7 +195,7 @@ public class RoomAppServiceImpl implements RoomAppService { //发送移除事件告知群成员 List memberUidList = groupMemberCache.getMemberUidList(roomGroup.getRoomId()); WSBaseResp ws = MemberAdapter.buildMemberRemoveWS(roomGroup.getRoomId(), member.getUid()); - applicationEventPublisher.publishEvent(new WSPushEvent(this, memberUidList, ws)); + pushService.sendPushMsg(ws, memberUidList); groupMemberCache.evictMemberUidList(member.getId()); } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/utils/ChatGPTUtils.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/utils/ChatGPTUtils.java index 80f4c25..4a4a1b8 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/utils/ChatGPTUtils.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/chatai/utils/ChatGPTUtils.java @@ -1,8 +1,8 @@ package com.abin.mallchat.custom.chatai.utils; import com.abin.mallchat.common.common.exception.BusinessException; -import com.abin.mallchat.common.common.utils.JsonUtils; import com.abin.mallchat.custom.chatai.domain.ChatGPTMsg; +import com.abin.mallchat.utils.JsonUtils; import com.fasterxml.jackson.databind.JsonNode; import com.knuddels.jtokkit.Encodings; import com.knuddels.jtokkit.api.Encoding; diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/GroupMemberAddListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/GroupMemberAddListener.java index 1ed7d90..fbecf33 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/GroupMemberAddListener.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/GroupMemberAddListener.java @@ -6,12 +6,12 @@ import com.abin.mallchat.common.chat.domain.entity.RoomGroup; import com.abin.mallchat.common.chat.service.cache.GroupMemberCache; import com.abin.mallchat.common.chat.service.cache.MsgCache; import com.abin.mallchat.common.common.event.GroupMemberAddEvent; -import com.abin.mallchat.common.common.event.WSPushEvent; import com.abin.mallchat.common.user.dao.UserDao; import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.common.user.domain.enums.WSBaseResp; import com.abin.mallchat.common.user.domain.vo.response.ws.WSMemberChange; import com.abin.mallchat.common.user.service.cache.UserInfoCache; +import com.abin.mallchat.common.user.service.impl.PushService; import com.abin.mallchat.custom.chat.domain.vo.request.ChatMessageReq; import com.abin.mallchat.custom.chat.service.ChatService; import com.abin.mallchat.custom.chat.service.adapter.MemberAdapter; @@ -51,6 +51,8 @@ public class GroupMemberAddListener { private UserDao userDao; @Autowired private GroupMemberCache groupMemberCache; + @Autowired + private PushService pushService; @Async @@ -66,7 +68,6 @@ public class GroupMemberAddListener { } @Async - @TransactionalEventListener(classes = GroupMemberAddEvent.class, fallbackExecution = true) public void sendChangePush(GroupMemberAddEvent event) { List memberList = event.getMemberList(); @@ -76,7 +77,7 @@ public class GroupMemberAddListener { List users = userDao.listByIds(uidList); users.forEach(user -> { WSBaseResp ws = MemberAdapter.buildMemberAddWS(roomGroup.getRoomId(), user); - applicationEventPublisher.publishEvent(new WSPushEvent(this, memberUidList, ws)); + pushService.sendPushMsg(ws, memberUidList); }); //移除缓存 groupMemberCache.evictMemberUidList(roomGroup.getRoomId()); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageRecallListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageRecallListener.java index 02b9ea2..29525a5 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageRecallListener.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageRecallListener.java @@ -3,6 +3,7 @@ package com.abin.mallchat.custom.common.event.listener; import com.abin.mallchat.common.chat.domain.dto.ChatMsgRecallDTO; import com.abin.mallchat.common.chat.service.cache.MsgCache; import com.abin.mallchat.common.common.event.MessageRecallEvent; +import com.abin.mallchat.common.user.service.impl.PushService; import com.abin.mallchat.custom.chat.service.ChatService; import com.abin.mallchat.custom.user.service.WebSocketService; import com.abin.mallchat.custom.user.service.adapter.WSAdapter; @@ -26,6 +27,8 @@ public class MessageRecallListener { private ChatService chatService; @Autowired private MsgCache msgCache; + @Autowired + private PushService pushService; @Async @TransactionalEventListener(classes = MessageRecallEvent.class, fallbackExecution = true) @@ -37,7 +40,7 @@ public class MessageRecallListener { @Async @TransactionalEventListener(classes = MessageRecallEvent.class, fallbackExecution = true) public void sendToAll(MessageRecallEvent event) { - webSocketService.sendToAllOnline(WSAdapter.buildMsgRecall(event.getRecallDTO())); + pushService.sendPushMsg(WSAdapter.buildMsgRecall(event.getRecallDTO())); } } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageSendListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageSendListener.java index b670a2c..882f25a 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageSendListener.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/MessageSendListener.java @@ -6,33 +6,28 @@ import com.abin.mallchat.common.chat.dao.RoomDao; import com.abin.mallchat.common.chat.dao.RoomFriendDao; import com.abin.mallchat.common.chat.domain.entity.Message; import com.abin.mallchat.common.chat.domain.entity.Room; -import com.abin.mallchat.common.chat.domain.entity.RoomFriend; import com.abin.mallchat.common.chat.domain.enums.HotFlagEnum; -import com.abin.mallchat.common.chat.domain.enums.RoomTypeEnum; -import com.abin.mallchat.common.chat.domain.vo.response.ChatMessageResp; import com.abin.mallchat.common.chat.service.cache.GroupMemberCache; import com.abin.mallchat.common.chat.service.cache.HotRoomCache; import com.abin.mallchat.common.chat.service.cache.RoomCache; +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.domain.dto.MsgSendMessageDTO; import com.abin.mallchat.common.common.event.MessageSendEvent; -import com.abin.mallchat.common.common.event.WSPushEvent; import com.abin.mallchat.common.user.service.cache.UserCache; import com.abin.mallchat.custom.chat.service.ChatService; import com.abin.mallchat.custom.chat.service.WeChatMsgOperationService; import com.abin.mallchat.custom.chatai.service.IChatAIService; import com.abin.mallchat.custom.user.service.WebSocketService; -import com.abin.mallchat.custom.user.service.adapter.WSAdapter; +import com.abin.mallchat.transaction.service.MQProducer; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.event.TransactionPhase; import org.springframework.transaction.event.TransactionalEventListener; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Objects; /** @@ -69,35 +64,13 @@ public class MessageSendListener { private ContactDao contactDao; @Autowired private HotRoomCache hotRoomCache; + @Autowired + private MQProducer mqProducer; - @Async - @TransactionalEventListener(classes = MessageSendEvent.class, fallbackExecution = true) + @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT, classes = MessageSendEvent.class, fallbackExecution = true) public void messageRoute(MessageSendEvent event) { - Message message = messageDao.getById(event.getMsgId()); - Room room = roomCache.get(message.getRoomId()); - ChatMessageResp msgResp = chatService.getMsgResp(message, null); - //更新房间最新消息 - roomDao.refreshActiveTime(room.getId(), message.getId(), message.getCreateTime()); - roomCache.delete(room.getId()); - if (isHotRoom(room)) {//热门群聊推送所有在线的人 - //更新热门群聊列表 - hotRoomCache.refreshActiveTime(room.getId(), message.getCreateTime()); - //推送所有人 - applicationEventPublisher.publishEvent(new WSPushEvent(this, WSAdapter.buildMsgSend(msgResp))); - } else { - List memberUidList = new ArrayList<>(); - if (Objects.equals(room.getType(), RoomTypeEnum.GROUP.getType())) {//普通群聊推送所有群成员 - memberUidList = groupMemberCache.getMemberUidList(room.getId()); - } else if (Objects.equals(room.getType(), RoomTypeEnum.FRIEND.getType())) {//单聊对象 - //对单人推送 - RoomFriend roomFriend = roomFriendDao.getByRoomId(room.getId()); - memberUidList = Arrays.asList(roomFriend.getUid1(), roomFriend.getUid2()); - } - //更新所有群成员的会话时间 - contactDao.refreshOrCreateActiveTime(room.getId(), memberUidList, message.getId(), message.getCreateTime()); - //推送房间成员 - applicationEventPublisher.publishEvent(new WSPushEvent(this, memberUidList, WSAdapter.buildMsgSend(msgResp))); - } + Long msgId = event.getMsgId(); + mqProducer.sendSecureMsg(MQConstant.SEND_MSG_TOPIC, new MsgSendMessageDTO(msgId), msgId); } @TransactionalEventListener(classes = MessageSendEvent.class, fallbackExecution = true) diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserApplyListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserApplyListener.java index 54c94b8..9809e5d 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserApplyListener.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserApplyListener.java @@ -4,6 +4,7 @@ import com.abin.mallchat.common.common.event.UserApplyEvent; import com.abin.mallchat.common.user.dao.UserApplyDao; import com.abin.mallchat.common.user.domain.entity.UserApply; import com.abin.mallchat.common.user.domain.vo.response.ws.WSFriendApply; +import com.abin.mallchat.common.user.service.impl.PushService; import com.abin.mallchat.custom.user.service.WebSocketService; import com.abin.mallchat.custom.user.service.adapter.WSAdapter; import lombok.extern.slf4j.Slf4j; @@ -25,12 +26,15 @@ public class UserApplyListener { @Autowired private WebSocketService webSocketService; + @Autowired + private PushService pushService; + @Async @TransactionalEventListener(classes = UserApplyEvent.class, fallbackExecution = true) public void notifyFriend(UserApplyEvent event) { UserApply userApply = event.getUserApply(); Integer unReadCount = userApplyDao.getUnReadCount(userApply.getTargetId()); - webSocketService.sendToUid(WSAdapter.buildApplySend(new WSFriendApply(userApply.getUid(), unReadCount)), userApply.getTargetId()); + pushService.sendPushMsg(WSAdapter.buildApplySend(new WSFriendApply(userApply.getUid(), unReadCount)), userApply.getTargetId()); } } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserOnlineListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserOnlineListener.java index 38cb34e..5e3a57f 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserOnlineListener.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/UserOnlineListener.java @@ -6,6 +6,7 @@ import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.common.user.domain.enums.ChatActiveStatusEnum; import com.abin.mallchat.common.user.service.IpService; import com.abin.mallchat.common.user.service.cache.UserCache; +import com.abin.mallchat.common.user.service.impl.PushService; import com.abin.mallchat.custom.user.service.WebSocketService; import com.abin.mallchat.custom.user.service.adapter.WSAdapter; import lombok.extern.slf4j.Slf4j; @@ -32,6 +33,8 @@ public class UserOnlineListener { private WSAdapter wsAdapter; @Autowired private IpService ipService; + @Autowired + private PushService pushService; @Async @EventListener(classes = UserOnlineEvent.class) @@ -39,7 +42,7 @@ public class UserOnlineListener { User user = event.getUser(); userCache.online(user.getId(), user.getLastOptTime()); //推送给所有在线用户,该用户登录成功 - webSocketService.sendToAllOnline(wsAdapter.buildOnlineNotifyResp(event.getUser())); + pushService.sendPushMsg(wsAdapter.buildOnlineNotifyResp(event.getUser())); } @Async diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/WSPushListener.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/WSPushListener.java deleted file mode 100644 index c6bd81a..0000000 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/common/event/listener/WSPushListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package com.abin.mallchat.custom.common.event.listener; - -import com.abin.mallchat.common.common.event.WSPushEvent; -import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; -import com.abin.mallchat.custom.user.service.WebSocketService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; -import org.springframework.transaction.event.TransactionalEventListener; - -/** - * 好友申请监听器 - * - * @author zhongzb create on 2022/08/26 - */ -@Slf4j -@Component -public class WSPushListener { - @Autowired - private WebSocketService webSocketService; - - @Async - @TransactionalEventListener(classes = WSPushEvent.class, fallbackExecution = true) - public void wsPush(WSPushEvent event) { - WSPushTypeEnum wsPushTypeEnum = WSPushTypeEnum.of(event.getPushType()); - switch (wsPushTypeEnum) { - case USER: - webSocketService.sendToUidList(event.getWsBaseMsg(), event.getUidList()); - break; - case ALL: - webSocketService.sendToAllOnline(event.getWsBaseMsg(), null); - break; - } - } - -} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/PushConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/PushConsumer.java new file mode 100644 index 0000000..84cd50c --- /dev/null +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/PushConsumer.java @@ -0,0 +1,36 @@ +package com.abin.mallchat.custom.user.consumer; + +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.domain.dto.PushMessageDTO; +import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; +import com.abin.mallchat.custom.user.service.WebSocketService; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * Description: + * Author: abin + * Date: 2023-08-12 + */ +@RocketMQMessageListener(topic = MQConstant.PUSH_TOPIC, consumerGroup = MQConstant.PUSH_GROUP, messageModel = MessageModel.BROADCASTING) +@Component +public class PushConsumer implements RocketMQListener { + @Autowired + private WebSocketService webSocketService; + + @Override + public void onMessage(PushMessageDTO message) { + WSPushTypeEnum wsPushTypeEnum = WSPushTypeEnum.of(message.getPushType()); + switch (wsPushTypeEnum) { + case USER: + webSocketService.sendToUid(message.getWsBaseMsg(), message.getUid()); + break; + case ALL: + webSocketService.sendToAllOnline(message.getWsBaseMsg(), null); + break; + } + } +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java index 1d43210..b639cb0 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java @@ -5,8 +5,6 @@ import com.abin.mallchat.common.user.domain.enums.WSBaseResp; import com.abin.mallchat.common.user.domain.vo.request.ws.WSAuthorize; import io.netty.channel.Channel; -import java.util.List; - public interface WebSocketService { /** * 处理用户登录请求,需要返回一张带code的二维码 @@ -70,7 +68,4 @@ public interface WebSocketService { void sendToUid(WSBaseResp wsBaseResp, Long uid); - void sendToUidList(WSBaseResp wsBaseResp, List uidList); - - } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index 396a572..f85e5ec 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -270,10 +270,6 @@ public class WebSocketServiceImpl implements WebSocketService { } - @Override - public void sendToUidList(WSBaseResp wsBaseResp, List uidList) { - uidList.forEach(uid -> sendToUid(wsBaseResp, uid)); - } private void sendMsg(Channel channel, WSBaseResp wsBaseResp) { channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp))); diff --git a/mallchat-custom-server/src/test/java/com/abin/mallchat/custom/spring/WXTemplate.java b/mallchat-custom-server/src/test/java/com/abin/mallchat/custom/spring/WXTemplate.java index 599b2ec..12d234e 100644 --- a/mallchat-custom-server/src/test/java/com/abin/mallchat/custom/spring/WXTemplate.java +++ b/mallchat-custom-server/src/test/java/com/abin/mallchat/custom/spring/WXTemplate.java @@ -1,11 +1,13 @@ package com.abin.mallchat.custom.spring; +import com.abin.mallchat.custom.chat.service.RoomAppService; import com.abin.mallchat.custom.chat.service.WeChatMsgOperationService; import com.abin.mallchat.custom.chat.service.impl.ChatServiceImpl; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.stereotype.Component; import org.springframework.test.context.junit4.SpringRunner; import java.util.Collections; @@ -17,12 +19,15 @@ import java.util.Collections; */ @RunWith(SpringRunner.class) @SpringBootTest +@Component public class WXTemplate { @Autowired private WeChatMsgOperationService chatMsgOperationService; @Autowired private ChatServiceImpl chatService; + @Autowired + private RoomAppService roomAppService; @Test public void test() { diff --git a/mallchat-tools/mallchat-common-starter/pom.xml b/mallchat-tools/mallchat-common-starter/pom.xml new file mode 100644 index 0000000..79a116c --- /dev/null +++ b/mallchat-tools/mallchat-common-starter/pom.xml @@ -0,0 +1,135 @@ + + + + mallchat-tools + com.abin.mallchat + 1.0-SNAPSHOT + + 4.0.0 + + mallchat-common-starter + + + + org.springframework.boot + spring-boot-starter + + + org.projectlombok + lombok + + + cn.hutool + hutool-all + + + + org.jsoup + jsoup + + + + org.mybatis + mybatis + + + com.baomidou + mybatis-plus-boot-starter + + + + com.baomidou + mybatis-plus-generator + + + mybatis-plus-extension + com.baomidou + + + + + org.apache.velocity + velocity-engine-core + 2.0 + + + + mysql + mysql-connector-java + + + + io.jsonwebtoken + jjwt + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-aop + + + + io.netty + netty-all + + + com.github.binarywang + weixin-java-mp + + + com.github.xiaoymin + + knife4j-spring-boot-starter + 2.0.9 + + + org.springframework.boot + spring-boot-starter-validation + + + com.auth0 + java-jwt + 3.19.0 + + + com.github.ben-manes.caffeine + caffeine + + + org.redisson + redisson-spring-boot-starter + + + io.minio + minio + + + + junit + junit + ${junit.version} + test + + + org.springframework + spring-test + 5.3.19 + test + + + org.springframework.boot + spring-boot-test + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.2 + + + + \ No newline at end of file diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/JsonUtils.java b/mallchat-tools/mallchat-common-starter/src/main/java/com/abin/mallchat/utils/JsonUtils.java similarity index 53% rename from mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/JsonUtils.java rename to mallchat-tools/mallchat-common-starter/src/main/java/com/abin/mallchat/utils/JsonUtils.java index 29665c2..563eaf5 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/JsonUtils.java +++ b/mallchat-tools/mallchat-common-starter/src/main/java/com/abin/mallchat/utils/JsonUtils.java @@ -1,9 +1,12 @@ -package com.abin.mallchat.common.common.utils; +package com.abin.mallchat.utils; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; + /** * Description: * Author: abin @@ -20,6 +23,23 @@ public class JsonUtils { } } + public static T toObj(String str, TypeReference clz) { + try { + return jsonMapper.readValue(str, clz); + } catch (JsonProcessingException e) { + throw new UnsupportedOperationException(e); + } + } + + public static List toList(String str, Class clz) { + try { + return jsonMapper.readValue(str, new TypeReference>() { + }); + } catch (JsonProcessingException e) { + throw new UnsupportedOperationException(e); + } + } + public static JsonNode toJsonNode(String str) { try { return jsonMapper.readTree(str); @@ -28,6 +48,14 @@ public class JsonUtils { } } + public static T nodeToValue(JsonNode node, Class clz) { + try { + return jsonMapper.treeToValue(node, clz); + } catch (JsonProcessingException e) { + throw new UnsupportedOperationException(e); + } + } + public static String toStr(Object t) { try { return jsonMapper.writeValueAsString(t); diff --git a/mallchat-tools/mallchat-transaction/pom.xml b/mallchat-tools/mallchat-transaction/pom.xml new file mode 100644 index 0000000..4443f26 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/pom.xml @@ -0,0 +1,20 @@ + + + + mallchat-tools + com.abin.mallchat + 1.0-SNAPSHOT + + 4.0.0 + + mallchat-transaction + + + + com.abin.mallchat + mallchat-common-starter + + + \ No newline at end of file diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvoke.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvoke.java new file mode 100644 index 0000000..5a9bdbe --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvoke.java @@ -0,0 +1,29 @@ +package com.abin.mallchat.transaction.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 保证方法成功执行。如果在事务内的方法,会将操作记录入库,保证执行。 + */ +@Retention(RetentionPolicy.RUNTIME)//运行时生效 +@Target(ElementType.METHOD)//作用在方法上 +public @interface SecureInvoke { + + /** + * 默认3次 + * + * @return 最大重试次数(包括第一次正常执行) + */ + int maxRetryTimes() default 3; + + /** + * 默认异步执行,先入库,后续异步执行,不影响主线程快速返回结果,毕竟失败了有重试,而且主线程的事务已经提交了,串行执行没啥意义。 + * 同步执行适合mq消费场景等对耗时不关心,但是希望链路追踪不被异步影响的场景。 + * + * @return 是否异步执行 + */ + boolean async() default true; +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvokeConfigurer.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvokeConfigurer.java new file mode 100644 index 0000000..6e1ca00 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/annotation/SecureInvokeConfigurer.java @@ -0,0 +1,33 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.abin.mallchat.transaction.annotation; + +import org.springframework.lang.Nullable; + +import java.util.concurrent.Executor; + +public interface SecureInvokeConfigurer { + + /** + * 返回一个线程池 + */ + @Nullable + default Executor getSecureInvokeExecutor() { + return null; + } + +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/aspect/SecureInvokeAspect.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/aspect/SecureInvokeAspect.java new file mode 100644 index 0000000..b6b37de --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/aspect/SecureInvokeAspect.java @@ -0,0 +1,60 @@ +package com.abin.mallchat.transaction.aspect; + +import com.abin.mallchat.transaction.annotation.SecureInvoke; +import com.abin.mallchat.transaction.domain.dto.SecureInvokeDTO; +import com.abin.mallchat.transaction.domain.entity.SecureInvokeRecord; +import com.abin.mallchat.transaction.service.SecureInvokeService; +import com.abin.mallchat.utils.JsonUtils; +import lombok.extern.slf4j.Slf4j; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.reflect.MethodSignature; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Description: 安全执行切面 + * Author: abin + * Date: 2023-04-20 + */ +@Slf4j +@Aspect +@Order(Ordered.HIGHEST_PRECEDENCE + 1)//确保最先执行 +@Component +public class SecureInvokeAspect { + @Autowired + private SecureInvokeService secureInvokeService; + + @Around("@annotation(secureInvoke)") + public Object around(ProceedingJoinPoint joinPoint, SecureInvoke secureInvoke) throws Throwable { + boolean async = secureInvoke.async(); + boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive(); + //非事务状态,直接执行,不做任何保证。 + if (!inTransaction) { + return joinPoint.proceed(); + } + Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); + List parameters = Stream.of(method.getParameterTypes()).map(Class::getName).collect(Collectors.toList()); + SecureInvokeDTO dto = SecureInvokeDTO.builder() + .args(JsonUtils.toStr(joinPoint.getArgs())) + .className(method.getDeclaringClass().getName()) + .methodName(method.getName()) + .parameterTypes(JsonUtils.toStr(parameters)) + .build(); + SecureInvokeRecord record = SecureInvokeRecord.builder() + .secureInvokeDTO(dto) + .maxRetryTimes(secureInvoke.maxRetryTimes()) + .build(); + secureInvokeService.invoke(record, async); + return null; + } +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/config/TransactionAutoConfiguration.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/config/TransactionAutoConfiguration.java new file mode 100644 index 0000000..e1d9151 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/config/TransactionAutoConfiguration.java @@ -0,0 +1,69 @@ +package com.abin.mallchat.transaction.config; + +import com.abin.mallchat.transaction.annotation.SecureInvokeConfigurer; +import com.abin.mallchat.transaction.aspect.SecureInvokeAspect; +import com.abin.mallchat.transaction.dao.SecureInvokeRecordDao; +import com.abin.mallchat.transaction.mapper.SecureInvokeRecordMapper; +import com.abin.mallchat.transaction.service.MQProducer; +import com.abin.mallchat.transaction.service.SecureInvokeService; +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.util.CollectionUtils; +import org.springframework.util.function.SingletonSupplier; + +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Description: + * Author: abin + * Date: 2023-08-06 + */ +@Configuration +@EnableScheduling +@MapperScan(basePackageClasses = SecureInvokeRecordMapper.class) +@Import({SecureInvokeAspect.class, SecureInvokeRecordDao.class}) +public class TransactionAutoConfiguration { + + @Nullable + protected Executor executor; + + /** + * Collect any {@link AsyncConfigurer} beans through autowiring. + */ + @Autowired + void setConfigurers(ObjectProvider configurers) { + Supplier configurer = SingletonSupplier.of(() -> { + List candidates = configurers.stream().collect(Collectors.toList()); + if (CollectionUtils.isEmpty(candidates)) { + return null; + } + if (candidates.size() > 1) { + throw new IllegalStateException("Only one SecureInvokeConfigurer may exist"); + } + return candidates.get(0); + }); + executor = Optional.ofNullable(configurer.get()).map(SecureInvokeConfigurer::getSecureInvokeExecutor).orElse(ForkJoinPool.commonPool()); + } + + @Bean + public SecureInvokeService getSecureInvokeService(SecureInvokeRecordDao dao) { + return new SecureInvokeService(dao, executor); + } + + @Bean + public MQProducer getMQProducer() { + return new MQProducer(); + } +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/dao/SecureInvokeRecordDao.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/dao/SecureInvokeRecordDao.java new file mode 100644 index 0000000..cd5360b --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/dao/SecureInvokeRecordDao.java @@ -0,0 +1,32 @@ +package com.abin.mallchat.transaction.dao; + +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; +import com.abin.mallchat.transaction.domain.entity.SecureInvokeRecord; +import com.abin.mallchat.transaction.mapper.SecureInvokeRecordMapper; +import com.abin.mallchat.transaction.service.SecureInvokeService; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; + +/** + * Description: + * Author: abin + * Date: 2023-08-06 + */ +@Component +public class SecureInvokeRecordDao extends ServiceImpl { + + public List getWaitRetryRecords() { + Date now = new Date(); + //查2分钟前的失败数据。避免刚入库的数据被查出来 + DateTime afterTime = DateUtil.offsetMinute(now, (int) SecureInvokeService.RETRY_INTERVAL_MINUTES); + return lambdaQuery() + .eq(SecureInvokeRecord::getStatus, SecureInvokeRecord.STATUS_WAIT) + .lt(SecureInvokeRecord::getNextRetryTime, new Date()) + .lt(SecureInvokeRecord::getCreateTime, afterTime) + .list(); + } +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/dto/SecureInvokeDTO.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/dto/SecureInvokeDTO.java new file mode 100644 index 0000000..dcc0542 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/dto/SecureInvokeDTO.java @@ -0,0 +1,22 @@ +package com.abin.mallchat.transaction.domain.dto; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Description: + * Author: abin + * Date: 2023-08-06 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SecureInvokeDTO { + private String className; + private String methodName; + private String parameterTypes; + private String args; +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/entity/SecureInvokeRecord.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/entity/SecureInvokeRecord.java new file mode 100644 index 0000000..59d6aa1 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/domain/entity/SecureInvokeRecord.java @@ -0,0 +1,63 @@ +package com.abin.mallchat.transaction.domain.entity; + +import com.abin.mallchat.transaction.domain.dto.SecureInvokeDTO; +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.extension.handlers.JacksonTypeHandler; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Date; + +/** + * Description: + * Author: abin + * Date: 2023-08-06 + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@TableName(value = "secure_invoke_record", autoResultMap = true) +public class SecureInvokeRecord { + public final static byte STATUS_WAIT = 1; + public final static byte STATUS_FAIL = 2; + @TableId(value = "id", type = IdType.AUTO) + private Long id; + /** + * 请求快照参数json + */ + @TableField(value = "secure_invoke_json", typeHandler = JacksonTypeHandler.class) + private SecureInvokeDTO secureInvokeDTO; + /** + * 状态 1待执行 2已失败 + */ + @TableField("status") + @Builder.Default + private byte status = SecureInvokeRecord.STATUS_WAIT; + /** + * 下一次重试的时间 + */ + @TableField("next_retry_time") + @Builder.Default + private Date nextRetryTime = new Date(); + /** + * 已经重试的次数 + */ + @TableField("retry_times") + @Builder.Default + private Integer retryTimes = 0; + @TableField("max_retry_times") + private Integer maxRetryTimes; + @TableField("fail_reason") + private String failReason; + @TableField("create_time") + private Date createTime; + @TableField("update_time") + private Date updateTime; + +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/mapper/SecureInvokeRecordMapper.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/mapper/SecureInvokeRecordMapper.java new file mode 100644 index 0000000..15ec99d --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/mapper/SecureInvokeRecordMapper.java @@ -0,0 +1,8 @@ +package com.abin.mallchat.transaction.mapper; + +import com.abin.mallchat.transaction.domain.entity.SecureInvokeRecord; +import com.baomidou.mybatisplus.core.mapper.BaseMapper; + +public interface SecureInvokeRecordMapper extends BaseMapper { + +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/MQProducer.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/MQProducer.java new file mode 100644 index 0000000..e2165c7 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/MQProducer.java @@ -0,0 +1,38 @@ +package com.abin.mallchat.transaction.service; + +import com.abin.mallchat.transaction.annotation.SecureInvoke; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; + +/** + * Description: 发送mq工具类 + * Author: abin + * Date: 2023-08-12 + */ +public class MQProducer { + + @Autowired + private RocketMQTemplate rocketMQTemplate; + + public void sendMsg(String topic, Object body) { + Message build = MessageBuilder.withPayload(body).build(); + rocketMQTemplate.send(topic, build); + } + + /** + * 发送可靠消息,在事务提交后保证发送成功 + * + * @param topic + * @param body + */ + @SecureInvoke + public void sendSecureMsg(String topic, Object body, Object key) { + Message build = MessageBuilder + .withPayload(body) + .setHeader("KEYS", key) + .build(); + rocketMQTemplate.send(topic, build); + } +} diff --git a/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/SecureInvokeService.java b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/SecureInvokeService.java new file mode 100644 index 0000000..52bc306 --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/java/com/abin/mallchat/transaction/service/SecureInvokeService.java @@ -0,0 +1,146 @@ +package com.abin.mallchat.transaction.service; + +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.extra.spring.SpringUtil; +import com.abin.mallchat.transaction.dao.SecureInvokeRecordDao; +import com.abin.mallchat.transaction.domain.dto.SecureInvokeDTO; +import com.abin.mallchat.transaction.domain.entity.SecureInvokeRecord; +import com.abin.mallchat.utils.JsonUtils; +import com.fasterxml.jackson.databind.JsonNode; +import lombok.AllArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; + +import java.lang.reflect.Method; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +/** + * Description: 安全执行处理器 + * Author: abin + * Date: 2023-08-20 + */ +@Slf4j +@AllArgsConstructor +public class SecureInvokeService { + + public static final double RETRY_INTERVAL_MINUTES = 2D; + + private final SecureInvokeRecordDao secureInvokeRecordDao; + + private final Executor executor; + + @Scheduled(cron = "*/5 * * * * ?") + public void retry() { + List secureInvokeRecords = secureInvokeRecordDao.getWaitRetryRecords(); + for (SecureInvokeRecord secureInvokeRecord : secureInvokeRecords) { + doAsyncInvoke(secureInvokeRecord); + } + } + + public void save(SecureInvokeRecord record) { + secureInvokeRecordDao.save(record); + } + + private void retryRecord(SecureInvokeRecord record, String errorMsg) { + Integer retryTimes = record.getRetryTimes() + 1; + SecureInvokeRecord update = new SecureInvokeRecord(); + update.setId(record.getId()); + update.setFailReason(errorMsg); + update.setNextRetryTime(getNextRetryTime(retryTimes)); + if (retryTimes > record.getMaxRetryTimes()) { + update.setStatus(SecureInvokeRecord.STATUS_FAIL); + } else { + update.setRetryTimes(retryTimes); + } + secureInvokeRecordDao.updateById(update); + } + + private Date getNextRetryTime(Integer retryTimes) {//或者可以采用退避算法 + double waitMinutes = Math.pow(RETRY_INTERVAL_MINUTES, retryTimes);//重试时间指数上升 2m 4m 8m 16m + return DateUtil.offsetMinute(new Date(), (int) waitMinutes); + } + + private void removeRecord(Long id) { + secureInvokeRecordDao.removeById(id); + } + + public void invoke(SecureInvokeRecord record, boolean async) { + boolean inTransaction = TransactionSynchronizationManager.isActualTransactionActive(); + //非事务状态,直接执行,不做任何保证。 + if (!inTransaction) { + return; + } + //保存执行数据 + save(record); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @SneakyThrows + @Override + public void afterCommit() { + //事务后执行 + if (async) { + doAsyncInvoke(record); + } else { + doInvoke(record); + } + } + }); + } + + public void doAsyncInvoke(SecureInvokeRecord record) { + executor.execute(() -> { + System.out.println(Thread.currentThread().getName()); + doInvoke(record); + }); + } + + public void doInvoke(SecureInvokeRecord record) { + SecureInvokeDTO secureInvokeDTO = record.getSecureInvokeDTO(); + try { + Class beanClass = Class.forName(secureInvokeDTO.getClassName()); + Object bean = SpringUtil.getBean(beanClass); + List parameterStrings = JsonUtils.toList(secureInvokeDTO.getParameterTypes(), String.class); + List> parameterClasses = getParameters(parameterStrings); + Method method = ReflectUtil.getMethod(beanClass, secureInvokeDTO.getMethodName(), parameterClasses.toArray(new Class[]{})); + Object[] args = getArgs(secureInvokeDTO, parameterClasses); + //执行方法 + method.invoke(bean, args); + //执行成功更新状态 + removeRecord(record.getId()); + } catch (Throwable e) { + log.error("SecureInvokeService invoke fail", e); + //执行失败,等待下次执行 + retryRecord(record, e.getMessage()); + } + } + + @NotNull + private Object[] getArgs(SecureInvokeDTO secureInvokeDTO, List> parameterClasses) { + JsonNode jsonNode = JsonUtils.toJsonNode(secureInvokeDTO.getArgs()); + Object[] args = new Object[jsonNode.size()]; + for (int i = 0; i < jsonNode.size(); i++) { + Class aClass = parameterClasses.get(i); + args[i] = JsonUtils.nodeToValue(jsonNode.get(i), aClass); + } + return args; + } + + @NotNull + private List> getParameters(List parameterStrings) { + return parameterStrings.stream().map(name -> { + try { + return Class.forName(name); + } catch (ClassNotFoundException e) { + log.error("SecureInvokeService class not fund", e); + } + return null; + }).collect(Collectors.toList()); + } +} diff --git a/mallchat-tools/mallchat-transaction/src/main/resources/META-INF/spring.factories b/mallchat-tools/mallchat-transaction/src/main/resources/META-INF/spring.factories new file mode 100644 index 0000000..18592bc --- /dev/null +++ b/mallchat-tools/mallchat-transaction/src/main/resources/META-INF/spring.factories @@ -0,0 +1,3 @@ +# Auto Configure +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ + com.abin.mallchat.transaction.config.TransactionAutoConfiguration diff --git a/mallchat-tools/pom.xml b/mallchat-tools/pom.xml index 00afeb2..1a81e3b 100644 --- a/mallchat-tools/pom.xml +++ b/mallchat-tools/pom.xml @@ -13,6 +13,8 @@ pom mallchat-redis + mallchat-transaction + mallchat-common-starter diff --git a/pom.xml b/pom.xml index e3b43bc..34ca18d 100644 --- a/pom.xml +++ b/pom.xml @@ -39,7 +39,6 @@ 7.2 8.4.5 2.3.1 - 1.0-SNAPSHOT 1.18.10 4.1.76.Final 4.4.0 @@ -54,7 +53,17 @@ com.abin.mallchat mallchat-common - ${mallchat-common.version} + ${version} + + + com.abin.mallchat + mallchat-common-starter + ${version} + + + com.abin.mallchat + mallchat-transaction + ${version} com.squareup.okhttp3