1.搭建分布式事务框架
2.项目引入rocketmq实现集群广播
This commit is contained in:
zhongzb
2023-08-14 22:57:08 +08:00
parent 6dd2a27df7
commit 7c21b4127e
47 changed files with 1200 additions and 281 deletions

View File

@@ -23,10 +23,11 @@ import java.util.Objects;
@Service
public class MessageDao extends ServiceImpl<MessageMapper, Message> {
public CursorPageBaseResp<Message> getCursorPage(Long roomId, CursorPageBaseReq request) {
public CursorPageBaseResp<Message> getCursorPage(Long roomId, CursorPageBaseReq request, Long lastMsgId) {
return CursorUtils.getCursorPageByMysql(this, request, wrapper -> {
wrapper.eq(Message::getRoomId, roomId);
wrapper.eq(Message::getStatus, MessageStatusEnum.NORMAL.getStatus());
wrapper.le(Objects.nonNull(lastMsgId), Message::getId, lastMsgId);
}, Message::getId);
}

View File

@@ -1,6 +1,7 @@
package com.abin.mallchat.common.common.config;
import com.abin.mallchat.common.common.factory.MyThreadFactory;
import com.abin.mallchat.transaction.annotation.SecureInvokeConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@@ -8,7 +9,8 @@ import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.*;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Description: 线程池配置
@@ -17,7 +19,7 @@ import java.util.concurrent.*;
*/
@Configuration
@EnableAsync
public class ThreadPoolConfig implements AsyncConfigurer {
public class ThreadPoolConfig implements AsyncConfigurer, SecureInvokeConfigurer {
/**
* 项目共用线程池
*/
@@ -35,6 +37,11 @@ public class ThreadPoolConfig implements AsyncConfigurer {
return mallchatExecutor();
}
@Override
public Executor getSecureInvokeExecutor() {
return mallchatExecutor();
}
@Bean(MALLCHAT_EXECUTOR)
@Primary
public ThreadPoolTaskExecutor mallchatExecutor() {

View File

@@ -0,0 +1,19 @@
package com.abin.mallchat.common.common.constant;
/**
* @author zhongzb create on 2021/06/10
*/
public interface MQConstant {
/**
* 消息发送mq
*/
String SEND_MSG_TOPIC = "chat_send_msg";
String SEND_MSG_GROUP = "chat_send_msg_group";
/**
* push用户
*/
String PUSH_TOPIC = "websocket_push";
String PUSH_GROUP = "websocket_push_group";
}

View File

@@ -0,0 +1,19 @@
package com.abin.mallchat.common.common.domain.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* Description:
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-08-12
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MsgSendMessageDTO implements Serializable {
private Long msgId;
}

View File

@@ -0,0 +1,46 @@
package com.abin.mallchat.common.common.domain.dto;
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* Description: 推送给用户的消息对象
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-08-12
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PushMessageDTO implements Serializable {
/**
* 推送的ws消息
*/
private WSBaseResp<?> wsBaseMsg;
/**
* 推送的uid
*/
private Long uid;
/**
* 推送类型 1个人 2全员
*
* @see com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum
*/
private Integer pushType;
public PushMessageDTO(Long uid, WSBaseResp<?> wsBaseMsg) {
this.uid = uid;
this.wsBaseMsg = wsBaseMsg;
this.pushType = WSPushTypeEnum.USER.getType();
}
public PushMessageDTO(WSBaseResp<?> wsBaseMsg) {
this.wsBaseMsg = wsBaseMsg;
this.pushType = WSPushTypeEnum.ALL.getType();
}
}

View File

@@ -1,50 +0,0 @@
package com.abin.mallchat.common.common.event;
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Getter
public class WSPushEvent extends ApplicationEvent {
/**
* 推送的ws消息
*/
private final WSBaseResp<?> wsBaseMsg;
/**
* 推送的uid
*/
private final List<Long> uidList;
/**
* 推送类型 1个人 2全员
*
* @see com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum
*/
private final Integer pushType;
public WSPushEvent(Object source, Long uid, WSBaseResp<?> wsBaseMsg) {
super(source);
this.uidList = Collections.singletonList(uid);
this.wsBaseMsg = wsBaseMsg;
this.pushType = WSPushTypeEnum.USER.getType();
}
public WSPushEvent(Object source, List<Long> uidList, WSBaseResp<?> wsBaseMsg) {
super(source);
this.uidList = uidList;
this.wsBaseMsg = wsBaseMsg;
this.pushType = WSPushTypeEnum.USER.getType();
}
public WSPushEvent(Object source, WSBaseResp<?> wsBaseMsg) {
super(source);
this.uidList = new ArrayList<>();
this.wsBaseMsg = wsBaseMsg;
this.pushType = WSPushTypeEnum.ALL.getType();
}
}

View File

@@ -9,8 +9,6 @@ import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@@ -55,15 +53,4 @@ public class LockService {
*/
T get() throws Throwable;
}
public static void main(String[] args) {
List<String> sensitiveList = Arrays.asList("abcd", "abcbba", "adabca");
String text = "abcdefg";
for (String s : sensitiveList) {
boolean hit = text.contains(s);
System.out.println(hit);
}
}
}

View File

@@ -47,22 +47,23 @@ public class CursorUtils {
}
public static <T> CursorPageBaseResp<T> getCursorPageByMysql(IService<T> mapper, CursorPageBaseReq request, Consumer<LambdaQueryWrapper<T>> initWrapper, SFunction<T, ?> cursorColumn) {
Class<?> cursorType = LambdaUtils.getReturnType(cursorColumn);
LambdaQueryWrapper<T> wrapper = new LambdaQueryWrapper<>();
initWrapper.accept(wrapper);
if (StrUtil.isNotBlank(request.getCursor())) {
wrapper.lt(cursorColumn, request.getCursor());
wrapper.lt(cursorColumn, parseCursor(request.getCursor(), cursorType));
}
wrapper.orderByDesc(cursorColumn);
Page<T> page = mapper.page(request.plusPage(), wrapper);
String cursor = Optional.ofNullable(CollectionUtil.getLast(page.getRecords()))
.map(cursorColumn)
.map(CursorUtils::parseCursor)
.map(CursorUtils::toCursor)
.orElse(null);
Boolean isLast = page.getRecords().size() != request.getPageSize();
return new CursorPageBaseResp<>(cursor, isLast, page.getRecords());
}
private static String parseCursor(Object o) {
private static String toCursor(Object o) {
if (o instanceof Date) {
return String.valueOf(((Date) o).getTime());
} else {
@@ -70,4 +71,11 @@ public class CursorUtils {
}
}
private static Object parseCursor(String cursor, Class<?> cursorClass) {
if (Date.class.isAssignableFrom(cursorClass)) {
return new Date(Long.parseLong(cursor));
} else {
return cursor;
}
}
}

View File

@@ -1,39 +0,0 @@
package com.abin.mallchat.common.common.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Description:
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-04-25
*/
public class JsonUtils {
private static final ObjectMapper jsonMapper = new ObjectMapper();
public static <T> T toObj(String str, Class<T> clz) {
try {
return jsonMapper.readValue(str, clz);
} catch (JsonProcessingException e) {
throw new UnsupportedOperationException(e);
}
}
public static JsonNode toJsonNode(String str) {
try {
return jsonMapper.readTree(str);
} catch (JsonProcessingException e) {
throw new UnsupportedOperationException(e);
}
}
public static String toStr(Object t) {
try {
return jsonMapper.writeValueAsString(t);
} catch (Exception e) {
throw new UnsupportedOperationException(e);
}
}
}

View File

@@ -0,0 +1,111 @@
package com.abin.mallchat.common.common.utils;
import cn.hutool.core.map.WeakConcurrentMap;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.toolkit.support.ColumnCache;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import lombok.SneakyThrows;
import org.apache.ibatis.reflection.property.PropertyNamer;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
* Description:
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-08-02
*/
public class LambdaUtils {
/**
* 字段映射
*/
private static final Map<String, Map<String, ColumnCache>> COLUMN_CACHE_MAP = new ConcurrentHashMap<>();
/**
* SerializedLambda 反序列化缓存
*/
private static final Map<String, WeakReference<com.baomidou.mybatisplus.core.toolkit.support.SerializedLambda>> FUNC_CACHE = new ConcurrentHashMap<>();
private static Pattern RETURN_TYPE_PATTERN = Pattern.compile("\\(.*\\)L(.*);");
private static Pattern PARAMETER_TYPE_PATTERN = Pattern.compile("\\((.*)\\).*");
private static final WeakConcurrentMap<String, SerializedLambda> cache = new WeakConcurrentMap<>();
/**
* 获取Lambda表达式返回类型
*/
public static Class<?> getReturnType(Serializable serializable) {
String expr = _resolve(serializable).getInstantiatedMethodType();
Matcher matcher = RETURN_TYPE_PATTERN.matcher(expr);
if (!matcher.find() || matcher.groupCount() != 1) {
throw new RuntimeException("获取Lambda信息失败");
}
String className = matcher.group(1).replace("/", ".");
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new RuntimeException("无法加载类", e);
}
}
@SneakyThrows
public static <T> Class<?> getReturnType(SFunction<T, ?> func) {
com.baomidou.mybatisplus.core.toolkit.support.SerializedLambda lambda = com.baomidou.mybatisplus.core.toolkit.LambdaUtils.resolve(func);
Class<?> aClass = lambda.getInstantiatedType();
String fieldName = PropertyNamer.methodToProperty(lambda.getImplMethodName());
Field field = aClass.getDeclaredField(fieldName);
field.setAccessible(true);
return field.getType();
}
/**
* 获取Lambda表达式的参数类型
*/
public static List<Class<?>> getParameterTypes(Serializable serializable) {
String expr = _resolve(serializable).getInstantiatedMethodType();
Matcher matcher = PARAMETER_TYPE_PATTERN.matcher(expr);
if (!matcher.find() || matcher.groupCount() != 1) {
throw new RuntimeException("获取Lambda信息失败");
}
expr = matcher.group(1);
return Arrays.stream(expr.split(";"))
.filter(StrUtil::isNotBlank)
.map(s -> s.replace("L", "").replace("/", "."))
.map(s -> {
try {
return Class.forName(s);
} catch (ClassNotFoundException e) {
throw new RuntimeException("无法加载类", e);
}
})
.collect(Collectors.toList());
}
/**
* 解析lambda表达式,加了缓存。
* 该缓存可能会在任意不定的时间被清除。
*
* <p>
* 通过反射调用实现序列化接口函数对象的writeReplace方法从而拿到{@link SerializedLambda}<br>
* 该对象中包含了lambda表达式的所有信息。
* </p>
*
* @param func 需要解析的 lambda 对象
* @return 返回解析后的结果
*/
private static SerializedLambda _resolve(Serializable func) {
return cache.computeIfAbsent(func.getClass().getName(), (key)
-> ReflectUtil.invoke(func, "writeReplace"));
}
}

View File

@@ -1,6 +1,7 @@
package com.abin.mallchat.common.common.utils;
import cn.hutool.extra.spring.SpringUtil;
import com.abin.mallchat.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;

View File

@@ -0,0 +1,35 @@
package com.abin.mallchat.common.user.service.impl;
import com.abin.mallchat.common.common.constant.MQConstant;
import com.abin.mallchat.common.common.domain.dto.PushMessageDTO;
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
import com.abin.mallchat.transaction.service.MQProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Description:
* Author: <a href="https://github.com/zongzibinbin">abin</a>
* Date: 2023-08-12
*/
@Service
public class PushService {
@Autowired
private MQProducer mqProducer;
public void sendPushMsg(WSBaseResp<?> msg, List<Long> uidList) {
uidList.parallelStream().forEach(uid -> {
mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(uid, msg));
});
}
public void sendPushMsg(WSBaseResp<?> msg, Long uid) {
mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(uid, msg));
}
public void sendPushMsg(WSBaseResp<?> msg) {
mqProducer.sendMsg(MQConstant.PUSH_TOPIC, new PushMessageDTO(msg));
}
}