From 1c69600b836013363ab317571ded1947ed2799d7 Mon Sep 17 00:00:00 2001 From: "273817133@qq.com" <273817133@qq.com> Date: Sat, 19 Aug 2023 02:11:19 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E5=BE=AE=E4=BF=A1=E6=89=AB?= =?UTF-8?q?=E7=A0=81=E5=9B=9E=E8=B0=83=E5=92=8C=E5=BE=AE=E4=BF=A1=E6=8E=88?= =?UTF-8?q?=E6=9D=83=E5=9B=9E=E8=B0=83=E6=97=B6=EF=BC=8Crequest=E5=92=8Cch?= =?UTF-8?q?annel=E5=9C=A8=E4=B8=8D=E5=90=8C=E6=9C=8D=E5=8A=A1=E5=99=A8?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98:=E4=BD=BF=E7=94=A8mq=E5=B0=86?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=88=A9=E7=94=A8=E5=B9=BF=E6=92=AD=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E8=AE=A9=E5=85=B6=E4=BB=96=E6=9C=8D=E5=8A=A1=E5=8E=BB?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/common/constant/MQConstant.java | 12 ++++ .../common/common/constant/RedisKey.java | 6 ++ .../common/domain/dto/LoginMessageDTO.java | 32 +++++++++ .../domain/dto/ScanSuccessMessageDTO.java | 33 +++++++++ .../common/common/utils/CacheHolder.java | 26 +++++++ .../common/common/utils/RedisUtils.java | 20 +++++- .../user/consumer/MsgLoginConsumer.java | 56 +++++++++++++++ .../user/consumer/ScanSuccessConsumer.java | 50 ++++++++++++++ .../custom/user/service/UserService.java | 7 +- .../custom/user/service/WebSocketService.java | 6 +- .../custom/user/service/WxMsgService.java | 63 ++++++++++++----- .../user/service/impl/UserServiceImpl.java | 7 +- .../service/impl/WebSocketServiceImpl.java | 69 +++++++++++-------- 13 files changed, 330 insertions(+), 57 deletions(-) create mode 100644 mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java create mode 100644 mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java create mode 100644 mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java create mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java create mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java index 5403bcf..aa877eb 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/MQConstant.java @@ -16,4 +16,16 @@ public interface MQConstant { */ String PUSH_TOPIC = "websocket_push"; String PUSH_GROUP = "websocket_push_group"; + + /** + * (授权完成后)登录信息mq + */ + String LOGIN_MSG_TOPIC = "login_send_msg"; + String LOGIN_MSG_GROUP = "login_send_msg_group"; + + /** + * 扫码成功 信息发送mq + */ + String SCAN_MSG_TOPIC = "scan_send_msg"; + String SCAN_MSG_GROUP = "scan_send_msg_group"; } diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java index 38ee20d..ce3bf8b 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/constant/RedisKey.java @@ -58,6 +58,12 @@ public class RedisKey { public static final String USER_CHAT_CONTEXT = "useChatGPTContext:uid_%d_roomId_%d"; + /** + * 保存Open id + */ + public static final String OPEN_ID_STRING = "openid:%s"; + + /** * 用户上次使用GLM使用时间 */ diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java new file mode 100644 index 0000000..f250c0a --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/LoginMessageDTO.java @@ -0,0 +1,32 @@ +package com.abin.mallchat.common.common.domain.dto; + +import com.abin.mallchat.common.user.domain.enums.WSBaseResp; +import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage; + +import java.io.Serializable; + +/** + * Description: 将扫码登录返回信息推送给所有横向扩展的服务 + * Author: zjy + * Date: 2023-08-12 + */ +@Data +@NoArgsConstructor +public class LoginMessageDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 微信公众号获得扫码事件后,发送给我方的回调信息 + */ + private WxMpXmlMessage wxMpXmlMessage ; + + public LoginMessageDTO(WxMpXmlMessage wxMpXmlMessage) { + this.wxMpXmlMessage = wxMpXmlMessage; + } + +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java new file mode 100644 index 0000000..7472181 --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/domain/dto/ScanSuccessMessageDTO.java @@ -0,0 +1,33 @@ +package com.abin.mallchat.common.common.domain.dto; + +import com.abin.mallchat.common.user.domain.enums.WSBaseResp; +import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * Description: 扫码成功对象,推送给用户的消息对象 + * Author: abin + * Date: 2023-08-12 + */ +@Data +@AllArgsConstructor +@NoArgsConstructor +public class ScanSuccessMessageDTO implements Serializable { + /** + * 推送的ws消息 + */ + private WSBaseResp wsBaseMsg; + /** + * 推送的uid + */ + private Integer loginCode; + + public ScanSuccessMessageDTO(Integer loginCode, WSBaseResp wsBaseMsg) { + this.loginCode = loginCode; + this.wsBaseMsg = wsBaseMsg; + } +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java new file mode 100644 index 0000000..8fc4f0c --- /dev/null +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/CacheHolder.java @@ -0,0 +1,26 @@ +package com.abin.mallchat.common.common.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import io.netty.channel.Channel; + +import java.time.Duration; + +/** + * Description: Cache管理器 + * Author: abin + * Date: 2023-04-05 + */ +public class CacheHolder { + + private static final Long MAX_MUM_SIZE = 10000L; + + private static final Duration EXPIRE_TIME = Duration.ofHours(1); + /** + * 所有请求登录的code与channel关系 + */ + public static final Cache WAIT_LOGIN_MAP = Caffeine.newBuilder() + .expireAfterWrite(EXPIRE_TIME) + .maximumSize(MAX_MUM_SIZE) + .build(); +} diff --git a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java index 9565336..d6f29e7 100644 --- a/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java +++ b/mallchat-common/src/main/java/com/abin/mallchat/common/common/utils/RedisUtils.java @@ -39,6 +39,22 @@ public class RedisUtils { } /** + * 自增int + * + * @param key 键 + * @param time 时间(秒) + */ + public static Integer integerInc(String key, int time, TimeUnit unit) { + RedisScript redisScript = new DefaultRedisScript<>(LUA_INCR_EXPIRE, Long.class); + Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(unit.toSeconds(time))); + try{ + return Integer.parseInt(result.toString()); + }catch (Exception e) { + RedisUtils.del(key); + throw e; + } + } + /** * 指定缓存失效时间 * * @param key 键 @@ -862,8 +878,8 @@ public class RedisUtils { * @param end * @return */ - public static Set> zRangeWithScores(String key, long start, - long end) { + public static Set> zRangeWithScores(String key, long start, + long end) { return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end); } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java new file mode 100644 index 0000000..43b88e6 --- /dev/null +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/MsgLoginConsumer.java @@ -0,0 +1,56 @@ +package com.abin.mallchat.custom.user.consumer; + +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.constant.RedisKey; +import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO; +import com.abin.mallchat.common.common.utils.CacheHolder; +import com.abin.mallchat.common.user.dao.UserDao; +import com.abin.mallchat.common.user.domain.entity.User; +import com.abin.mallchat.custom.user.service.WebSocketService; +import com.abin.mallchat.custom.user.service.WxMsgService; +import io.netty.channel.Channel; +import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * Description: 在本地服务上找寻对应channel,将对应用户登陆,并触发所有用户收到上线事件 + * Author: abin + * Date: 2023-08-12 + */ +@RocketMQMessageListener(consumerGroup = MQConstant.LOGIN_MSG_GROUP, topic = MQConstant.LOGIN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING) +@Component +public class MsgLoginConsumer implements RocketMQListener { + @Autowired + private WxMsgService wxMsgService; + @Autowired + private UserDao userDao; + + @Override + public void onMessage(LoginMessageDTO loginMessageDTO) { + WxMpXmlMessage wxMpXmlMessage = loginMessageDTO.getWxMpXmlMessage(); + //给二维码绑定的登录code + Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage)); + //本地未储存对应的channel,则结束 + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey); + if (Objects.isNull(channel)) { + return; + } + //查询openid对应的用户(必然存在) + String openid = wxMpXmlMessage.getFromUser(); + User user = userDao.getByOpenId(openid); + //登录,并且清除缓存 + wxMsgService.login(user.getId(), eventKey); + } + + private String getEventKey(WxMpXmlMessage wxMpXmlMessage) { + //扫码关注的渠道事件有前缀,需要去除 + return wxMpXmlMessage.getEventKey().replace("qrscene_", ""); + } + +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java new file mode 100644 index 0000000..3d22798 --- /dev/null +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/consumer/ScanSuccessConsumer.java @@ -0,0 +1,50 @@ +package com.abin.mallchat.custom.user.consumer; + +import cn.hutool.json.JSONUtil; +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO; +import com.abin.mallchat.common.common.utils.CacheHolder; +import com.abin.mallchat.common.user.dao.UserDao; +import com.abin.mallchat.common.user.domain.entity.User; +import com.abin.mallchat.custom.user.service.WebSocketService; +import com.abin.mallchat.custom.user.service.WxMsgService; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage; +import org.apache.rocketmq.spring.annotation.MessageModel; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.Objects; + +/** + * Description: 将扫码成功的信息发送给对应的用户,等待授权 + * Author: abin + * Date: 2023-08-12 + */ +@RocketMQMessageListener(consumerGroup = MQConstant.SCAN_MSG_GROUP, topic = MQConstant.SCAN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING) +@Component +public class ScanSuccessConsumer implements RocketMQListener { + @Autowired + private WebSocketService webSocketService; + @Autowired + private WxMsgService wxMsgService; + @Autowired + private UserDao userDao; + + @Override + public void onMessage(ScanSuccessMessageDTO scanSuccessMessageDTO) { + Integer loginCode = scanSuccessMessageDTO.getLoginCode(); + //本地未储存对应的channel,则结束 + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode); + if (Objects.isNull(channel)) { + return; + } + //给正在等待登陆的channel发送扫码成功的消息,等待授权 + channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(scanSuccessMessageDTO.getWsBaseMsg()))); + } + + +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java index 2cb2259..7047ed7 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/UserService.java @@ -2,6 +2,7 @@ package com.abin.mallchat.custom.user.service; import com.abin.mallchat.common.user.domain.dto.ItemInfoDTO; import com.abin.mallchat.common.user.domain.dto.SummeryInfoDTO; +import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.custom.user.domain.vo.request.user.*; import com.abin.mallchat.custom.user.domain.vo.response.user.BadgeResp; import com.abin.mallchat.custom.user.domain.vo.response.user.UserInfoResp; @@ -50,11 +51,11 @@ public interface UserService { void wearingBadge(Long uid, WearingBadgeReq req); /** - * 用户注册 + * 用户注册,需要获得id * - * @param openId + * @param user */ - void register(String openId); + void register(User user); void black(BlackReq req); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java index b639cb0..6c93815 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WebSocketService.java @@ -36,7 +36,7 @@ public interface WebSocketService { void authorize(Channel channel, WSAuthorize wsAuthorize); /** - * 扫码用户登录成功通知 + * 扫码用户登录成功通知,清除本地Cache中的loginCode和channel的关系 * * @param loginCode * @param user @@ -45,11 +45,11 @@ public interface WebSocketService { Boolean scanLoginSuccess(Integer loginCode, User user, String token); /** - * 用户扫码成功 + * 通知用户扫码成功 * * @param loginCode */ - Boolean scanSuccess(Integer loginCode); + Boolean scanSuccess(Integer loginCode, Long uid); /** * 推动消息给所有在线的人 diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java index 065c026..2557068 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/WxMsgService.java @@ -1,10 +1,17 @@ package com.abin.mallchat.custom.user.service; import cn.hutool.core.util.RandomUtil; +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.constant.RedisKey; +import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO; +import com.abin.mallchat.common.common.utils.CacheHolder; +import com.abin.mallchat.common.common.utils.RedisUtils; import com.abin.mallchat.common.user.dao.UserDao; import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.custom.user.service.adapter.TextBuilder; import com.abin.mallchat.custom.user.service.adapter.UserAdapter; +import com.abin.mallchat.transaction.service.MQProducer; +import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import me.chanjar.weixin.common.bean.WxOAuth2UserInfo; import me.chanjar.weixin.mp.api.WxMpService; @@ -21,6 +28,7 @@ import org.springframework.stereotype.Service; import java.net.URLEncoder; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * Description: 处理与微信api的交互逻辑 @@ -48,25 +56,37 @@ public class WxMsgService { private UserService userService; @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; - + @Autowired + private MQProducer mqProducer; public WxMpXmlOutMessage scan(WxMpService wxMpService, WxMpXmlMessage wxMpXmlMessage) { - String fromUser = wxMpXmlMessage.getFromUser(); - Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage)); - User user = userDao.getByOpenId(fromUser); + String openid = wxMpXmlMessage.getFromUser(); + Integer loginCode = Integer.parseInt(getEventKey(wxMpXmlMessage)); + User user = userDao.getByOpenId(openid); + //如果已经注册,直接登录成功 if (Objects.nonNull(user) && StringUtils.isNotEmpty(user.getAvatar())) { - //注册且已经授权的用户,直接登录成功 - login(user.getId(), eventKey); + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode); + //要么在本地登录,否则利用mq广播到到所有服务上尝试登录 + if (Objects.nonNull(channel)) { + String token = loginService.login(user.getId()); + webSocketService.scanLoginSuccess(loginCode, user, token); + }else { + mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage)); + } return null; } + + //user为空先注册,手动生成,以保存uid if (Objects.isNull(user)) { - //未注册的先注册 - userService.register(fromUser); + user = User.builder().openId(openid).build(); + userService.register(user); } - //保存openid和场景code的关系,后续才能通知到前端 - OPENID_EVENT_CODE_MAP.put(fromUser, eventKey); - //授权流程,给用户发送授权消息,并且异步通知前端扫码成功 - threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(eventKey)); + Long uid = user.getId(); + + //在 redis中保存openid和场景code的关系,后续才能通知到前端,旧版数据没有清除,这里设置了过期时间 + RedisUtils.set(RedisKey.getKey(RedisKey.OPEN_ID_STRING, openid), loginCode, 60, TimeUnit.MINUTES); + //授权流程,给用户发送授权消息,并且异步通知前端扫码成功(如非本地channel,使用MQ通知某服务对前端进行通知扫码成功) + threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(loginCode, uid)); String skipUrl = String.format(URL, wxMpService.getWxMpConfigStorage().getAppId(), URLEncoder.encode(callback + "/wx/portal/public/callBack")); WxMpXmlOutMessage.TEXT().build(); return new TextBuilder().build("请点击链接授权:登录", wxMpXmlMessage, wxMpService); @@ -88,9 +108,20 @@ public class WxMsgService { if (StringUtils.isEmpty(user.getName())) { fillUserInfo(user.getId(), userInfo); } - //触发用户登录成功操作 - Integer eventKey = OPENID_EVENT_CODE_MAP.get(userInfo.getOpenid()); - login(user.getId(), eventKey); + //找到对应的 + Integer eventKey = RedisUtils.get(RedisKey.getKey(RedisKey.OPEN_ID_STRING, userInfo.getOpenid()), Integer.class); + //如果channel就在本地,直接登录 + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey); + if (Objects.nonNull(channel)) { + login(user.getId(), eventKey); + }else { + //如果channel不在本地,利用mq广播到到所有服务上,尝试进行登录 + //手动生成一个WxMpXmlMessage + WxMpXmlMessage wxMpXmlMessage = new WxMpXmlMessage(); + wxMpXmlMessage.setFromUser(userInfo.getOpenid()); + wxMpXmlMessage.setEventKey("qrscene_"+eventKey); + mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage)); + } } private void fillUserInfo(Long uid, WxOAuth2UserInfo userInfo) { @@ -108,7 +139,7 @@ public class WxMsgService { } } - private void login(Long uid, Integer eventKey) { + public void login(Long uid, Integer eventKey) { User user = userDao.getById(uid); //调用用户登录模块 String token = loginService.login(uid); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java index e8e5d62..1385f1a 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/UserServiceImpl.java @@ -120,10 +120,9 @@ public class UserServiceImpl implements UserService { } @Override - public void register(String openId) { - User insert = User.builder().openId(openId).build(); - userDao.save(insert); - applicationEventPublisher.publishEvent(new UserRegisterEvent(this, insert)); + public void register(User user) { + userDao.save(user); + applicationEventPublisher.publishEvent(new UserRegisterEvent(this, user)); } @Override diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index f85e5ec..2120b78 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -5,8 +5,13 @@ import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONUtil; import com.abin.mallchat.common.common.annotation.FrequencyControl; import com.abin.mallchat.common.common.config.ThreadPoolConfig; +import com.abin.mallchat.common.common.constant.MQConstant; +import com.abin.mallchat.common.common.constant.RedisKey; +import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO; import com.abin.mallchat.common.common.event.UserOfflineEvent; import com.abin.mallchat.common.common.event.UserOnlineEvent; +import com.abin.mallchat.common.common.utils.CacheHolder; +import com.abin.mallchat.common.common.utils.RedisUtils; import com.abin.mallchat.common.user.dao.UserDao; import com.abin.mallchat.common.user.domain.entity.User; import com.abin.mallchat.common.user.domain.enums.RoleEnum; @@ -19,8 +24,7 @@ import com.abin.mallchat.custom.user.service.LoginService; import com.abin.mallchat.custom.user.service.WebSocketService; import com.abin.mallchat.custom.user.service.adapter.WSAdapter; import com.abin.mallchat.custom.user.websocket.NettyUtil; -import com.github.benmanes.caffeine.cache.Cache; -import com.github.benmanes.caffeine.cache.Caffeine; +import com.abin.mallchat.transaction.service.MQProducer; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.SneakyThrows; @@ -37,7 +41,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -51,17 +55,7 @@ import java.util.concurrent.locks.ReentrantLock; public class WebSocketServiceImpl implements WebSocketService { private static final Duration EXPIRE_TIME = Duration.ofHours(1); - private static final Long MAX_MUM_SIZE = 10000L; - private static final AtomicInteger CODE = new AtomicInteger(); - - /** - * 所有请求登录的code与channel关系 - */ - private static final Cache WAIT_LOGIN_MAP = Caffeine.newBuilder() - .expireAfterWrite(EXPIRE_TIME) - .maximumSize(MAX_MUM_SIZE) - .build(); /** * 所有已连接的websocket连接列表和一些额外参数 */ @@ -74,7 +68,10 @@ public class WebSocketServiceImpl implements WebSocketService { public static ConcurrentHashMap getOnlineMap() { return ONLINE_WS_MAP; } - + /** + * redis保存loginCode的key + */ + private static final String LOGIN_CODE = "loginCode"; @Autowired private WxMpService wxMpService; @Autowired @@ -90,6 +87,8 @@ public class WebSocketServiceImpl implements WebSocketService { private UserCache userCache; @Autowired private IRoleService iRoleService; + @Autowired + private MQProducer mqProducer; /** * 处理用户登录请求,需要返回一张带code的二维码 @@ -100,11 +99,11 @@ public class WebSocketServiceImpl implements WebSocketService { @Override @FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()") public void handleLoginReq(Channel channel) { - //生成随机不重复的登录码 + //生成随机不重复的登录码,并将channel存在本地cache中 Integer code = generateLoginCode(channel); //请求微信接口,获取登录码地址 WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int) EXPIRE_TIME.getSeconds()); - //返回给前端 + //返回给前端(channel必在本地) sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket)); } @@ -115,11 +114,14 @@ public class WebSocketServiceImpl implements WebSocketService { * @return */ private Integer generateLoginCode(Channel channel) { + int inc = 0; do { - CODE.getAndIncrement(); - } while (WAIT_LOGIN_MAP.asMap().containsKey(CODE.get()) - || Objects.isNull(WAIT_LOGIN_MAP.get(CODE.get(), c -> channel))); - return CODE.get(); + //本地cache时间必须比redis key过期时间短,否则会出现并发问题 + inc = RedisUtils.integerInc(RedisKey.getKey(LOGIN_CODE), 61, TimeUnit.MINUTES); + } while (CacheHolder.WAIT_LOGIN_MAP.asMap().containsKey(inc)); + //储存一份在本地 + CacheHolder.WAIT_LOGIN_MAP.put(inc, channel); + return inc; } /** @@ -160,13 +162,14 @@ public class WebSocketServiceImpl implements WebSocketService { } /** - * 登录成功,并更新状态 + * (channel必在本地)登录成功,并更新状态 */ private void loginSuccess(Channel channel, User user, String token) { //更新上线列表 online(channel, user.getId()); //返回给用户登录成功 boolean hasPower = iRoleService.hasPower(user.getId(), RoleEnum.CHAT_MANAGER); + //发送给对应的用户 sendMsg(channel, WSAdapter.buildLoginSuccessResp(user, token, hasPower)); //发送用户上线事件 boolean online = userCache.isOnline(user.getId()); @@ -206,25 +209,28 @@ public class WebSocketServiceImpl implements WebSocketService { @Override public Boolean scanLoginSuccess(Integer loginCode, User user, String token) { //发送消息 - Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode); + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode); if (Objects.isNull(channel)) { return Boolean.FALSE; } //移除code - WAIT_LOGIN_MAP.invalidate(loginCode); + CacheHolder.WAIT_LOGIN_MAP.invalidate(loginCode); //用户登录 loginSuccess(channel, user, token); return true; } @Override - public Boolean scanSuccess(Integer loginCode) { - Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode); - if (Objects.isNull(channel)) { + public Boolean scanSuccess(Integer loginCode, Long uid) { + Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode); + if (Objects.nonNull(channel)) { + sendMsg(channel, WSAdapter.buildScanSuccessResp()); + return Boolean.TRUE; + }else { + //广播通知次channel服务扫码成功 + mqProducer.sendMsg(MQConstant.SCAN_MSG_TOPIC, new ScanSuccessMessageDTO(loginCode, WSAdapter.buildScanSuccessResp())); return Boolean.FALSE; } - sendMsg(channel, WSAdapter.buildScanSuccessResp()); - return true; } @@ -267,10 +273,15 @@ public class WebSocketServiceImpl implements WebSocketService { channels.forEach(channel -> { threadPoolTaskExecutor.execute(() -> sendMsg(channel, wsBaseResp)); }); - } + /** + * 给本地channel发送消息 + * + * @param channel + * @param wsBaseResp + */ private void sendMsg(Channel channel, WSBaseResp wsBaseResp) { channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp))); }