diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java
index 5403bcf..aa877eb 100644
--- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java
@@ -16,4 +16,16 @@ public interface MQConstant {
*/
String PUSH_TOPIC = "websocket_push";
String PUSH_GROUP = "websocket_push_group";
+
+ /**
+ * (授权完成后)登录信息mq
+ */
+ String LOGIN_MSG_TOPIC = "login_send_msg";
+ String LOGIN_MSG_GROUP = "login_send_msg_group";
+
+ /**
+ * 扫码成功 信息发送mq
+ */
+ String SCAN_MSG_TOPIC = "scan_send_msg";
+ String SCAN_MSG_GROUP = "scan_send_msg_group";
}
diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java
index 38ee20d..ce3bf8b 100644
--- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java
@@ -58,6 +58,12 @@ public class RedisKey {
public static final String USER_CHAT_CONTEXT = "useChatGPTContext:uid_%d_roomId_%d";
+ /**
+ * 保存Open id
+ */
+ public static final String OPEN_ID_STRING = "openid:%s";
+
+
/**
* 用户上次使用GLM使用时间
*/
diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java
new file mode 100644
index 0000000..f250c0a
--- /dev/null
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java
@@ -0,0 +1,32 @@
+package com.abin.mallchat.common.common.domain.dto;
+
+import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
+import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
+
+import java.io.Serializable;
+
+/**
+ * Description: 将扫码登录返回信息推送给所有横向扩展的服务
+ * Author: zjy
+ * Date: 2023-08-12
+ */
+@Data
+@NoArgsConstructor
+public class LoginMessageDTO implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 微信公众号获得扫码事件后,发送给我方的回调信息
+ */
+ private WxMpXmlMessage wxMpXmlMessage ;
+
+ public LoginMessageDTO(WxMpXmlMessage wxMpXmlMessage) {
+ this.wxMpXmlMessage = wxMpXmlMessage;
+ }
+
+}
diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java
new file mode 100644
index 0000000..7472181
--- /dev/null
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java
@@ -0,0 +1,33 @@
+package com.abin.mallchat.common.common.domain.dto;
+
+import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
+import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * Description: 扫码成功对象,推送给用户的消息对象
+ * Author: abin
+ * Date: 2023-08-12
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ScanSuccessMessageDTO implements Serializable {
+ /**
+ * 推送的ws消息
+ */
+ private WSBaseResp> wsBaseMsg;
+ /**
+ * 推送的uid
+ */
+ private Integer loginCode;
+
+ public ScanSuccessMessageDTO(Integer loginCode, WSBaseResp> wsBaseMsg) {
+ this.loginCode = loginCode;
+ this.wsBaseMsg = wsBaseMsg;
+ }
+}
diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java
new file mode 100644
index 0000000..8fc4f0c
--- /dev/null
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java
@@ -0,0 +1,26 @@
+package com.abin.mallchat.common.common.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import io.netty.channel.Channel;
+
+import java.time.Duration;
+
+/**
+ * Description: Cache管理器
+ * Author: abin
+ * Date: 2023-04-05
+ */
+public class CacheHolder {
+
+ private static final Long MAX_MUM_SIZE = 10000L;
+
+ private static final Duration EXPIRE_TIME = Duration.ofHours(1);
+ /**
+ * 所有请求登录的code与channel关系
+ */
+ public static final Cache WAIT_LOGIN_MAP = Caffeine.newBuilder()
+ .expireAfterWrite(EXPIRE_TIME)
+ .maximumSize(MAX_MUM_SIZE)
+ .build();
+}
diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java
index 9565336..d6f29e7 100644
--- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java
+++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java
@@ -39,6 +39,22 @@ public class RedisUtils {
}
/**
+ * 自增int
+ *
+ * @param key 键
+ * @param time 时间(秒)
+ */
+ public static Integer integerInc(String key, int time, TimeUnit unit) {
+ RedisScript redisScript = new DefaultRedisScript<>(LUA_INCR_EXPIRE, Long.class);
+ Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(unit.toSeconds(time)));
+ try{
+ return Integer.parseInt(result.toString());
+ }catch (Exception e) {
+ RedisUtils.del(key);
+ throw e;
+ }
+ }
+ /**
* 指定缓存失效时间
*
* @param key 键
@@ -862,8 +878,8 @@ public class RedisUtils {
* @param end
* @return
*/
- public static Set> zRangeWithScores(String key, long start,
- long end) {
+ public static Set> zRangeWithScores(String key, long start,
+ long end) {
return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
}
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java
new file mode 100644
index 0000000..43b88e6
--- /dev/null
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java
@@ -0,0 +1,56 @@
+package com.abin.mallchat.custom.user.consumer;
+
+import com.abin.mallchat.common.common.constant.MQConstant;
+import com.abin.mallchat.common.common.constant.RedisKey;
+import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
+import com.abin.mallchat.common.common.utils.CacheHolder;
+import com.abin.mallchat.common.user.dao.UserDao;
+import com.abin.mallchat.common.user.domain.entity.User;
+import com.abin.mallchat.custom.user.service.WebSocketService;
+import com.abin.mallchat.custom.user.service.WxMsgService;
+import io.netty.channel.Channel;
+import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+/**
+ * Description: 在本地服务上找寻对应channel,将对应用户登陆,并触发所有用户收到上线事件
+ * Author: abin
+ * Date: 2023-08-12
+ */
+@RocketMQMessageListener(consumerGroup = MQConstant.LOGIN_MSG_GROUP, topic = MQConstant.LOGIN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING)
+@Component
+public class MsgLoginConsumer implements RocketMQListener {
+ @Autowired
+ private WxMsgService wxMsgService;
+ @Autowired
+ private UserDao userDao;
+
+ @Override
+ public void onMessage(LoginMessageDTO loginMessageDTO) {
+ WxMpXmlMessage wxMpXmlMessage = loginMessageDTO.getWxMpXmlMessage();
+ //给二维码绑定的登录code
+ Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage));
+ //本地未储存对应的channel,则结束
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
+ if (Objects.isNull(channel)) {
+ return;
+ }
+ //查询openid对应的用户(必然存在)
+ String openid = wxMpXmlMessage.getFromUser();
+ User user = userDao.getByOpenId(openid);
+ //登录,并且清除缓存
+ wxMsgService.login(user.getId(), eventKey);
+ }
+
+ private String getEventKey(WxMpXmlMessage wxMpXmlMessage) {
+ //扫码关注的渠道事件有前缀,需要去除
+ return wxMpXmlMessage.getEventKey().replace("qrscene_", "");
+ }
+
+}
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java
new file mode 100644
index 0000000..3d22798
--- /dev/null
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java
@@ -0,0 +1,50 @@
+package com.abin.mallchat.custom.user.consumer;
+
+import cn.hutool.json.JSONUtil;
+import com.abin.mallchat.common.common.constant.MQConstant;
+import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
+import com.abin.mallchat.common.common.utils.CacheHolder;
+import com.abin.mallchat.common.user.dao.UserDao;
+import com.abin.mallchat.common.user.domain.entity.User;
+import com.abin.mallchat.custom.user.service.WebSocketService;
+import com.abin.mallchat.custom.user.service.WxMsgService;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+/**
+ * Description: 将扫码成功的信息发送给对应的用户,等待授权
+ * Author: abin
+ * Date: 2023-08-12
+ */
+@RocketMQMessageListener(consumerGroup = MQConstant.SCAN_MSG_GROUP, topic = MQConstant.SCAN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING)
+@Component
+public class ScanSuccessConsumer implements RocketMQListener {
+ @Autowired
+ private WebSocketService webSocketService;
+ @Autowired
+ private WxMsgService wxMsgService;
+ @Autowired
+ private UserDao userDao;
+
+ @Override
+ public void onMessage(ScanSuccessMessageDTO scanSuccessMessageDTO) {
+ Integer loginCode = scanSuccessMessageDTO.getLoginCode();
+ //本地未储存对应的channel,则结束
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
+ if (Objects.isNull(channel)) {
+ return;
+ }
+ //给正在等待登陆的channel发送扫码成功的消息,等待授权
+ channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(scanSuccessMessageDTO.getWsBaseMsg())));
+ }
+
+
+}
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java
index 2cb2259..7047ed7 100644
--- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java
@@ -2,6 +2,7 @@ package com.abin.mallchat.custom.user.service;
import com.abin.mallchat.common.user.domain.dto.ItemInfoDTO;
import com.abin.mallchat.common.user.domain.dto.SummeryInfoDTO;
+import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.custom.user.domain.vo.request.user.*;
import com.abin.mallchat.custom.user.domain.vo.response.user.BadgeResp;
import com.abin.mallchat.custom.user.domain.vo.response.user.UserInfoResp;
@@ -50,11 +51,11 @@ public interface UserService {
void wearingBadge(Long uid, WearingBadgeReq req);
/**
- * 用户注册
+ * 用户注册,需要获得id
*
- * @param openId
+ * @param user
*/
- void register(String openId);
+ void register(User user);
void black(BlackReq req);
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java
index b639cb0..6c93815 100644
--- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java
@@ -36,7 +36,7 @@ public interface WebSocketService {
void authorize(Channel channel, WSAuthorize wsAuthorize);
/**
- * 扫码用户登录成功通知
+ * 扫码用户登录成功通知,清除本地Cache中的loginCode和channel的关系
*
* @param loginCode
* @param user
@@ -45,11 +45,11 @@ public interface WebSocketService {
Boolean scanLoginSuccess(Integer loginCode, User user, String token);
/**
- * 用户扫码成功
+ * 通知用户扫码成功
*
* @param loginCode
*/
- Boolean scanSuccess(Integer loginCode);
+ Boolean scanSuccess(Integer loginCode, Long uid);
/**
* 推动消息给所有在线的人
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java
index 065c026..2557068 100644
--- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java
@@ -1,10 +1,17 @@
package com.abin.mallchat.custom.user.service;
import cn.hutool.core.util.RandomUtil;
+import com.abin.mallchat.common.common.constant.MQConstant;
+import com.abin.mallchat.common.common.constant.RedisKey;
+import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
+import com.abin.mallchat.common.common.utils.CacheHolder;
+import com.abin.mallchat.common.common.utils.RedisUtils;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.custom.user.service.adapter.TextBuilder;
import com.abin.mallchat.custom.user.service.adapter.UserAdapter;
+import com.abin.mallchat.transaction.service.MQProducer;
+import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
import me.chanjar.weixin.mp.api.WxMpService;
@@ -21,6 +28,7 @@ import org.springframework.stereotype.Service;
import java.net.URLEncoder;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
/**
* Description: 处理与微信api的交互逻辑
@@ -48,25 +56,37 @@ public class WxMsgService {
private UserService userService;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
+ @Autowired
+ private MQProducer mqProducer;
public WxMpXmlOutMessage scan(WxMpService wxMpService, WxMpXmlMessage wxMpXmlMessage) {
- String fromUser = wxMpXmlMessage.getFromUser();
- Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage));
- User user = userDao.getByOpenId(fromUser);
+ String openid = wxMpXmlMessage.getFromUser();
+ Integer loginCode = Integer.parseInt(getEventKey(wxMpXmlMessage));
+ User user = userDao.getByOpenId(openid);
+ //如果已经注册,直接登录成功
if (Objects.nonNull(user) && StringUtils.isNotEmpty(user.getAvatar())) {
- //注册且已经授权的用户,直接登录成功
- login(user.getId(), eventKey);
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
+ //要么在本地登录,否则利用mq广播到到所有服务上尝试登录
+ if (Objects.nonNull(channel)) {
+ String token = loginService.login(user.getId());
+ webSocketService.scanLoginSuccess(loginCode, user, token);
+ }else {
+ mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
+ }
return null;
}
+
+ //user为空先注册,手动生成,以保存uid
if (Objects.isNull(user)) {
- //未注册的先注册
- userService.register(fromUser);
+ user = User.builder().openId(openid).build();
+ userService.register(user);
}
- //保存openid和场景code的关系,后续才能通知到前端
- OPENID_EVENT_CODE_MAP.put(fromUser, eventKey);
- //授权流程,给用户发送授权消息,并且异步通知前端扫码成功
- threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(eventKey));
+ Long uid = user.getId();
+
+ //在 redis中保存openid和场景code的关系,后续才能通知到前端,旧版数据没有清除,这里设置了过期时间
+ RedisUtils.set(RedisKey.getKey(RedisKey.OPEN_ID_STRING, openid), loginCode, 60, TimeUnit.MINUTES);
+ //授权流程,给用户发送授权消息,并且异步通知前端扫码成功(如非本地channel,使用MQ通知某服务对前端进行通知扫码成功)
+ threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(loginCode, uid));
String skipUrl = String.format(URL, wxMpService.getWxMpConfigStorage().getAppId(), URLEncoder.encode(callback + "/wx/portal/public/callBack"));
WxMpXmlOutMessage.TEXT().build();
return new TextBuilder().build("请点击链接授权:登录", wxMpXmlMessage, wxMpService);
@@ -88,9 +108,20 @@ public class WxMsgService {
if (StringUtils.isEmpty(user.getName())) {
fillUserInfo(user.getId(), userInfo);
}
- //触发用户登录成功操作
- Integer eventKey = OPENID_EVENT_CODE_MAP.get(userInfo.getOpenid());
- login(user.getId(), eventKey);
+ //找到对应的
+ Integer eventKey = RedisUtils.get(RedisKey.getKey(RedisKey.OPEN_ID_STRING, userInfo.getOpenid()), Integer.class);
+ //如果channel就在本地,直接登录
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
+ if (Objects.nonNull(channel)) {
+ login(user.getId(), eventKey);
+ }else {
+ //如果channel不在本地,利用mq广播到到所有服务上,尝试进行登录
+ //手动生成一个WxMpXmlMessage
+ WxMpXmlMessage wxMpXmlMessage = new WxMpXmlMessage();
+ wxMpXmlMessage.setFromUser(userInfo.getOpenid());
+ wxMpXmlMessage.setEventKey("qrscene_"+eventKey);
+ mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
+ }
}
private void fillUserInfo(Long uid, WxOAuth2UserInfo userInfo) {
@@ -108,7 +139,7 @@ public class WxMsgService {
}
}
- private void login(Long uid, Integer eventKey) {
+ public void login(Long uid, Integer eventKey) {
User user = userDao.getById(uid);
//调用用户登录模块
String token = loginService.login(uid);
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java
index e8e5d62..1385f1a 100644
--- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java
@@ -120,10 +120,9 @@ public class UserServiceImpl implements UserService {
}
@Override
- public void register(String openId) {
- User insert = User.builder().openId(openId).build();
- userDao.save(insert);
- applicationEventPublisher.publishEvent(new UserRegisterEvent(this, insert));
+ public void register(User user) {
+ userDao.save(user);
+ applicationEventPublisher.publishEvent(new UserRegisterEvent(this, user));
}
@Override
diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java
index f85e5ec..2120b78 100644
--- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java
+++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java
@@ -5,8 +5,13 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.abin.mallchat.common.common.annotation.FrequencyControl;
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
+import com.abin.mallchat.common.common.constant.MQConstant;
+import com.abin.mallchat.common.common.constant.RedisKey;
+import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
import com.abin.mallchat.common.common.event.UserOfflineEvent;
import com.abin.mallchat.common.common.event.UserOnlineEvent;
+import com.abin.mallchat.common.common.utils.CacheHolder;
+import com.abin.mallchat.common.common.utils.RedisUtils;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.common.user.domain.enums.RoleEnum;
@@ -19,8 +24,7 @@ import com.abin.mallchat.custom.user.service.LoginService;
import com.abin.mallchat.custom.user.service.WebSocketService;
import com.abin.mallchat.custom.user.service.adapter.WSAdapter;
import com.abin.mallchat.custom.user.websocket.NettyUtil;
-import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
+import com.abin.mallchat.transaction.service.MQProducer;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.SneakyThrows;
@@ -37,7 +41,7 @@ import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,17 +55,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class WebSocketServiceImpl implements WebSocketService {
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
- private static final Long MAX_MUM_SIZE = 10000L;
- private static final AtomicInteger CODE = new AtomicInteger();
-
- /**
- * 所有请求登录的code与channel关系
- */
- private static final Cache WAIT_LOGIN_MAP = Caffeine.newBuilder()
- .expireAfterWrite(EXPIRE_TIME)
- .maximumSize(MAX_MUM_SIZE)
- .build();
/**
* 所有已连接的websocket连接列表和一些额外参数
*/
@@ -74,7 +68,10 @@ public class WebSocketServiceImpl implements WebSocketService {
public static ConcurrentHashMap getOnlineMap() {
return ONLINE_WS_MAP;
}
-
+ /**
+ * redis保存loginCode的key
+ */
+ private static final String LOGIN_CODE = "loginCode";
@Autowired
private WxMpService wxMpService;
@Autowired
@@ -90,6 +87,8 @@ public class WebSocketServiceImpl implements WebSocketService {
private UserCache userCache;
@Autowired
private IRoleService iRoleService;
+ @Autowired
+ private MQProducer mqProducer;
/**
* 处理用户登录请求,需要返回一张带code的二维码
@@ -100,11 +99,11 @@ public class WebSocketServiceImpl implements WebSocketService {
@Override
@FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
public void handleLoginReq(Channel channel) {
- //生成随机不重复的登录码
+ //生成随机不重复的登录码,并将channel存在本地cache中
Integer code = generateLoginCode(channel);
//请求微信接口,获取登录码地址
WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int) EXPIRE_TIME.getSeconds());
- //返回给前端
+ //返回给前端(channel必在本地)
sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
}
@@ -115,11 +114,14 @@ public class WebSocketServiceImpl implements WebSocketService {
* @return
*/
private Integer generateLoginCode(Channel channel) {
+ int inc = 0;
do {
- CODE.getAndIncrement();
- } while (WAIT_LOGIN_MAP.asMap().containsKey(CODE.get())
- || Objects.isNull(WAIT_LOGIN_MAP.get(CODE.get(), c -> channel)));
- return CODE.get();
+ //本地cache时间必须比redis key过期时间短,否则会出现并发问题
+ inc = RedisUtils.integerInc(RedisKey.getKey(LOGIN_CODE), 61, TimeUnit.MINUTES);
+ } while (CacheHolder.WAIT_LOGIN_MAP.asMap().containsKey(inc));
+ //储存一份在本地
+ CacheHolder.WAIT_LOGIN_MAP.put(inc, channel);
+ return inc;
}
/**
@@ -160,13 +162,14 @@ public class WebSocketServiceImpl implements WebSocketService {
}
/**
- * 登录成功,并更新状态
+ * (channel必在本地)登录成功,并更新状态
*/
private void loginSuccess(Channel channel, User user, String token) {
//更新上线列表
online(channel, user.getId());
//返回给用户登录成功
boolean hasPower = iRoleService.hasPower(user.getId(), RoleEnum.CHAT_MANAGER);
+ //发送给对应的用户
sendMsg(channel, WSAdapter.buildLoginSuccessResp(user, token, hasPower));
//发送用户上线事件
boolean online = userCache.isOnline(user.getId());
@@ -206,25 +209,28 @@ public class WebSocketServiceImpl implements WebSocketService {
@Override
public Boolean scanLoginSuccess(Integer loginCode, User user, String token) {
//发送消息
- Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.isNull(channel)) {
return Boolean.FALSE;
}
//移除code
- WAIT_LOGIN_MAP.invalidate(loginCode);
+ CacheHolder.WAIT_LOGIN_MAP.invalidate(loginCode);
//用户登录
loginSuccess(channel, user, token);
return true;
}
@Override
- public Boolean scanSuccess(Integer loginCode) {
- Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
- if (Objects.isNull(channel)) {
+ public Boolean scanSuccess(Integer loginCode, Long uid) {
+ Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
+ if (Objects.nonNull(channel)) {
+ sendMsg(channel, WSAdapter.buildScanSuccessResp());
+ return Boolean.TRUE;
+ }else {
+ //广播通知次channel服务扫码成功
+ mqProducer.sendMsg(MQConstant.SCAN_MSG_TOPIC, new ScanSuccessMessageDTO(loginCode, WSAdapter.buildScanSuccessResp()));
return Boolean.FALSE;
}
- sendMsg(channel, WSAdapter.buildScanSuccessResp());
- return true;
}
@@ -267,10 +273,15 @@ public class WebSocketServiceImpl implements WebSocketService {
channels.forEach(channel -> {
threadPoolTaskExecutor.execute(() -> sendMsg(channel, wsBaseResp));
});
-
}
+ /**
+ * 给本地channel发送消息
+ *
+ * @param channel
+ * @param wsBaseResp
+ */
private void sendMsg(Channel channel, WSBaseResp> wsBaseResp) {
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
}