From 6cf7379ac4d420464dea038f16adc9191a8f7004 Mon Sep 17 00:00:00 2001 From: zbzbzzz Date: Mon, 3 Jul 2023 18:06:04 +0800 Subject: [PATCH 1/5] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96ws=E6=8F=A1=E6=89=8B?= =?UTF-8?q?=E5=90=8C=E6=97=B6=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/algorithm/ac/CreateTokenTest.java | 11 +- .../service/impl/WebSocketServiceImpl.java | 4 + .../custom/user/websocket/NettyUtil.java | 2 + .../user/websocket/NettyWebSocketServer.java | 4 +- .../NettyWebSocketServerHandler.java | 19 +- .../NettyWebSocketServerProtocolHandler.java | 61 +++++++ .../websocket/WebSocketHandshakeHandler.java | 165 +++++++++++++++--- 7 files changed, 223 insertions(+), 43 deletions(-) create mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java diff --git a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java index 50908e4..34d4035 100644 --- a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java +++ b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java @@ -6,7 +6,6 @@ import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; import com.auth0.jwt.interfaces.JWTVerifier; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.junit.Test; import java.util.Date; @@ -18,7 +17,7 @@ public class CreateTokenTest { @Test public void create(){ String token = JWT.create() - .withClaim("uid", 123L) // 只存一个uid信息,其他的自己去redis查 + .withClaim("uid", 10004L) // 只存一个uid信息,其他的自己去redis查 .withClaim("createTime", new Date()) .sign(Algorithm.HMAC256("dsfsdfsdfsdfsd")); // signature log.info("生成的token为 {}",token); @@ -33,12 +32,4 @@ public class CreateTokenTest { } } - @Test - public void verifyToken(){ - String token = JWT.create() - .withClaim("uid", 1) // 只存一个uid信息,其他的自己去redis查 - .withClaim("createTime", new Date()) - .sign(Algorithm.HMAC256("dsfsdfsdfsdfsd")); // signature - log.info("生成的token为{}",token); - } } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index 7282473..96d0f68 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -29,6 +29,7 @@ import me.chanjar.weixin.mp.bean.result.WxMpQrCodeTicket; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -140,7 +141,10 @@ public class WebSocketServiceImpl implements WebSocketService { } } + // 这里可以加异步或者事件去处理登录 + // 因为登录和推送消息其实是不用同步的两个步骤 这样可以加快连接的速度不占用nio线程 @Override + @Async public void authorize(Channel channel, WSAuthorize wsAuthorize) { //校验token boolean verifySuccess = loginService.verify(wsAuthorize.getToken()); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java index 79daafa..bcc5b05 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java @@ -1,6 +1,7 @@ package com.abin.mallchat.custom.user.websocket; import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -15,6 +16,7 @@ public class NettyUtil { public static AttributeKey TOKEN = AttributeKey.valueOf("token"); public static AttributeKey IP = AttributeKey.valueOf("ip"); public static AttributeKey UID = AttributeKey.valueOf("uid"); + public static AttributeKey HANDSHAKER_ATTR_KEY = AttributeKey.valueOf(WebSocketServerHandshaker.class, "HANDSHAKER"); public static void setAttr(Channel channel, AttributeKey attributeKey, T data) { Attribute attr = channel.attr(attributeKey); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java index b70b867..b84bc06 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java @@ -10,6 +10,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -88,7 +89,8 @@ public class NettyWebSocketServer { * 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接; * 是通过一个状态码 101 来切换的 */ - pipeline.addLast(new WebSocketHandshakeHandler()); + pipeline.addLast(new NettyWebSocketServerProtocolHandler("/")); + new WebSocketServerProtocolHandler("/"); // 自定义handler ,处理业务逻辑 pipeline.addLast(new NettyWebSocketServerHandler()); } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java index 3d75069..b4fbb0a 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java @@ -1,6 +1,5 @@ package com.abin.mallchat.custom.user.websocket; -import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import com.abin.mallchat.custom.user.domain.enums.WSReqTypeEnum; @@ -19,10 +18,11 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler { + private WebSocketService webSocketService; // 当web客户端连接后,触发该方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { -// getService().connect(ctx.channel()); + this.webSocketService = getService(); } // 客户端离线 @@ -45,7 +45,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler out) throws Exception { + if (this.webSocketServerProtocolConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) { + WebSocketServerHandshaker handshaker = NettyUtil.getAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY); + if (handshaker != null) { + frame.retain(); + ChannelPromise promise = ctx.newPromise(); + Method closeSent = ReflectUtil.getMethod(super.getClass(), "closeSent", ChannelPromise.class); + closeSent.setAccessible(true); + closeSent.invoke(this,promise); + handshaker.close(ctx, (CloseWebSocketFrame)frame, promise); + } else { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } else { + super.decode(ctx, frame, out); + } + } +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java index f501602..8054c90 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java @@ -5,38 +5,159 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakeException; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.ObjectUtil; + +import java.util.concurrent.TimeUnit; public class WebSocketHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final WebSocketServerProtocolConfig serverConfig; + private ChannelHandlerContext ctx; + private ChannelPromise handshakePromise; + private boolean isWebSocketPath; + + public WebSocketHandshakeHandler(WebSocketServerProtocolConfig serverConfig) { + this.serverConfig = (WebSocketServerProtocolConfig)ObjectUtil.checkNotNull(serverConfig, "serverConfig"); + } + @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof FullHttpRequest) { - FullHttpRequest request = (FullHttpRequest) msg; - String token = request.headers().get("Sec-Websocket-Protocol"); - NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); - // 构建WebSocket握手处理器 - WebSocketServerHandshakerFactory handshakeFactory = new WebSocketServerHandshakerFactory( - request.uri(), token, false); - WebSocketServerHandshaker handshake = handshakeFactory.newHandshaker(request); - final ChannelFuture handshakeFuture = handshake.handshake(ctx.channel(), request); - ctx.pipeline().remove(this); - handshakeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - ctx.fireExceptionCaught(future.cause()); + public void handlerAdded(ChannelHandlerContext ctx) { + this.ctx = ctx; + this.handshakePromise = ctx.newPromise(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + HttpObject httpObject = (HttpObject)msg; + if (httpObject instanceof HttpRequest) { + final HttpRequest req = (HttpRequest)httpObject; + this.isWebSocketPath = this.isWebSocketPath(req); + if (!this.isWebSocketPath) { + ctx.fireChannelRead(msg); + return; + } + try { + if (HttpMethod.GET.equals(req.method())) { + final String token = req.headers().get("Sec-Websocket-Protocol"); + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(ctx.pipeline(), req, this.serverConfig.websocketPath()), token, this.serverConfig.decoderConfig()); + final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); + NettyUtil.setAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY,handshaker); + final ChannelPromise localHandshakePromise = this.handshakePromise; + if (handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { - // 手动触发WebSocket握手状态事件 - ctx.fireUserEventTriggered( - WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); + + ctx.pipeline().remove(this); + ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); + handshakeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (!future.isSuccess()) { + localHandshakePromise.tryFailure(future.cause()); + ctx.fireExceptionCaught(future.cause()); + } else { + localHandshakePromise.trySuccess(); + NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); + ctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); + } + } + }); + this.applyHandshakeTimeout(); } + + return; } - }); + + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0))); + } finally { + ReferenceCountUtil.release(req); + } + + return; + } else if (!this.isWebSocketPath) { + ctx.fireChannelRead(msg); } else { - super.channelRead(ctx, msg); + ReferenceCountUtil.release(msg); + } + + } + + private boolean isWebSocketPath(HttpRequest req) { + String websocketPath = this.serverConfig.websocketPath(); + String uri = req.uri(); + boolean checkStartUri = uri.startsWith(websocketPath); + boolean checkNextUri = "/".equals(websocketPath) || this.checkNextUri(uri, websocketPath); + return this.serverConfig.checkStartsWith() ? checkStartUri && checkNextUri : uri.equals(websocketPath); + } + + private boolean checkNextUri(String uri, String websocketPath) { + int len = websocketPath.length(); + if (uri.length() <= len) { + return true; + } else { + char nextUri = uri.charAt(len); + return nextUri == '/' || nextUri == '?'; } } + + private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { + ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + + } + + private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { + String protocol = "ws"; + if (cp.get(SslHandler.class) != null) { + protocol = "wss"; + } + + String host = req.headers().get(HttpHeaderNames.HOST); + return protocol + "://" + host + path; + } + + private void applyHandshakeTimeout() { + final ChannelPromise localHandshakePromise = this.handshakePromise; + long handshakeTimeoutMillis = this.serverConfig.handshakeTimeoutMillis(); + if (handshakeTimeoutMillis > 0L && !localHandshakePromise.isDone()) { + final Future timeoutFuture = this.ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (!localHandshakePromise.isDone() && localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) { + WebSocketHandshakeHandler.this.ctx.flush().fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT).close(); + } + + } + }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); + localHandshakePromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) { + timeoutFuture.cancel(false); + } + }); + } + } + } From f7a80fa8d9da58ae85d5725fb5aa9b4d5f2c8b2c Mon Sep 17 00:00:00 2001 From: zbzbzzz Date: Mon, 3 Jul 2023 18:09:27 +0800 Subject: [PATCH 2/5] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96ws=E6=8F=A1=E6=89=8B?= =?UTF-8?q?=E5=90=8C=E6=97=B6=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mallchat/custom/user/websocket/NettyWebSocketServer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java index b84bc06..3448528 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java @@ -10,7 +10,6 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -90,7 +89,6 @@ public class NettyWebSocketServer { * 是通过一个状态码 101 来切换的 */ pipeline.addLast(new NettyWebSocketServerProtocolHandler("/")); - new WebSocketServerProtocolHandler("/"); // 自定义handler ,处理业务逻辑 pipeline.addLast(new NettyWebSocketServerHandler()); } From e671c66a26a90de73de339c14339dac93e7ad484 Mon Sep 17 00:00:00 2001 From: zbzbzzz Date: Mon, 3 Jul 2023 18:34:14 +0800 Subject: [PATCH 3/5] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96ws=E6=8F=A1=E6=89=8B?= =?UTF-8?q?=E5=90=8C=E6=97=B6=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../user/websocket/NettyWebSocketServerProtocolHandler.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java index 4ada0c9..73c7471 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java @@ -8,6 +8,7 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; +import io.netty.handler.codec.http.websocketx.Utf8FrameValidator; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; @@ -37,6 +38,10 @@ public class NettyWebSocketServerProtocolHandler extends WebSocketServerProtocol cp.addBefore(ctx.name(), WebSocketHandshakeHandler.class.getName(), new WebSocketHandshakeHandler(this.webSocketServerProtocolConfig)); } + if (this.webSocketServerProtocolConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) { + cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(), new Utf8FrameValidator(this.webSocketServerProtocolConfig.decoderConfig().closeOnProtocolViolation())); + } + } @Override From 5d8e32abf86646fc962801fe09d5d3268f2d2359 Mon Sep 17 00:00:00 2001 From: zbzbzzz Date: Tue, 4 Jul 2023 09:56:35 +0800 Subject: [PATCH 4/5] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96ws=E6=8F=A1=E6=89=8B?= =?UTF-8?q?=E5=90=8C=E6=97=B6=E8=AE=A4=E8=AF=81=20=E5=8A=A0=E4=B8=8Atoken?= =?UTF-8?q?=E4=B8=BA=E7=A9=BA=E5=88=A4=E6=96=AD=20=E5=8E=BB=E6=8E=89?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E8=AE=A4=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../custom/user/service/impl/WebSocketServiceImpl.java | 10 +++++----- .../user/websocket/NettyWebSocketServerHandler.java | 9 ++++++--- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index 96d0f68..8515083 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -29,11 +29,14 @@ import me.chanjar.weixin.mp.bean.result.WxMpQrCodeTicket; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; -import java.util.*; +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.Condition; @@ -141,10 +144,7 @@ public class WebSocketServiceImpl implements WebSocketService { } } - // 这里可以加异步或者事件去处理登录 - // 因为登录和推送消息其实是不用同步的两个步骤 这样可以加快连接的速度不占用nio线程 @Override - @Async public void authorize(Channel channel, WSAuthorize wsAuthorize) { //校验token boolean verifySuccess = loginService.verify(wsAuthorize.getToken()); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java index b4fbb0a..ecbbb24 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java @@ -1,5 +1,6 @@ package com.abin.mallchat.custom.user.websocket; +import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import com.abin.mallchat.custom.user.domain.enums.WSReqTypeEnum; @@ -19,6 +20,7 @@ import lombok.extern.slf4j.Slf4j; public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler { private WebSocketService webSocketService; + // 当web客户端连接后,触发该方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @@ -67,9 +69,10 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler Date: Tue, 4 Jul 2023 17:04:52 +0800 Subject: [PATCH 5/5] =?UTF-8?q?fix:=E6=8F=A1=E6=89=8B=E8=AE=A4=E8=AF=81?= =?UTF-8?q?=E6=96=B9=E5=BC=8F=E6=94=B9=E4=B8=BA=E5=8F=82=E6=95=B0=E8=AE=A4?= =?UTF-8?q?=E8=AF=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/algorithm/ac/CreateTokenTest.java | 1 - .../user/websocket/HttpHeadersHandler.java | 17 +- .../user/websocket/NettyWebSocketServer.java | 3 +- .../NettyWebSocketServerHandler.java | 2 +- .../NettyWebSocketServerProtocolHandler.java | 66 ------- .../websocket/WebSocketHandshakeHandler.java | 163 ------------------ 6 files changed, 18 insertions(+), 234 deletions(-) delete mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerProtocolHandler.java delete mode 100644 mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java diff --git a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java index 34d4035..06c1845 100644 --- a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java +++ b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java @@ -31,5 +31,4 @@ public class CreateTokenTest { log.info("decode error,token:{}", token, e); } } - } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/HttpHeadersHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/HttpHeadersHandler.java index 30f6cdd..b6cdd43 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/HttpHeadersHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/HttpHeadersHandler.java @@ -1,5 +1,6 @@ package com.abin.mallchat.custom.user.websocket; +import cn.hutool.core.net.url.UrlBuilder; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.FullHttpRequest; @@ -13,7 +14,16 @@ public class HttpHeadersHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { - HttpHeaders headers = ((FullHttpRequest) msg).headers(); + FullHttpRequest request = (FullHttpRequest) msg; + UrlBuilder urlBuilder = UrlBuilder.ofHttp(request.uri()); + + // 获取token参数 + String token = urlBuilder.getQuery().get("token").toString(); + NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); + + // 获取请求路径 + request.setUri(urlBuilder.getPath().toString()); + HttpHeaders headers = request.headers(); String ip = headers.get("X-Real-IP"); if (StringUtils.isEmpty(ip)) {//如果没经过nginx,就直接获取远端地址 InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); @@ -21,7 +31,10 @@ public class HttpHeadersHandler extends ChannelInboundHandlerAdapter { } NettyUtil.setAttr(ctx.channel(), NettyUtil.IP, ip); ctx.pipeline().remove(this); + ctx.fireChannelRead(request); + }else + { + ctx.fireChannelRead(msg); } - ctx.fireChannelRead(msg); } } \ No newline at end of file diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java index 3448528..4ebf470 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java @@ -10,6 +10,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -88,7 +89,7 @@ public class NettyWebSocketServer { * 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接; * 是通过一个状态码 101 来切换的 */ - pipeline.addLast(new NettyWebSocketServerProtocolHandler("/")); + pipeline.addLast(new WebSocketServerProtocolHandler("/")); // 自定义handler ,处理业务逻辑 pipeline.addLast(new NettyWebSocketServerHandler()); } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java index ecbbb24..dbc62f9 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java @@ -71,7 +71,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler out) throws Exception { - if (this.webSocketServerProtocolConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) { - WebSocketServerHandshaker handshaker = NettyUtil.getAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY); - if (handshaker != null) { - frame.retain(); - ChannelPromise promise = ctx.newPromise(); - Method closeSent = ReflectUtil.getMethod(super.getClass(), "closeSent", ChannelPromise.class); - closeSent.setAccessible(true); - closeSent.invoke(this,promise); - handshaker.close(ctx, (CloseWebSocketFrame)frame, promise); - } else { - ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); - } - } else { - super.decode(ctx, frame, out); - } - } -} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java deleted file mode 100644 index 8054c90..0000000 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java +++ /dev/null @@ -1,163 +0,0 @@ -package com.abin.mallchat.custom.user.websocket; - - -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakeException; -import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; -import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; -import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.ReferenceCountUtil; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.internal.ObjectUtil; - -import java.util.concurrent.TimeUnit; - -public class WebSocketHandshakeHandler extends ChannelInboundHandlerAdapter { - - private final WebSocketServerProtocolConfig serverConfig; - private ChannelHandlerContext ctx; - private ChannelPromise handshakePromise; - private boolean isWebSocketPath; - - public WebSocketHandshakeHandler(WebSocketServerProtocolConfig serverConfig) { - this.serverConfig = (WebSocketServerProtocolConfig)ObjectUtil.checkNotNull(serverConfig, "serverConfig"); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - this.ctx = ctx; - this.handshakePromise = ctx.newPromise(); - } - - @Override - public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { - HttpObject httpObject = (HttpObject)msg; - if (httpObject instanceof HttpRequest) { - final HttpRequest req = (HttpRequest)httpObject; - this.isWebSocketPath = this.isWebSocketPath(req); - if (!this.isWebSocketPath) { - ctx.fireChannelRead(msg); - return; - } - try { - if (HttpMethod.GET.equals(req.method())) { - final String token = req.headers().get("Sec-Websocket-Protocol"); - WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(ctx.pipeline(), req, this.serverConfig.websocketPath()), token, this.serverConfig.decoderConfig()); - final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); - NettyUtil.setAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY,handshaker); - final ChannelPromise localHandshakePromise = this.handshakePromise; - if (handshaker == null) { - WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); - } else { - - ctx.pipeline().remove(this); - ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); - handshakeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - localHandshakePromise.tryFailure(future.cause()); - ctx.fireExceptionCaught(future.cause()); - } else { - localHandshakePromise.trySuccess(); - NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); - ctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); - } - } - }); - this.applyHandshakeTimeout(); - } - - return; - } - - sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0))); - } finally { - ReferenceCountUtil.release(req); - } - - return; - } else if (!this.isWebSocketPath) { - ctx.fireChannelRead(msg); - } else { - ReferenceCountUtil.release(msg); - } - - } - - private boolean isWebSocketPath(HttpRequest req) { - String websocketPath = this.serverConfig.websocketPath(); - String uri = req.uri(); - boolean checkStartUri = uri.startsWith(websocketPath); - boolean checkNextUri = "/".equals(websocketPath) || this.checkNextUri(uri, websocketPath); - return this.serverConfig.checkStartsWith() ? checkStartUri && checkNextUri : uri.equals(websocketPath); - } - - private boolean checkNextUri(String uri, String websocketPath) { - int len = websocketPath.length(); - if (uri.length() <= len) { - return true; - } else { - char nextUri = uri.charAt(len); - return nextUri == '/' || nextUri == '?'; - } - } - - private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { - ChannelFuture f = ctx.channel().writeAndFlush(res); - if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { - f.addListener(ChannelFutureListener.CLOSE); - } - - } - - private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { - String protocol = "ws"; - if (cp.get(SslHandler.class) != null) { - protocol = "wss"; - } - - String host = req.headers().get(HttpHeaderNames.HOST); - return protocol + "://" + host + path; - } - - private void applyHandshakeTimeout() { - final ChannelPromise localHandshakePromise = this.handshakePromise; - long handshakeTimeoutMillis = this.serverConfig.handshakeTimeoutMillis(); - if (handshakeTimeoutMillis > 0L && !localHandshakePromise.isDone()) { - final Future timeoutFuture = this.ctx.executor().schedule(new Runnable() { - @Override - public void run() { - if (!localHandshakePromise.isDone() && localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) { - WebSocketHandshakeHandler.this.ctx.flush().fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT).close(); - } - - } - }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); - localHandshakePromise.addListener(new FutureListener() { - @Override - public void operationComplete(Future f) { - timeoutFuture.cancel(false); - } - }); - } - } - -}