From 635144670c5001a33a2b14c0ac9c1dad3d1b810e Mon Sep 17 00:00:00 2001 From: zhongzb <972627721@qq.com> Date: Wed, 7 Jun 2023 21:19:37 +0800 Subject: [PATCH] =?UTF-8?q?websocket=E9=A2=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/WebSocketServiceImpl.java | 5 ++++ .../user/websocket/NettyCollectorHandler.java | 27 +++++++++++++++++++ .../custom/user/websocket/NettyUtil.java | 1 + .../user/websocket/NettyWebSocketServer.java | 1 + .../NettyWebSocketServerHandler.java | 7 +++-- 5 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyCollectorHandler.java diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index 20d5aaf..e994bf0 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.RandomUtil; import cn.hutool.json.JSONUtil; +import com.abin.mallchat.common.common.annotation.FrequencyControl; import com.abin.mallchat.common.common.config.ThreadPoolConfig; import com.abin.mallchat.common.common.event.UserOfflineEvent; import com.abin.mallchat.common.common.event.UserOnlineEvent; @@ -89,6 +90,8 @@ public class WebSocketServiceImpl implements WebSocketService { */ @SneakyThrows @Override + @FrequencyControl(time = 10, count = 2, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()") + @FrequencyControl(time = 100, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()") public void handleLoginReq(Channel channel) { //生成随机不重复的登录码 Integer code = generateLoginCode(channel); @@ -119,6 +122,7 @@ public class WebSocketServiceImpl implements WebSocketService { * @param channel */ @Override + @FrequencyControl(time = 10, count = 5, spEl = "T(com.abin.mallchat.common.common.utils.RequestHolder).get().getIp()") public void connect(Channel channel) { ONLINE_WS_MAP.put(channel, new WSChannelExtraDTO()); } @@ -174,6 +178,7 @@ public class WebSocketServiceImpl implements WebSocketService { getOrInitChannelExt(channel).setUid(uid); ONLINE_UID_MAP.putIfAbsent(uid, new CopyOnWriteArrayList<>()); ONLINE_UID_MAP.get(uid).add(channel); + NettyUtil.setAttr(channel, NettyUtil.UID, uid); } /** diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyCollectorHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyCollectorHandler.java new file mode 100644 index 0000000..902bab7 --- /dev/null +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyCollectorHandler.java @@ -0,0 +1,27 @@ +package com.abin.mallchat.custom.user.websocket; + +import com.abin.mallchat.common.common.constant.MDCKey; +import com.abin.mallchat.common.common.domain.dto.RequestInfo; +import com.abin.mallchat.common.common.utils.RequestHolder; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; + +import java.util.UUID; + + +@Slf4j +public class NettyCollectorHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + String tid = UUID.randomUUID().toString(); + MDC.put(MDCKey.TID, tid); + RequestInfo info = new RequestInfo(); + info.setUid(NettyUtil.getAttr(ctx.channel(), NettyUtil.UID)); + info.setIp(NettyUtil.getAttr(ctx.channel(), NettyUtil.IP)); + RequestHolder.set(info); + + ctx.fireChannelRead(msg); + } +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java index 2d6a9d0..d2bacef 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java @@ -13,6 +13,7 @@ import io.netty.util.AttributeKey; public class NettyUtil { public static AttributeKey IP = AttributeKey.valueOf("ip"); + public static AttributeKey UID = AttributeKey.valueOf("uid"); public static void setAttr(Channel channel, AttributeKey attributeKey, T data) { Attribute attr = channel.attr(attributeKey); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java index c639dbf..fb9d912 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java @@ -79,6 +79,7 @@ public class NettyWebSocketServer { pipeline.addLast(new HttpObjectAggregator(8192)); //保存用户ip pipeline.addLast(new HttpHeadersHandler()); + pipeline.addLast(new NettyCollectorHandler()); /** * 说明: * 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的; diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java index d7639b8..be8c679 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java @@ -9,6 +9,7 @@ import com.abin.mallchat.custom.user.service.WebSocketService; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; @@ -20,7 +21,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler