1.登录集群改造
2.netty取消上下文
This commit is contained in:
zhongzb
2023-08-20 14:37:01 +08:00
parent 7dad76d829
commit 5c0ec34760
15 changed files with 197 additions and 272 deletions

View File

@@ -1,23 +1,14 @@
package com.abin.mallchat.custom.user.consumer;
import com.abin.mallchat.common.common.constant.MQConstant;
import com.abin.mallchat.common.common.constant.RedisKey;
import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
import com.abin.mallchat.common.common.utils.CacheHolder;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.custom.user.service.WebSocketService;
import com.abin.mallchat.custom.user.service.WxMsgService;
import io.netty.channel.Channel;
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* Description: 在本地服务上找寻对应channel将对应用户登陆并触发所有用户收到上线事件
* Author: <a href="https://github.com/zongzibinbin">abin</a>
@@ -27,30 +18,12 @@ import java.util.Objects;
@Component
public class MsgLoginConsumer implements RocketMQListener<LoginMessageDTO> {
@Autowired
private WxMsgService wxMsgService;
@Autowired
private UserDao userDao;
private WebSocketService webSocketService;
@Override
public void onMessage(LoginMessageDTO loginMessageDTO) {
WxMpXmlMessage wxMpXmlMessage = loginMessageDTO.getWxMpXmlMessage();
//给二维码绑定的登录code
Integer eventKey = Integer.parseInt(getEventKey(wxMpXmlMessage));
//本地未储存对应的channel,则结束
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
if (Objects.isNull(channel)) {
return;
}
//查询openid对应的用户(必然存在)
String openid = wxMpXmlMessage.getFromUser();
User user = userDao.getByOpenId(openid);
//登录,并且清除缓存
wxMsgService.login(user.getId(), eventKey);
}
private String getEventKey(WxMpXmlMessage wxMpXmlMessage) {
//扫码关注的渠道事件有前缀,需要去除
return wxMpXmlMessage.getEventKey().replace("qrscene_", "");
//尝试登录登录
webSocketService.scanLoginSuccess(loginMessageDTO.getCode(), loginMessageDTO.getUid());
}
}

View File

@@ -1,24 +1,14 @@
package com.abin.mallchat.custom.user.consumer;
import cn.hutool.json.JSONUtil;
import com.abin.mallchat.common.common.constant.MQConstant;
import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
import com.abin.mallchat.common.common.utils.CacheHolder;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.custom.user.service.WebSocketService;
import com.abin.mallchat.custom.user.service.WxMsgService;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* Description: 将扫码成功的信息发送给对应的用户,等待授权
* Author: <a href="https://github.com/zongzibinbin">abin</a>
@@ -29,22 +19,10 @@ import java.util.Objects;
public class ScanSuccessConsumer implements RocketMQListener<ScanSuccessMessageDTO> {
@Autowired
private WebSocketService webSocketService;
@Autowired
private WxMsgService wxMsgService;
@Autowired
private UserDao userDao;
@Override
public void onMessage(ScanSuccessMessageDTO scanSuccessMessageDTO) {
Integer loginCode = scanSuccessMessageDTO.getLoginCode();
//本地未储存对应的channel,则结束
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.isNull(channel)) {
return;
}
//给正在等待登陆的channel发送扫码成功的消息等待授权
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(scanSuccessMessageDTO.getWsBaseMsg())));
webSocketService.scanSuccess(scanSuccessMessageDTO.getCode());
}
}

View File

@@ -5,7 +5,6 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
import me.chanjar.weixin.common.bean.oauth2.WxOAuth2AccessToken;
import me.chanjar.weixin.common.error.WxErrorException;
import me.chanjar.weixin.mp.api.WxMpMessageRouter;
import me.chanjar.weixin.mp.api.WxMpService;
import me.chanjar.weixin.mp.bean.message.WxMpXmlMessage;
@@ -50,7 +49,7 @@ public class WxPortalController {
}
@GetMapping("/callBack")
public RedirectView callBack(@RequestParam String code) throws WxErrorException {
public RedirectView callBack(@RequestParam String code) {
try {
WxOAuth2AccessToken accessToken = wxService.getOAuth2Service().getAccessToken(code);
WxOAuth2UserInfo userInfo = wxService.getOAuth2Service().getUserInfo(accessToken, "zh_CN");

View File

@@ -1,6 +1,5 @@
package com.abin.mallchat.custom.user.service;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.common.user.domain.enums.WSBaseResp;
import com.abin.mallchat.common.user.domain.vo.request.ws.WSAuthorize;
import io.netty.channel.Channel;
@@ -37,19 +36,15 @@ public interface WebSocketService {
/**
* 扫码用户登录成功通知,清除本地Cache中的loginCode和channel的关系
*
* @param loginCode
* @param user
* @param token
*/
Boolean scanLoginSuccess(Integer loginCode, User user, String token);
Boolean scanLoginSuccess(Integer loginCode, Long uid);
/**
* 通知用户扫码成功
*
* @param loginCode
*/
Boolean scanSuccess(Integer loginCode, Long uid);
Boolean scanSuccess(Integer loginCode);
/**
* 推动消息给所有在线的人

View File

@@ -4,14 +4,13 @@ import cn.hutool.core.util.RandomUtil;
import com.abin.mallchat.common.common.constant.MQConstant;
import com.abin.mallchat.common.common.constant.RedisKey;
import com.abin.mallchat.common.common.domain.dto.LoginMessageDTO;
import com.abin.mallchat.common.common.utils.CacheHolder;
import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
import com.abin.mallchat.common.common.utils.RedisUtils;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
import com.abin.mallchat.custom.user.service.adapter.TextBuilder;
import com.abin.mallchat.custom.user.service.adapter.UserAdapter;
import com.abin.mallchat.transaction.service.MQProducer;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import me.chanjar.weixin.common.bean.WxOAuth2UserInfo;
import me.chanjar.weixin.mp.api.WxMpService;
@@ -20,14 +19,11 @@ import me.chanjar.weixin.mp.bean.message.WxMpXmlOutMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.net.URLEncoder;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
@@ -41,22 +37,14 @@ public class WxMsgService {
/**
* 用户的openId和前端登录场景code的映射关系
*/
private static final ConcurrentHashMap<String, Integer> OPENID_EVENT_CODE_MAP = new ConcurrentHashMap<>();
private static final String URL = "https://open.weixin.qq.com/connect/oauth2/authorize?appid=%s&redirect_uri=%s&response_type=code&scope=snsapi_userinfo&state=STATE#wechat_redirect";
@Value("${wx.mp.callback}")
private String callback;
@Autowired
private UserDao userDao;
@Autowired
@Lazy
private WebSocketService webSocketService;
@Autowired
private LoginService loginService;
@Autowired
private UserService userService;
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private MQProducer mqProducer;
public WxMpXmlOutMessage scan(WxMpService wxMpService, WxMpXmlMessage wxMpXmlMessage) {
@@ -65,14 +53,7 @@ public class WxMsgService {
User user = userDao.getByOpenId(openid);
//如果已经注册,直接登录成功
if (Objects.nonNull(user) && StringUtils.isNotEmpty(user.getAvatar())) {
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
//要么在本地登录,否则利用mq广播到到所有服务上尝试登录
if (Objects.nonNull(channel)) {
String token = loginService.login(user.getId());
webSocketService.scanLoginSuccess(loginCode, user, token);
}else {
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
}
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(user.getId(), loginCode));
return null;
}
@@ -81,12 +62,10 @@ public class WxMsgService {
user = User.builder().openId(openid).build();
userService.register(user);
}
Long uid = user.getId();
//在 redis中保存openid和场景code的关系后续才能通知到前端,旧版数据没有清除,这里设置了过期时间
//在redis中保存openid和场景code的关系后续才能通知到前端,旧版数据没有清除,这里设置了过期时间
RedisUtils.set(RedisKey.getKey(RedisKey.OPEN_ID_STRING, openid), loginCode, 60, TimeUnit.MINUTES);
//授权流程,给用户发送授权消息,并且异步通知前端扫码成功(如非本地channel,使用MQ通知某服务对前端进行通知扫码成功)
threadPoolTaskExecutor.execute(() -> webSocketService.scanSuccess(loginCode, uid));
//授权流程,给用户发送授权消息,并且异步通知前端扫码成功,等待授权
mqProducer.sendMsg(MQConstant.SCAN_MSG_TOPIC, new ScanSuccessMessageDTO(loginCode));
String skipUrl = String.format(URL, wxMpService.getWxMpConfigStorage().getAppId(), URLEncoder.encode(callback + "/wx/portal/public/callBack"));
WxMpXmlOutMessage.TEXT().build();
return new TextBuilder().build("请点击链接授权:<a href=\"" + skipUrl + "\">登录</a>", wxMpXmlMessage, wxMpService);
@@ -108,20 +87,10 @@ public class WxMsgService {
if (StringUtils.isEmpty(user.getName())) {
fillUserInfo(user.getId(), userInfo);
}
//找到对应的
Integer eventKey = RedisUtils.get(RedisKey.getKey(RedisKey.OPEN_ID_STRING, userInfo.getOpenid()), Integer.class);
//如果channel就在本地直接登录
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(eventKey);
if (Objects.nonNull(channel)) {
login(user.getId(), eventKey);
}else {
//如果channel不在本地利用mq广播到到所有服务上,尝试进行登录
//手动生成一个WxMpXmlMessage
WxMpXmlMessage wxMpXmlMessage = new WxMpXmlMessage();
wxMpXmlMessage.setFromUser(userInfo.getOpenid());
wxMpXmlMessage.setEventKey("qrscene_"+eventKey);
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(wxMpXmlMessage));
}
//找到对应的code
Integer code = RedisUtils.get(RedisKey.getKey(RedisKey.OPEN_ID_STRING, userInfo.getOpenid()), Integer.class);
//发送登录成功事件
mqProducer.sendMsg(MQConstant.LOGIN_MSG_TOPIC, new LoginMessageDTO(user.getId(), code));
}
private void fillUserInfo(Long uid, WxOAuth2UserInfo userInfo) {
@@ -138,12 +107,4 @@ public class WxMsgService {
update.setName("名字重置" + RandomUtil.randomInt(100000));
}
}
public void login(Long uid, Integer eventKey) {
User user = userDao.getById(uid);
//调用用户登录模块
String token = loginService.login(uid);
//推送前端登录成功
webSocketService.scanLoginSuccess(eventKey, user, token);
}
}

View File

@@ -5,12 +5,9 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONUtil;
import com.abin.mallchat.common.common.annotation.FrequencyControl;
import com.abin.mallchat.common.common.config.ThreadPoolConfig;
import com.abin.mallchat.common.common.constant.MQConstant;
import com.abin.mallchat.common.common.constant.RedisKey;
import com.abin.mallchat.common.common.domain.dto.ScanSuccessMessageDTO;
import com.abin.mallchat.common.common.event.UserOfflineEvent;
import com.abin.mallchat.common.common.event.UserOnlineEvent;
import com.abin.mallchat.common.common.utils.CacheHolder;
import com.abin.mallchat.common.common.utils.RedisUtils;
import com.abin.mallchat.common.user.dao.UserDao;
import com.abin.mallchat.common.user.domain.entity.User;
@@ -25,6 +22,8 @@ import com.abin.mallchat.custom.user.service.WebSocketService;
import com.abin.mallchat.custom.user.service.adapter.WSAdapter;
import com.abin.mallchat.custom.user.websocket.NettyUtil;
import com.abin.mallchat.transaction.service.MQProducer;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.SneakyThrows;
@@ -38,12 +37,12 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*;
import java.util.Date;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Description: websocket处理类
@@ -55,7 +54,14 @@ import java.util.concurrent.locks.ReentrantLock;
public class WebSocketServiceImpl implements WebSocketService {
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
private static final Long MAX_MUM_SIZE = 10000L;
/**
* 所有请求登录的code与channel关系
*/
public static final Cache<Integer, Channel> WAIT_LOGIN_MAP = Caffeine.newBuilder()
.expireAfterWrite(EXPIRE_TIME)
.maximumSize(MAX_MUM_SIZE)
.build();
/**
* 所有已连接的websocket连接列表和一些额外参数
*/
@@ -68,6 +74,7 @@ public class WebSocketServiceImpl implements WebSocketService {
public static ConcurrentHashMap<Channel, WSChannelExtraDTO> getOnlineMap() {
return ONLINE_WS_MAP;
}
/**
* redis保存loginCode的key
*/
@@ -97,7 +104,7 @@ public class WebSocketServiceImpl implements WebSocketService {
*/
@SneakyThrows
@Override
@FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
@FrequencyControl(time = 1000, count = 50, spEl = "T(com.abin.mallchat.custom.user.websocket.NettyUtil).getAttr(#channel,T(com.abin.mallchat.custom.user.websocket.NettyUtil).IP)")
public void handleLoginReq(Channel channel) {
//生成随机不重复的登录码,并将channel存在本地cache中
Integer code = generateLoginCode(channel);
@@ -114,13 +121,13 @@ public class WebSocketServiceImpl implements WebSocketService {
* @return
*/
private Integer generateLoginCode(Channel channel) {
int inc = 0;
int inc;
do {
//本地cache时间必须比redis key过期时间短否则会出现并发问题
inc = RedisUtils.integerInc(RedisKey.getKey(LOGIN_CODE), 61, TimeUnit.MINUTES);
} while (CacheHolder.WAIT_LOGIN_MAP.asMap().containsKey(inc));
inc = RedisUtils.integerInc(RedisKey.getKey(LOGIN_CODE), (int) EXPIRE_TIME.toMinutes(), TimeUnit.MINUTES);
} while (WAIT_LOGIN_MAP.asMap().containsKey(inc));
//储存一份在本地
CacheHolder.WAIT_LOGIN_MAP.put(inc, channel);
WAIT_LOGIN_MAP.put(inc, channel);
return inc;
}
@@ -130,7 +137,6 @@ public class WebSocketServiceImpl implements WebSocketService {
* @param channel
*/
@Override
// @FrequencyControl(time = 10, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
public void connect(Channel channel) {
ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO());
}
@@ -207,30 +213,30 @@ public class WebSocketServiceImpl implements WebSocketService {
}
@Override
public Boolean scanLoginSuccess(Integer loginCode, User user, String token) {
//发送消息
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
public Boolean scanLoginSuccess(Integer loginCode, Long uid) {
//确认连接在该机器
Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.isNull(channel)) {
return Boolean.FALSE;
}
User user = userDao.getById(uid);
//移除code
CacheHolder.WAIT_LOGIN_MAP.invalidate(loginCode);
WAIT_LOGIN_MAP.invalidate(loginCode);
//调用用户登录模块
String token = loginService.login(uid);
//用户登录
loginSuccess(channel, user, token);
return true;
return Boolean.TRUE;
}
@Override
public Boolean scanSuccess(Integer loginCode, Long uid) {
Channel channel = CacheHolder.WAIT_LOGIN_MAP.getIfPresent(loginCode);
public Boolean scanSuccess(Integer loginCode) {
Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.nonNull(channel)) {
sendMsg(channel, WSAdapter.buildScanSuccessResp());
return Boolean.TRUE;
}else {
//广播通知次channel服务扫码成功
mqProducer.sendMsg(MQConstant.SCAN_MSG_TOPIC, new ScanSuccessMessageDTO(loginCode, WSAdapter.buildScanSuccessResp()));
return Boolean.FALSE;
}
return Boolean.FALSE;
}
@@ -277,7 +283,7 @@ public class WebSocketServiceImpl implements WebSocketService {
/**
* 给本地channel发送消息
* 给本地channel发送消息
*
* @param channel
* @param wsBaseResp
@@ -286,39 +292,4 @@ public class WebSocketServiceImpl implements WebSocketService {
channel.writeAndFlush(new TextWebSocketFrame(JSONUtil.toJsonStr(wsBaseResp)));
}
/**
* 案例证明ConcurrentHashMap#entrySet的值不是快照数据
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
ConcurrentHashMap<Integer, Integer> a = new ConcurrentHashMap<>();
a.put(1, 1);
a.put(2, 2);
new Thread(() -> {
reentrantLock.lock();
Set<Map.Entry<Integer, Integer>> entries = a.entrySet();
System.out.println(entries);
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(entries);
reentrantLock.unlock();
}).start();
Thread.sleep(1000);
reentrantLock.lock();
a.put(3, 3);
System.out.println("haha");
condition.signalAll();
reentrantLock.unlock();
Thread.sleep(1000);
}
}

View File

@@ -1,27 +0,0 @@
package com.abin.mallchat.custom.user.websocket;
import com.abin.mallchat.common.common.constant.MDCKey;
import com.abin.mallchat.common.common.domain.dto.RequestInfo;
import com.abin.mallchat.common.common.utils.RequestHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.UUID;
@Slf4j
public class NettyCollectorHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String tid = UUID.randomUUID().toString();
MDC.put(MDCKey.TID, tid);
RequestInfo info = new RequestInfo();
info.setUid(NettyUtil.getAttr(ctx.channel(), NettyUtil.UID));
info.setIp(NettyUtil.getAttr(ctx.channel(), NettyUtil.IP));
RequestHolder.set(info);
ctx.fireChannelRead(msg);
}
}

View File

@@ -80,7 +80,6 @@ public class NettyWebSocketServer {
pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip
pipeline.addLast(new HttpHeadersHandler());
pipeline.addLast(new NettyCollectorHandler());
/**
* 说明:
* 1. 对于 WebSocket它的数据是以帧frame 的形式传递的;

View File

@@ -100,10 +100,6 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
break;
case HEARTBEAT:
break;
case AUTHORIZE:
this.webSocketService.authorize(ctx.channel(), JSONUtil.toBean(wsBaseReq.getData(), WSAuthorize.class));
log.info("主动认证 = " + msg.text());
break;
default:
log.info("未知类型");
}