mirror of
https://github.com/zongzibinbin/MallChat.git
synced 2026-03-24 23:23:46 +08:00
解决微信扫码回调和微信授权回调时,request和channel在不同服务器的问题:使用mq将消息利用广播模式让其他服务去处理
This commit is contained in:
@@ -16,4 +16,16 @@ public interface MQConstant {
|
|||||||
*/
|
*/
|
||||||
String PUSH_TOPIC = "websocket_push";
|
String PUSH_TOPIC = "websocket_push";
|
||||||
String PUSH_GROUP = "websocket_push_group";
|
String PUSH_GROUP = "websocket_push_group";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* (授权完成后)登录信息mq
|
||||||
|
*/
|
||||||
|
String LOGIN_MSG_TOPIC = "login_send_msg";
|
||||||
|
String LOGIN_MSG_GROUP = "login_send_msg_group";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 扫码成功 信息发送mq
|
||||||
|
*/
|
||||||
|
String SCAN_MSG_TOPIC = "scan_send_msg";
|
||||||
|
String SCAN_MSG_GROUP = "scan_send_msg_group";
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,6 +58,12 @@ public class RedisKey {
|
|||||||
|
|
||||||
public static final String USER_CHAT_CONTEXT = "useChatGPTContext:uid_%d_roomId_%d";
|
public static final String USER_CHAT_CONTEXT = "useChatGPTContext:uid_%d_roomId_%d";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 保存Open id
|
||||||
|
*/
|
||||||
|
public static final String OPEN_ID_STRING = "openid:%s";
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户上次使用GLM使用时间
|
* 用户上次使用GLM使用时间
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -0,0 +1,32 @@
|
|||||||
|
package com.abin.mallchat.common.common.domain.dto;
|
||||||
|
|
||||||
|
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
|
||||||
|
import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description: 将扫码登录返回信息推送给所有横向扩展的服务
|
||||||
|
* Author: zjy
|
||||||
|
* Date: 2023-08-12
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class LoginMessageDTO implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 微信公众号获得扫码事件后,发送给我方的回调信息
|
||||||
|
*/
|
||||||
|
private WxMpXmlMessage wxMpXmlMessage ;
|
||||||
|
|
||||||
|
public LoginMessageDTO(WxMpXmlMessage wxMpXmlMessage) {
|
||||||
|
this.wxMpXmlMessage = wxMpXmlMessage;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,33 @@
|
|||||||
|
package com.abin.mallchat.common.common.domain.dto;
|
||||||
|
|
||||||
|
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
|
||||||
|
import com.abin.mallchat.common.user.domain.enums.WSPushTypeEnum;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description: 扫码成功对象,推送给用户的消息对象
|
||||||
|
* Author: <a href="https://github.com/zongzibinbin">abin</a>
|
||||||
|
* Date: 2023-08-12
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class ScanSuccessMessageDTO implements Serializable {
|
||||||
|
/**
|
||||||
|
* 推送的ws消息
|
||||||
|
*/
|
||||||
|
private WSBaseResp<?> wsBaseMsg;
|
||||||
|
/**
|
||||||
|
* 推送的uid
|
||||||
|
*/
|
||||||
|
private Integer loginCode;
|
||||||
|
|
||||||
|
public ScanSuccessMessageDTO(Integer loginCode, WSBaseResp<?> wsBaseMsg) {
|
||||||
|
this.loginCode = loginCode;
|
||||||
|
this.wsBaseMsg = wsBaseMsg;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
package com.abin.mallchat.common.common.utils;
|
||||||
|
|
||||||
|
import com.github.benmanes.caffeine.cache.Cache;
|
||||||
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description: Cache管理器
|
||||||
|
* Author: <a href="https://github.com/zongzibinbin">abin</a>
|
||||||
|
* Date: 2023-04-05
|
||||||
|
*/
|
||||||
|
public class CacheHolder {
|
||||||
|
|
||||||
|
private static final Long MAX_MUM_SIZE = 10000L;
|
||||||
|
|
||||||
|
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
|
||||||
|
/**
|
||||||
|
* 所有请求登录的code与channel关系
|
||||||
|
*/
|
||||||
|
public static final Cache<Integer, Channel> WAIT_LOGIN_MAP = Caffeine.newBuilder()
|
||||||
|
.expireAfterWrite(EXPIRE_TIME)
|
||||||
|
.maximumSize(MAX_MUM_SIZE)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
@@ -39,6 +39,22 @@ public class RedisUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* 自增int
|
||||||
|
*
|
||||||
|
* @param key 键
|
||||||
|
* @param time 时间(秒)
|
||||||
|
*/
|
||||||
|
public static Integer integerInc(String key, int time, TimeUnit unit) {
|
||||||
|
RedisScript<Long> redisScript = new DefaultRedisScript<>(LUA_INCR_EXPIRE, Long.class);
|
||||||
|
Long result = stringRedisTemplate.execute(redisScript, Collections.singletonList(key), String.valueOf(unit.toSeconds(time)));
|
||||||
|
try{
|
||||||
|
return Integer.parseInt(result.toString());
|
||||||
|
}catch (Exception e) {
|
||||||
|
RedisUtils.del(key);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/**
|
||||||
* 指定缓存失效时间
|
* 指定缓存失效时间
|
||||||
*
|
*
|
||||||
* @param key 键
|
* @param key 键
|
||||||
@@ -862,8 +878,8 @@ public class RedisUtils {
|
|||||||
* @param end
|
* @param end
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static Set<ZSetOperations.TypedTuple<String>> zRangeWithScores(String key, long start,
|
public static Set<TypedTuple<String>> zRangeWithScores(String key, long start,
|
||||||
long end) {
|
long end) {
|
||||||
return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
|
return stringRedisTemplate.opsForZSet().rangeWithScores(key, start, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package com.abin.mallchat.custom.user.consumer;
|
||||||
|
|
||||||
|
import com.abin.mallchat.common.common.constant.MQConstant;
|
||||||
|
import com.abin.mallchat.common.common.constant.RedisKey;
|
||||||
|
import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
|
||||||
|
import com.abin.mallchat.common.common.utils.CacheHolder;
|
||||||
|
import com.abin.mallchat.common.user.dao.UserDao;
|
||||||
|
import com.abin.mallchat.common.user.domain.entity.User;
|
||||||
|
import com.abin.mallchat.custom.user.service.WebSocketService;
|
||||||
|
import com.abin.mallchat.custom.user.service.WxMsgService;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
|
||||||
|
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description: 在本地服务上找寻对应channel,将对应用户登陆,并触发所有用户收到上线事件
|
||||||
|
* Author: <a href="https://github.com/zongzibinbin">abin</a>
|
||||||
|
* Date: 2023-08-12
|
||||||
|
*/
|
||||||
|
@RocketMQMessageListener(consumerGroup = MQConstant.LOGIN_MSG_GROUP, topic = MQConstant.LOGIN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING)
|
||||||
|
@Component
|
||||||
|
public class MsgLoginConsumer implements RocketMQListener<LoginMessageDTO> {
|
||||||
|
@Autowired
|
||||||
|
private WxMsgService wxMsgService;
|
||||||
|
@Autowired
|
||||||
|
private UserDao userDao;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(LoginMessageDTO loginMessageDTO) {
|
||||||
|
WxMpXmlMessage wxMpXmlMessage = loginMessageDTO.getWxMpXmlMessage();
|
||||||
|
//给二维码绑定的登录code
|
||||||
|
Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage));
|
||||||
|
//本地未储存对应的channel,则结束
|
||||||
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//查询openid对应的用户(必然存在)
|
||||||
|
String openid = wxMpXmlMessage.getFromUser();
|
||||||
|
User user = userDao.getByOpenId(openid);
|
||||||
|
//登录,并且清除缓存
|
||||||
|
wxMsgService.login(user.getId(), eventKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getEventKey(WxMpXmlMessage wxMpXmlMessage) {
|
||||||
|
//扫码关注的渠道事件有前缀,需要去除
|
||||||
|
return wxMpXmlMessage.getEventKey().replace("qrscene_", "");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,50 @@
|
|||||||
|
package com.abin.mallchat.custom.user.consumer;
|
||||||
|
|
||||||
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import com.abin.mallchat.common.common.constant.MQConstant;
|
||||||
|
import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
|
||||||
|
import com.abin.mallchat.common.common.utils.CacheHolder;
|
||||||
|
import com.abin.mallchat.common.user.dao.UserDao;
|
||||||
|
import com.abin.mallchat.common.user.domain.entity.User;
|
||||||
|
import com.abin.mallchat.custom.user.service.WebSocketService;
|
||||||
|
import com.abin.mallchat.custom.user.service.WxMsgService;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||||
|
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
|
||||||
|
import org.apache.rocketmq.spring.annotation.MessageModel;
|
||||||
|
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||||
|
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Description: 将扫码成功的信息发送给对应的用户,等待授权
|
||||||
|
* Author: <a href="https://github.com/zongzibinbin">abin</a>
|
||||||
|
* Date: 2023-08-12
|
||||||
|
*/
|
||||||
|
@RocketMQMessageListener(consumerGroup = MQConstant.SCAN_MSG_GROUP, topic = MQConstant.SCAN_MSG_TOPIC, messageModel = MessageModel.BROADCASTING)
|
||||||
|
@Component
|
||||||
|
public class ScanSuccessConsumer implements RocketMQListener<ScanSuccessMessageDTO> {
|
||||||
|
@Autowired
|
||||||
|
private WebSocketService webSocketService;
|
||||||
|
@Autowired
|
||||||
|
private WxMsgService wxMsgService;
|
||||||
|
@Autowired
|
||||||
|
private UserDao userDao;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(ScanSuccessMessageDTO scanSuccessMessageDTO) {
|
||||||
|
Integer loginCode = scanSuccessMessageDTO.getLoginCode();
|
||||||
|
//本地未储存对应的channel,则结束
|
||||||
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
||||||
|
if (Objects.isNull(channel)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
//给正在等待登陆的channel发送扫码成功的消息,等待授权
|
||||||
|
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(scanSuccessMessageDTO.getWsBaseMsg())));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@ package com.abin.mallchat.custom.user.service;
|
|||||||
|
|
||||||
import com.abin.mallchat.common.user.domain.dto.ItemInfoDTO;
|
import com.abin.mallchat.common.user.domain.dto.ItemInfoDTO;
|
||||||
import com.abin.mallchat.common.user.domain.dto.SummeryInfoDTO;
|
import com.abin.mallchat.common.user.domain.dto.SummeryInfoDTO;
|
||||||
|
import com.abin.mallchat.common.user.domain.entity.User;
|
||||||
import com.abin.mallchat.custom.user.domain.vo.request.user.*;
|
import com.abin.mallchat.custom.user.domain.vo.request.user.*;
|
||||||
import com.abin.mallchat.custom.user.domain.vo.response.user.BadgeResp;
|
import com.abin.mallchat.custom.user.domain.vo.response.user.BadgeResp;
|
||||||
import com.abin.mallchat.custom.user.domain.vo.response.user.UserInfoResp;
|
import com.abin.mallchat.custom.user.domain.vo.response.user.UserInfoResp;
|
||||||
@@ -50,11 +51,11 @@ public interface UserService {
|
|||||||
void wearingBadge(Long uid, WearingBadgeReq req);
|
void wearingBadge(Long uid, WearingBadgeReq req);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户注册
|
* 用户注册,需要获得id
|
||||||
*
|
*
|
||||||
* @param openId
|
* @param user
|
||||||
*/
|
*/
|
||||||
void register(String openId);
|
void register(User user);
|
||||||
|
|
||||||
void black(BlackReq req);
|
void black(BlackReq req);
|
||||||
|
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ public interface WebSocketService {
|
|||||||
void authorize(Channel channel, WSAuthorize wsAuthorize);
|
void authorize(Channel channel, WSAuthorize wsAuthorize);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 扫码用户登录成功通知
|
* 扫码用户登录成功通知,清除本地Cache中的loginCode和channel的关系
|
||||||
*
|
*
|
||||||
* @param loginCode
|
* @param loginCode
|
||||||
* @param user
|
* @param user
|
||||||
@@ -45,11 +45,11 @@ public interface WebSocketService {
|
|||||||
Boolean scanLoginSuccess(Integer loginCode, User user, String token);
|
Boolean scanLoginSuccess(Integer loginCode, User user, String token);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 用户扫码成功
|
* 通知用户扫码成功
|
||||||
*
|
*
|
||||||
* @param loginCode
|
* @param loginCode
|
||||||
*/
|
*/
|
||||||
Boolean scanSuccess(Integer loginCode);
|
Boolean scanSuccess(Integer loginCode, Long uid);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 推动消息给所有在线的人
|
* 推动消息给所有在线的人
|
||||||
|
|||||||
@@ -1,10 +1,17 @@
|
|||||||
package com.abin.mallchat.custom.user.service;
|
package com.abin.mallchat.custom.user.service;
|
||||||
|
|
||||||
import cn.hutool.core.util.RandomUtil;
|
import cn.hutool.core.util.RandomUtil;
|
||||||
|
import com.abin.mallchat.common.common.constant.MQConstant;
|
||||||
|
import com.abin.mallchat.common.common.constant.RedisKey;
|
||||||
|
import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
|
||||||
|
import com.abin.mallchat.common.common.utils.CacheHolder;
|
||||||
|
import com.abin.mallchat.common.common.utils.RedisUtils;
|
||||||
import com.abin.mallchat.common.user.dao.UserDao;
|
import com.abin.mallchat.common.user.dao.UserDao;
|
||||||
import com.abin.mallchat.common.user.domain.entity.User;
|
import com.abin.mallchat.common.user.domain.entity.User;
|
||||||
import com.abin.mallchat.custom.user.service.adapter.TextBuilder;
|
import com.abin.mallchat.custom.user.service.adapter.TextBuilder;
|
||||||
import com.abin.mallchat.custom.user.service.adapter.UserAdapter;
|
import com.abin.mallchat.custom.user.service.adapter.UserAdapter;
|
||||||
|
import com.abin.mallchat.transaction.service.MQProducer;
|
||||||
|
import io.netty.channel.Channel;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
|
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
|
||||||
import me.chanjar.weixin.mp.api.WxMpService;
|
import me.chanjar.weixin.mp.api.WxMpService;
|
||||||
@@ -21,6 +28,7 @@ import org.springframework.stereotype.Service;
|
|||||||
import java.net.URLEncoder;
|
import java.net.URLEncoder;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Description: 处理与微信api的交互逻辑
|
* Description: 处理与微信api的交互逻辑
|
||||||
@@ -48,25 +56,37 @@ public class WxMsgService {
|
|||||||
private UserService userService;
|
private UserService userService;
|
||||||
@Autowired
|
@Autowired
|
||||||
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
||||||
|
@Autowired
|
||||||
|
private MQProducer mqProducer;
|
||||||
|
|
||||||
public WxMpXmlOutMessage scan(WxMpService wxMpService, WxMpXmlMessage wxMpXmlMessage) {
|
public WxMpXmlOutMessage scan(WxMpService wxMpService, WxMpXmlMessage wxMpXmlMessage) {
|
||||||
String fromUser = wxMpXmlMessage.getFromUser();
|
String openid = wxMpXmlMessage.getFromUser();
|
||||||
Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage));
|
Integer loginCode = Integer.parseInt(getEventKey(wxMpXmlMessage));
|
||||||
User user = userDao.getByOpenId(fromUser);
|
User user = userDao.getByOpenId(openid);
|
||||||
|
//如果已经注册,直接登录成功
|
||||||
if (Objects.nonNull(user) && StringUtils.isNotEmpty(user.getAvatar())) {
|
if (Objects.nonNull(user) && StringUtils.isNotEmpty(user.getAvatar())) {
|
||||||
//注册且已经授权的用户,直接登录成功
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
||||||
login(user.getId(), eventKey);
|
//要么在本地登录,否则利用mq广播到到所有服务上尝试登录
|
||||||
|
if (Objects.nonNull(channel)) {
|
||||||
|
String token = loginService.login(user.getId());
|
||||||
|
webSocketService.scanLoginSuccess(loginCode, user, token);
|
||||||
|
}else {
|
||||||
|
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
|
||||||
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//user为空先注册,手动生成,以保存uid
|
||||||
if (Objects.isNull(user)) {
|
if (Objects.isNull(user)) {
|
||||||
//未注册的先注册
|
user = User.builder().openId(openid).build();
|
||||||
userService.register(fromUser);
|
userService.register(user);
|
||||||
}
|
}
|
||||||
//保存openid和场景code的关系,后续才能通知到前端
|
Long uid = user.getId();
|
||||||
OPENID_EVENT_CODE_MAP.put(fromUser, eventKey);
|
|
||||||
//授权流程,给用户发送授权消息,并且异步通知前端扫码成功
|
//在 redis中保存openid和场景code的关系,后续才能通知到前端,旧版数据没有清除,这里设置了过期时间
|
||||||
threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(eventKey));
|
RedisUtils.set(RedisKey.getKey(RedisKey.OPEN_ID_STRING, openid), loginCode, 60, TimeUnit.MINUTES);
|
||||||
|
//授权流程,给用户发送授权消息,并且异步通知前端扫码成功(如非本地channel,使用MQ通知某服务对前端进行通知扫码成功)
|
||||||
|
threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(loginCode, uid));
|
||||||
String skipUrl = String.format(URL, wxMpService.getWxMpConfigStorage().getAppId(), URLEncoder.encode(callback + "/wx/portal/public/callBack"));
|
String skipUrl = String.format(URL, wxMpService.getWxMpConfigStorage().getAppId(), URLEncoder.encode(callback + "/wx/portal/public/callBack"));
|
||||||
WxMpXmlOutMessage.TEXT().build();
|
WxMpXmlOutMessage.TEXT().build();
|
||||||
return new TextBuilder().build("请点击链接授权:<a href=\"" + skipUrl + "\">登录</a>", wxMpXmlMessage, wxMpService);
|
return new TextBuilder().build("请点击链接授权:<a href=\"" + skipUrl + "\">登录</a>", wxMpXmlMessage, wxMpService);
|
||||||
@@ -88,9 +108,20 @@ public class WxMsgService {
|
|||||||
if (StringUtils.isEmpty(user.getName())) {
|
if (StringUtils.isEmpty(user.getName())) {
|
||||||
fillUserInfo(user.getId(), userInfo);
|
fillUserInfo(user.getId(), userInfo);
|
||||||
}
|
}
|
||||||
//触发用户登录成功操作
|
//找到对应的
|
||||||
Integer eventKey = OPENID_EVENT_CODE_MAP.get(userInfo.getOpenid());
|
Integer eventKey = RedisUtils.get(RedisKey.getKey(RedisKey.OPEN_ID_STRING, userInfo.getOpenid()), Integer.class);
|
||||||
login(user.getId(), eventKey);
|
//如果channel就在本地,直接登录
|
||||||
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
|
||||||
|
if (Objects.nonNull(channel)) {
|
||||||
|
login(user.getId(), eventKey);
|
||||||
|
}else {
|
||||||
|
//如果channel不在本地,利用mq广播到到所有服务上,尝试进行登录
|
||||||
|
//手动生成一个WxMpXmlMessage
|
||||||
|
WxMpXmlMessage wxMpXmlMessage = new WxMpXmlMessage();
|
||||||
|
wxMpXmlMessage.setFromUser(userInfo.getOpenid());
|
||||||
|
wxMpXmlMessage.setEventKey("qrscene_"+eventKey);
|
||||||
|
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillUserInfo(Long uid, WxOAuth2UserInfo userInfo) {
|
private void fillUserInfo(Long uid, WxOAuth2UserInfo userInfo) {
|
||||||
@@ -108,7 +139,7 @@ public class WxMsgService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void login(Long uid, Integer eventKey) {
|
public void login(Long uid, Integer eventKey) {
|
||||||
User user = userDao.getById(uid);
|
User user = userDao.getById(uid);
|
||||||
//调用用户登录模块
|
//调用用户登录模块
|
||||||
String token = loginService.login(uid);
|
String token = loginService.login(uid);
|
||||||
|
|||||||
@@ -120,10 +120,9 @@ public class UserServiceImpl implements UserService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void register(String openId) {
|
public void register(User user) {
|
||||||
User insert = User.builder().openId(openId).build();
|
userDao.save(user);
|
||||||
userDao.save(insert);
|
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, user));
|
||||||
applicationEventPublisher.publishEvent(new UserRegisterEvent(this, insert));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -5,8 +5,13 @@ import cn.hutool.core.util.ObjectUtil;
|
|||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
import com.abin.mallchat.common.common.annotation.FrequencyControl;
|
import com.abin.mallchat.common.common.annotation.FrequencyControl;
|
||||||
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
|
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
|
||||||
|
import com.abin.mallchat.common.common.constant.MQConstant;
|
||||||
|
import com.abin.mallchat.common.common.constant.RedisKey;
|
||||||
|
import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
|
||||||
import com.abin.mallchat.common.common.event.UserOfflineEvent;
|
import com.abin.mallchat.common.common.event.UserOfflineEvent;
|
||||||
import com.abin.mallchat.common.common.event.UserOnlineEvent;
|
import com.abin.mallchat.common.common.event.UserOnlineEvent;
|
||||||
|
import com.abin.mallchat.common.common.utils.CacheHolder;
|
||||||
|
import com.abin.mallchat.common.common.utils.RedisUtils;
|
||||||
import com.abin.mallchat.common.user.dao.UserDao;
|
import com.abin.mallchat.common.user.dao.UserDao;
|
||||||
import com.abin.mallchat.common.user.domain.entity.User;
|
import com.abin.mallchat.common.user.domain.entity.User;
|
||||||
import com.abin.mallchat.common.user.domain.enums.RoleEnum;
|
import com.abin.mallchat.common.user.domain.enums.RoleEnum;
|
||||||
@@ -19,8 +24,7 @@ import com.abin.mallchat.custom.user.service.LoginService;
|
|||||||
import com.abin.mallchat.custom.user.service.WebSocketService;
|
import com.abin.mallchat.custom.user.service.WebSocketService;
|
||||||
import com.abin.mallchat.custom.user.service.adapter.WSAdapter;
|
import com.abin.mallchat.custom.user.service.adapter.WSAdapter;
|
||||||
import com.abin.mallchat.custom.user.websocket.NettyUtil;
|
import com.abin.mallchat.custom.user.websocket.NettyUtil;
|
||||||
import com.github.benmanes.caffeine.cache.Cache;
|
import com.abin.mallchat.transaction.service.MQProducer;
|
||||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
||||||
import io.netty.channel.Channel;
|
import io.netty.channel.Channel;
|
||||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
@@ -37,7 +41,7 @@ import java.time.Duration;
|
|||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@@ -51,17 +55,7 @@ import java.util.concurrent.locks.ReentrantLock;
|
|||||||
public class WebSocketServiceImpl implements WebSocketService {
|
public class WebSocketServiceImpl implements WebSocketService {
|
||||||
|
|
||||||
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
|
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
|
||||||
private static final Long MAX_MUM_SIZE = 10000L;
|
|
||||||
|
|
||||||
private static final AtomicInteger CODE = new AtomicInteger();
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 所有请求登录的code与channel关系
|
|
||||||
*/
|
|
||||||
private static final Cache<Integer, Channel> WAIT_LOGIN_MAP = Caffeine.newBuilder()
|
|
||||||
.expireAfterWrite(EXPIRE_TIME)
|
|
||||||
.maximumSize(MAX_MUM_SIZE)
|
|
||||||
.build();
|
|
||||||
/**
|
/**
|
||||||
* 所有已连接的websocket连接列表和一些额外参数
|
* 所有已连接的websocket连接列表和一些额外参数
|
||||||
*/
|
*/
|
||||||
@@ -74,7 +68,10 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
public static ConcurrentHashMap<Channel, WSChannelExtraDTO> getOnlineMap() {
|
public static ConcurrentHashMap<Channel, WSChannelExtraDTO> getOnlineMap() {
|
||||||
return ONLINE_WS_MAP;
|
return ONLINE_WS_MAP;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* redis保存loginCode的key
|
||||||
|
*/
|
||||||
|
private static final String LOGIN_CODE = "loginCode";
|
||||||
@Autowired
|
@Autowired
|
||||||
private WxMpService wxMpService;
|
private WxMpService wxMpService;
|
||||||
@Autowired
|
@Autowired
|
||||||
@@ -90,6 +87,8 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
private UserCache userCache;
|
private UserCache userCache;
|
||||||
@Autowired
|
@Autowired
|
||||||
private IRoleService iRoleService;
|
private IRoleService iRoleService;
|
||||||
|
@Autowired
|
||||||
|
private MQProducer mqProducer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 处理用户登录请求,需要返回一张带code的二维码
|
* 处理用户登录请求,需要返回一张带code的二维码
|
||||||
@@ -100,11 +99,11 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
@Override
|
@Override
|
||||||
@FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
|
@FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
|
||||||
public void handleLoginReq(Channel channel) {
|
public void handleLoginReq(Channel channel) {
|
||||||
//生成随机不重复的登录码
|
//生成随机不重复的登录码,并将channel存在本地cache中
|
||||||
Integer code = generateLoginCode(channel);
|
Integer code = generateLoginCode(channel);
|
||||||
//请求微信接口,获取登录码地址
|
//请求微信接口,获取登录码地址
|
||||||
WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int) EXPIRE_TIME.getSeconds());
|
WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int) EXPIRE_TIME.getSeconds());
|
||||||
//返回给前端
|
//返回给前端(channel必在本地)
|
||||||
sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
|
sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,11 +114,14 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
private Integer generateLoginCode(Channel channel) {
|
private Integer generateLoginCode(Channel channel) {
|
||||||
|
int inc = 0;
|
||||||
do {
|
do {
|
||||||
CODE.getAndIncrement();
|
//本地cache时间必须比redis key过期时间短,否则会出现并发问题
|
||||||
} while (WAIT_LOGIN_MAP.asMap().containsKey(CODE.get())
|
inc = RedisUtils.integerInc(RedisKey.getKey(LOGIN_CODE), 61, TimeUnit.MINUTES);
|
||||||
|| Objects.isNull(WAIT_LOGIN_MAP.get(CODE.get(), c -> channel)));
|
} while (CacheHolder.WAIT_LOGIN_MAP.asMap().containsKey(inc));
|
||||||
return CODE.get();
|
//储存一份在本地
|
||||||
|
CacheHolder.WAIT_LOGIN_MAP.put(inc, channel);
|
||||||
|
return inc;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -160,13 +162,14 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 登录成功,并更新状态
|
* (channel必在本地)登录成功,并更新状态
|
||||||
*/
|
*/
|
||||||
private void loginSuccess(Channel channel, User user, String token) {
|
private void loginSuccess(Channel channel, User user, String token) {
|
||||||
//更新上线列表
|
//更新上线列表
|
||||||
online(channel, user.getId());
|
online(channel, user.getId());
|
||||||
//返回给用户登录成功
|
//返回给用户登录成功
|
||||||
boolean hasPower = iRoleService.hasPower(user.getId(), RoleEnum.CHAT_MANAGER);
|
boolean hasPower = iRoleService.hasPower(user.getId(), RoleEnum.CHAT_MANAGER);
|
||||||
|
//发送给对应的用户
|
||||||
sendMsg(channel, WSAdapter.buildLoginSuccessResp(user, token, hasPower));
|
sendMsg(channel, WSAdapter.buildLoginSuccessResp(user, token, hasPower));
|
||||||
//发送用户上线事件
|
//发送用户上线事件
|
||||||
boolean online = userCache.isOnline(user.getId());
|
boolean online = userCache.isOnline(user.getId());
|
||||||
@@ -206,25 +209,28 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
@Override
|
@Override
|
||||||
public Boolean scanLoginSuccess(Integer loginCode, User user, String token) {
|
public Boolean scanLoginSuccess(Integer loginCode, User user, String token) {
|
||||||
//发送消息
|
//发送消息
|
||||||
Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
||||||
if (Objects.isNull(channel)) {
|
if (Objects.isNull(channel)) {
|
||||||
return Boolean.FALSE;
|
return Boolean.FALSE;
|
||||||
}
|
}
|
||||||
//移除code
|
//移除code
|
||||||
WAIT_LOGIN_MAP.invalidate(loginCode);
|
CacheHolder.WAIT_LOGIN_MAP.invalidate(loginCode);
|
||||||
//用户登录
|
//用户登录
|
||||||
loginSuccess(channel, user, token);
|
loginSuccess(channel, user, token);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean scanSuccess(Integer loginCode) {
|
public Boolean scanSuccess(Integer loginCode, Long uid) {
|
||||||
Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
|
||||||
if (Objects.isNull(channel)) {
|
if (Objects.nonNull(channel)) {
|
||||||
|
sendMsg(channel, WSAdapter.buildScanSuccessResp());
|
||||||
|
return Boolean.TRUE;
|
||||||
|
}else {
|
||||||
|
//广播通知次channel服务扫码成功
|
||||||
|
mqProducer.sendMsg(MQConstant.SCAN_MSG_TOPIC, new ScanSuccessMessageDTO(loginCode, WSAdapter.buildScanSuccessResp()));
|
||||||
return Boolean.FALSE;
|
return Boolean.FALSE;
|
||||||
}
|
}
|
||||||
sendMsg(channel, WSAdapter.buildScanSuccessResp());
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -267,10 +273,15 @@ public class WebSocketServiceImpl implements WebSocketService {
|
|||||||
channels.forEach(channel -> {
|
channels.forEach(channel -> {
|
||||||
threadPoolTaskExecutor.execute(() -> sendMsg(channel, wsBaseResp));
|
threadPoolTaskExecutor.execute(() -> sendMsg(channel, wsBaseResp));
|
||||||
});
|
});
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 给本地channel发送消息
|
||||||
|
*
|
||||||
|
* @param channel
|
||||||
|
* @param wsBaseResp
|
||||||
|
*/
|
||||||
private void sendMsg(Channel channel, WSBaseResp<?> wsBaseResp) {
|
private void sendMsg(Channel channel, WSBaseResp<?> wsBaseResp) {
|
||||||
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
|
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user