websocket频控

This commit is contained in:
zhongzb
2023-06-07 21:19:37 +08:00
parent 8faed2abc9
commit 635144670c
5 changed files with 39 additions and 2 deletions

View File

@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.RandomUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.abin.mallchat.common.common.annotation.FrequencyControl;
import com.abin.mallchat.common.common.config.ThreadPoolConfig; import com.abin.mallchat.common.common.config.ThreadPoolConfig;
import com.abin.mallchat.common.common.event.UserOfflineEvent; import com.abin.mallchat.common.common.event.UserOfflineEvent;
import com.abin.mallchat.common.common.event.UserOnlineEvent; import com.abin.mallchat.common.common.event.UserOnlineEvent;
@@ -89,6 +90,8 @@ public class WebSocketServiceImpl implements WebSocketService {
*/ */
@SneakyThrows @SneakyThrows
@Override @Override
@FrequencyControl(time = 10, count = 2, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
@FrequencyControl(time = 100, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
public void handleLoginReq(Channel channel) { public void handleLoginReq(Channel channel) {
//生成随机不重复的登录码 //生成随机不重复的登录码
Integer code = generateLoginCode(channel); Integer code = generateLoginCode(channel);
@@ -119,6 +122,7 @@ public class WebSocketServiceImpl implements WebSocketService {
* @param channel * @param channel
*/ */
@Override @Override
@FrequencyControl(time = 10, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()")
public void connect(Channel channel) { public void connect(Channel channel) {
ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO()); ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO());
} }
@@ -174,6 +178,7 @@ public class WebSocketServiceImpl implements WebSocketService {
getOrInitChannelExt(channel).setUid(uid); getOrInitChannelExt(channel).setUid(uid);
ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>()); ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>());
ONLINE_UID_MAP.get(uid).add(channel); ONLINE_UID_MAP.get(uid).add(channel);
NettyUtil.setAttr(channel, NettyUtil.UID, uid);
} }
/** /**

View File

@@ -0,0 +1,27 @@
package com.abin.mallchat.custom.user.websocket;
import com.abin.mallchat.common.common.constant.MDCKey;
import com.abin.mallchat.common.common.domain.dto.RequestInfo;
import com.abin.mallchat.common.common.utils.RequestHolder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import java.util.UUID;
@Slf4j
public class NettyCollectorHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String tid = UUID.randomUUID().toString();
MDC.put(MDCKey.TID, tid);
RequestInfo info = new RequestInfo();
info.setUid(NettyUtil.getAttr(ctx.channel(), NettyUtil.UID));
info.setIp(NettyUtil.getAttr(ctx.channel(), NettyUtil.IP));
RequestHolder.set(info);
ctx.fireChannelRead(msg);
}
}

View File

@@ -13,6 +13,7 @@ import io.netty.util.AttributeKey;
public class NettyUtil { public class NettyUtil {
public static AttributeKey<String> IP = AttributeKey.valueOf("ip"); public static AttributeKey<String> IP = AttributeKey.valueOf("ip");
public static AttributeKey<Long> UID = AttributeKey.valueOf("uid");
public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) { public static <T> void setAttr(Channel channel, AttributeKey<T> attributeKey, T data) {
Attribute<T> attr = channel.attr(attributeKey); Attribute<T> attr = channel.attr(attributeKey);

View File

@@ -79,6 +79,7 @@ public class NettyWebSocketServer {
pipeline.addLast(new HttpObjectAggregator(8192)); pipeline.addLast(new HttpObjectAggregator(8192));
//保存用户ip //保存用户ip
pipeline.addLast(new HttpHeadersHandler()); pipeline.addLast(new HttpHeadersHandler());
pipeline.addLast(new NettyCollectorHandler());
/** /**
* 说明: * 说明:
* 1. 对于 WebSocket它的数据是以帧frame 的形式传递的; * 1. 对于 WebSocket它的数据是以帧frame 的形式传递的;

View File

@@ -9,6 +9,7 @@ import com.abin.mallchat.custom.user.service.WebSocketService;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -20,7 +21,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
// 当web客户端连接后触发该方法 // 当web客户端连接后触发该方法
@Override @Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
getService().connect(ctx.channel()); // getService().connect(ctx.channel());
} }
// 客户端离线 // 客户端离线
@@ -63,6 +64,8 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
// 关闭用户的连接 // 关闭用户的连接
userOffLine(ctx); userOffLine(ctx);
} }
} else if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) {
getService().connect(ctx.channel());
} }
super.userEventTriggered(ctx, evt); super.userEventTriggered(ctx, evt);
} }
@@ -70,7 +73,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<Tex
// 处理异常 // 处理异常
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("异常发生,异常消息 ={}", cause.getMessage()); log.warn("异常发生,异常消息 ={}", cause);
ctx.channel().close(); ctx.channel().close();
} }