diff --git a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java index 50908e4..34d4035 100644 --- a/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java +++ b/mallchat-common/src/test/java/com/abin/mallchat/common/common/algorithm/ac/CreateTokenTest.java @@ -6,7 +6,6 @@ import com.auth0.jwt.algorithms.Algorithm; import com.auth0.jwt.interfaces.DecodedJWT; import com.auth0.jwt.interfaces.JWTVerifier; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.junit.Test; import java.util.Date; @@ -18,7 +17,7 @@ public class CreateTokenTest { @Test public void create(){ String token = JWT.create() - .withClaim("uid", 123L) // 只存一个uid信息,其他的自己去redis查 + .withClaim("uid", 10004L) // 只存一个uid信息,其他的自己去redis查 .withClaim("createTime", new Date()) .sign(Algorithm.HMAC256("dsfsdfsdfsdfsd")); // signature log.info("生成的token为 {}",token); @@ -33,12 +32,4 @@ public class CreateTokenTest { } } - @Test - public void verifyToken(){ - String token = JWT.create() - .withClaim("uid", 1) // 只存一个uid信息,其他的自己去redis查 - .withClaim("createTime", new Date()) - .sign(Algorithm.HMAC256("dsfsdfsdfsdfsd")); // signature - log.info("生成的token为{}",token); - } } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java index 7282473..96d0f68 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/service/impl/WebSocketServiceImpl.java @@ -29,6 +29,7 @@ import me.chanjar.weixin.mp.bean.result.WxMpQrCodeTicket; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -140,7 +141,10 @@ public class WebSocketServiceImpl implements WebSocketService { } } + // 这里可以加异步或者事件去处理登录 + // 因为登录和推送消息其实是不用同步的两个步骤 这样可以加快连接的速度不占用nio线程 @Override + @Async public void authorize(Channel channel, WSAuthorize wsAuthorize) { //校验token boolean verifySuccess = loginService.verify(wsAuthorize.getToken()); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java index 79daafa..bcc5b05 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyUtil.java @@ -1,6 +1,7 @@ package com.abin.mallchat.custom.user.websocket; import io.netty.channel.Channel; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.util.Attribute; import io.netty.util.AttributeKey; @@ -15,6 +16,7 @@ public class NettyUtil { public static AttributeKey TOKEN = AttributeKey.valueOf("token"); public static AttributeKey IP = AttributeKey.valueOf("ip"); public static AttributeKey UID = AttributeKey.valueOf("uid"); + public static AttributeKey HANDSHAKER_ATTR_KEY = AttributeKey.valueOf(WebSocketServerHandshaker.class, "HANDSHAKER"); public static void setAttr(Channel channel, AttributeKey attributeKey, T data) { Attribute attr = channel.attr(attributeKey); diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java index b70b867..b84bc06 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServer.java @@ -10,6 +10,7 @@ import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; @@ -88,7 +89,8 @@ public class NettyWebSocketServer { * 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws 协议,保持长连接; * 是通过一个状态码 101 来切换的 */ - pipeline.addLast(new WebSocketHandshakeHandler()); + pipeline.addLast(new NettyWebSocketServerProtocolHandler("/")); + new WebSocketServerProtocolHandler("/"); // 自定义handler ,处理业务逻辑 pipeline.addLast(new NettyWebSocketServerHandler()); } diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java index 3d75069..b4fbb0a 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/NettyWebSocketServerHandler.java @@ -1,6 +1,5 @@ package com.abin.mallchat.custom.user.websocket; -import cn.hutool.core.util.StrUtil; import cn.hutool.extra.spring.SpringUtil; import cn.hutool.json.JSONUtil; import com.abin.mallchat.custom.user.domain.enums.WSReqTypeEnum; @@ -19,10 +18,11 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler { + private WebSocketService webSocketService; // 当web客户端连接后,触发该方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { -// getService().connect(ctx.channel()); + this.webSocketService = getService(); } // 客户端离线 @@ -45,7 +45,7 @@ public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler out) throws Exception { + if (this.webSocketServerProtocolConfig.handleCloseFrames() && frame instanceof CloseWebSocketFrame) { + WebSocketServerHandshaker handshaker = NettyUtil.getAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY); + if (handshaker != null) { + frame.retain(); + ChannelPromise promise = ctx.newPromise(); + Method closeSent = ReflectUtil.getMethod(super.getClass(), "closeSent", ChannelPromise.class); + closeSent.setAccessible(true); + closeSent.invoke(this,promise); + handshaker.close(ctx, (CloseWebSocketFrame)frame, promise); + } else { + ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + } + } else { + super.decode(ctx, frame, out); + } + } +} diff --git a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java index f501602..8054c90 100644 --- a/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java +++ b/mallchat-custom-server/src/main/java/com/abin/mallchat/custom/user/websocket/WebSocketHandshakeHandler.java @@ -5,38 +5,159 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpObject; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpUtil; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakeException; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolConfig; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.internal.ObjectUtil; + +import java.util.concurrent.TimeUnit; public class WebSocketHandshakeHandler extends ChannelInboundHandlerAdapter { + + private final WebSocketServerProtocolConfig serverConfig; + private ChannelHandlerContext ctx; + private ChannelPromise handshakePromise; + private boolean isWebSocketPath; + + public WebSocketHandshakeHandler(WebSocketServerProtocolConfig serverConfig) { + this.serverConfig = (WebSocketServerProtocolConfig)ObjectUtil.checkNotNull(serverConfig, "serverConfig"); + } + @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof FullHttpRequest) { - FullHttpRequest request = (FullHttpRequest) msg; - String token = request.headers().get("Sec-Websocket-Protocol"); - NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); - // 构建WebSocket握手处理器 - WebSocketServerHandshakerFactory handshakeFactory = new WebSocketServerHandshakerFactory( - request.uri(), token, false); - WebSocketServerHandshaker handshake = handshakeFactory.newHandshaker(request); - final ChannelFuture handshakeFuture = handshake.handshake(ctx.channel(), request); - ctx.pipeline().remove(this); - handshakeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - ctx.fireExceptionCaught(future.cause()); + public void handlerAdded(ChannelHandlerContext ctx) { + this.ctx = ctx; + this.handshakePromise = ctx.newPromise(); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + HttpObject httpObject = (HttpObject)msg; + if (httpObject instanceof HttpRequest) { + final HttpRequest req = (HttpRequest)httpObject; + this.isWebSocketPath = this.isWebSocketPath(req); + if (!this.isWebSocketPath) { + ctx.fireChannelRead(msg); + return; + } + try { + if (HttpMethod.GET.equals(req.method())) { + final String token = req.headers().get("Sec-Websocket-Protocol"); + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(ctx.pipeline(), req, this.serverConfig.websocketPath()), token, this.serverConfig.decoderConfig()); + final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req); + NettyUtil.setAttr(ctx.channel(),NettyUtil.HANDSHAKER_ATTR_KEY,handshaker); + final ChannelPromise localHandshakePromise = this.handshakePromise; + if (handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { - // 手动触发WebSocket握手状态事件 - ctx.fireUserEventTriggered( - WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); + + ctx.pipeline().remove(this); + ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req); + handshakeFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (!future.isSuccess()) { + localHandshakePromise.tryFailure(future.cause()); + ctx.fireExceptionCaught(future.cause()); + } else { + localHandshakePromise.trySuccess(); + NettyUtil.setAttr(ctx.channel(), NettyUtil.TOKEN, token); + ctx.fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); + } + } + }); + this.applyHandshakeTimeout(); } + + return; } - }); + + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0))); + } finally { + ReferenceCountUtil.release(req); + } + + return; + } else if (!this.isWebSocketPath) { + ctx.fireChannelRead(msg); } else { - super.channelRead(ctx, msg); + ReferenceCountUtil.release(msg); + } + + } + + private boolean isWebSocketPath(HttpRequest req) { + String websocketPath = this.serverConfig.websocketPath(); + String uri = req.uri(); + boolean checkStartUri = uri.startsWith(websocketPath); + boolean checkNextUri = "/".equals(websocketPath) || this.checkNextUri(uri, websocketPath); + return this.serverConfig.checkStartsWith() ? checkStartUri && checkNextUri : uri.equals(websocketPath); + } + + private boolean checkNextUri(String uri, String websocketPath) { + int len = websocketPath.length(); + if (uri.length() <= len) { + return true; + } else { + char nextUri = uri.charAt(len); + return nextUri == '/' || nextUri == '?'; } } + + private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) { + ChannelFuture f = ctx.channel().writeAndFlush(res); + if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + + } + + private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { + String protocol = "ws"; + if (cp.get(SslHandler.class) != null) { + protocol = "wss"; + } + + String host = req.headers().get(HttpHeaderNames.HOST); + return protocol + "://" + host + path; + } + + private void applyHandshakeTimeout() { + final ChannelPromise localHandshakePromise = this.handshakePromise; + long handshakeTimeoutMillis = this.serverConfig.handshakeTimeoutMillis(); + if (handshakeTimeoutMillis > 0L && !localHandshakePromise.isDone()) { + final Future timeoutFuture = this.ctx.executor().schedule(new Runnable() { + @Override + public void run() { + if (!localHandshakePromise.isDone() && localHandshakePromise.tryFailure(new WebSocketServerHandshakeException("handshake timed out"))) { + WebSocketHandshakeHandler.this.ctx.flush().fireUserEventTriggered(WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT).close(); + } + + } + }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); + localHandshakePromise.addListener(new FutureListener() { + @Override + public void operationComplete(Future f) { + timeoutFuture.cancel(false); + } + }); + } + } + }