feat:使用caffine进行存储,设置时间与lru淘汰策略

This commit is contained in:
wangchao
2023-07-03 21:44:35 +08:00
parent b994d0b962
commit 41660e189b

View File

@@ -20,6 +20,8 @@ import com.abin.mallchat.custom.user.service.LoginService;
import com.abin.mallchat.custom.user.service.WebSocketService; import com.abin.mallchat.custom.user.service.WebSocketService;
import com.abin.mallchat.custom.user.service.adapter.WSAdapter; import com.abin.mallchat.custom.user.service.adapter.WSAdapter;
import com.abin.mallchat.custom.user.websocket.NettyUtil; import com.abin.mallchat.custom.user.websocket.NettyUtil;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@@ -32,9 +34,11 @@ import org.springframework.context.ApplicationEventPublisher;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@@ -47,12 +51,18 @@ import java.util.concurrent.locks.ReentrantLock;
@Slf4j @Slf4j
public class WebSocketServiceImpl implements WebSocketService { public class WebSocketServiceImpl implements WebSocketService {
private static final Duration EXPIRE_TIME = Duration.ofHours(1);
private static final Long MAX_MUM_SIZE = 10000L;
private static final AtomicInteger CODE = new AtomicInteger();
/** /**
* 所有请求登录的code与channel关系 * 所有请求登录的code与channel关系
* todo 有可能有人请求了二维码,就是不登录,留个坑,之后处理
*/ */
private static final ConcurrentHashMap<Integer, Channel> WAIT_LOGIN_MAP = new ConcurrentHashMap<>(); private static final Cache<Integer, Channel> WAIT_LOGIN_MAP = Caffeine.newBuilder()
.expireAfterWrite(EXPIRE_TIME)
.maximumSize(MAX_MUM_SIZE)
.build();
/** /**
* 所有已连接的websocket连接列表和一些额外参数 * 所有已连接的websocket连接列表和一些额外参数
*/ */
@@ -66,7 +76,6 @@ public class WebSocketServiceImpl implements WebSocketService {
return ONLINE_WS_MAP; return ONLINE_WS_MAP;
} }
public static final int EXPIRE_SECONDS = 60 * 60;
@Autowired @Autowired
private WxMpService wxMpService; private WxMpService wxMpService;
@Autowired @Autowired
@@ -95,7 +104,7 @@ public class WebSocketServiceImpl implements WebSocketService {
//生成随机不重复的登录码 //生成随机不重复的登录码
Integer code = generateLoginCode(channel); Integer code = generateLoginCode(channel);
//请求微信接口,获取登录码地址 //请求微信接口,获取登录码地址
WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, EXPIRE_SECONDS); WxMpQrCodeTicket wxMpQrCodeTicket = wxMpService.getQrcodeService().qrCodeCreateTmpTicket(code, (int) EXPIRE_TIME.getSeconds());
//返回给前端 //返回给前端
sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket)); sendMsg(channel, WSAdapter.buildLoginResp(wxMpQrCodeTicket));
} }
@@ -107,12 +116,11 @@ public class WebSocketServiceImpl implements WebSocketService {
* @return * @return
*/ */
private Integer generateLoginCode(Channel channel) { private Integer generateLoginCode(Channel channel) {
int code;
do { do {
code = RandomUtil.randomInt(Integer.MAX_VALUE); CODE.set(RandomUtil.randomInt(Integer.MAX_VALUE));
} while (WAIT_LOGIN_MAP.contains(code) } while (WAIT_LOGIN_MAP.asMap().containsKey(CODE.get())
|| Objects.nonNull(WAIT_LOGIN_MAP.putIfAbsent(code, channel))); || Objects.isNull(WAIT_LOGIN_MAP.get(CODE.get(), c -> channel)));
return code; return CODE.get();
} }
/** /**
@@ -199,12 +207,12 @@ public class WebSocketServiceImpl implements WebSocketService {
@Override @Override
public Boolean scanLoginSuccess(Integer loginCode, User user, String token) { public Boolean scanLoginSuccess(Integer loginCode, User user, String token) {
//发送消息 //发送消息
Channel channel = WAIT_LOGIN_MAP.get(loginCode); Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.isNull(channel)) { if (Objects.isNull(channel)) {
return Boolean.FALSE; return Boolean.FALSE;
} }
//移除code //移除code
WAIT_LOGIN_MAP.remove(loginCode); WAIT_LOGIN_MAP.invalidate(loginCode);
//用户登录 //用户登录
loginSuccess(channel, user, token); loginSuccess(channel, user, token);
return true; return true;
@@ -212,7 +220,7 @@ public class WebSocketServiceImpl implements WebSocketService {
@Override @Override
public Boolean scanSuccess(Integer loginCode) { public Boolean scanSuccess(Integer loginCode) {
Channel channel = WAIT_LOGIN_MAP.get(loginCode); Channel channel = WAIT_LOGIN_MAP.getIfPresent(loginCode);
if (Objects.isNull(channel)) { if (Objects.isNull(channel)) {
return Boolean.FALSE; return Boolean.FALSE;
} }
@@ -287,4 +295,6 @@ public class WebSocketServiceImpl implements WebSocketService {
reentrantLock.unlock(); reentrantLock.unlock();
Thread.sleep(1000); Thread.sleep(1000);
} }
} }