From 7c2c4d99f5eae9c2aa86fad895ca56127df8fb47 Mon Sep 17 00:00:00 2001 From: zhongzb <972627721@qq.com> Date: Mon, 13 Nov 2023 22:31:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:websocket=E5=8E=8B=E6=B5=8B=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mallchat/common/websocket/Simulator.java | 20 ++++ .../common/websocket/WebSocketConnector.java | 95 +++++++++++++++ .../common/websocket/WebSocketIoHandler.java | 112 ++++++++++++++++++ 3 files changed, 227 insertions(+) create mode 100644 mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/Simulator.java create mode 100644 mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketConnector.java create mode 100644 mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketIoHandler.java diff --git a/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/Simulator.java b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/Simulator.java new file mode 100644 index 0000000..cf87022 --- /dev/null +++ b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/Simulator.java @@ -0,0 +1,20 @@ +package com.abin.mallchat.common.websocket; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +public class Simulator { + public static void start() { + String serverIp = "101.33.251.36"; + int serverPort = 8090; + EventLoopGroup group = new NioEventLoopGroup(); + for (int i = 0; i < 10000; i++) { + WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, group); + client.doConnect(); + } + } + + public static void main(String[] args) { + start(); + } +} diff --git a/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketConnector.java b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketConnector.java new file mode 100644 index 0000000..e7ba189 --- /dev/null +++ b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketConnector.java @@ -0,0 +1,95 @@ +package com.abin.mallchat.common.websocket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultHttpHeaders; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import lombok.extern.slf4j.Slf4j; + +import java.net.URI; + +/** + * WebSocket协议类型的模拟客户端连接器类 + * + * @author duyanjun + * @since 2022/10/13 杜燕军 新建 + */ +@Slf4j +public class WebSocketConnector { + // 服务器ip + protected String serverIp; + // 服务器通信端口 + protected int serverSocketPort; + // 事件循环线程池 + protected EventLoopGroup group; + // 网络通道 + private Channel channel; + + /** + * WebSocket协议类型的模拟客户端连接器构造方法 + * + * @param serverIp + * @param serverSocketPort + * @param group + */ + public WebSocketConnector(String serverIp, int serverSocketPort, EventLoopGroup group) { + this.serverIp = serverIp; + this.serverSocketPort = serverSocketPort; + this.group = group; + } + + public void doConnect() { + try { + String URL = "ws://" + this.serverIp + ":" + this.serverSocketPort + "/"; + URI uri = new URI(URL); + final WebSocketIoHandler handler = + new WebSocketIoHandler( + WebSocketClientHandshakerFactory.newHandshaker( + uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(group).channel(NioSocketChannel.class) + //.option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + //空闲事件 + pipeline.addLast(new IdleStateHandler(0, 10, 0)); + // 添加一个http的编解码器 + pipeline.addLast(new HttpClientCodec()); + // 添加一个用于支持大数据流的支持 + pipeline.addLast(new ChunkedWriteHandler()); + // 添加一个聚合器,这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response + pipeline.addLast(new HttpObjectAggregator(1024 * 64)); + pipeline.addLast(handler); + } + }); + try { + synchronized (bootstrap) { + final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync(); + this.channel = future.channel(); + } + } catch (InterruptedException e) { + log.error("连接服务失败.......................uri:" + uri.toString(), e); + } catch (Exception e) { + log.error("连接服务失败.......................uri:" + uri.toString(), e); + } + } catch (Exception e) { + log.error("连接服务失败.......................", e); + } finally { + } + + } + + public void disConnect() { + this.channel.close(); + } +} diff --git a/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketIoHandler.java b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketIoHandler.java new file mode 100644 index 0000000..0358493 --- /dev/null +++ b/mallchat-chat-server/src/test/java/com/abin/mallchat/common/websocket/WebSocketIoHandler.java @@ -0,0 +1,112 @@ +package com.abin.mallchat.common.websocket; + +import io.netty.channel.*; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +/** + * WebSocket协议类型的模拟客户端IO处理器类 + * + * @author duyanjun + * @since 2022/10/13 杜燕军 新建 + */ +@Slf4j +public class WebSocketIoHandler extends SimpleChannelInboundHandler { + + private final WebSocketClientHandshaker handShaker; + + private ChannelPromise handshakeFuture; + + public WebSocketIoHandler(WebSocketClientHandshaker handShaker) { + this.handShaker = handShaker; + } + + public ChannelFuture handshakeFuture() { + return handshakeFuture; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + handshakeFuture = ctx.newPromise(); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + handShaker.handshake(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + ctx.close(); + try { + super.channelInactive(ctx); + } catch (Exception e) { + log.error("channelInactive 异常.", e); + } + log.warn("WebSocket链路与服务器连接已断开."); + } + + @Override + public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + Channel ch = ctx.channel(); + if (!handShaker.isHandshakeComplete()) { + try { + handShaker.finishHandshake(ch, (FullHttpResponse) msg); + handshakeFuture.setSuccess(); + log.info("WebSocket握手成功,可以传输数据了."); + } catch (WebSocketHandshakeException e) { + log.warn("WebSocket Client failed to connect"); + handshakeFuture.setFailure(e); + } + return; + } + + if (msg instanceof FullHttpResponse) { + FullHttpResponse response = (FullHttpResponse) msg; + throw new IllegalStateException( + "Unexpected FullHttpResponse (getStatus=" + response.status() + + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); + } + + WebSocketFrame frame = (WebSocketFrame) msg; + if (frame instanceof TextWebSocketFrame) { + TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; + String s = textFrame.text(); + log.info("WebSocket Client received message: " + s); + } else if (frame instanceof PongWebSocketFrame) { + log.info("WebSocket Client received pong"); + } else if (frame instanceof CloseWebSocketFrame) { + log.info("WebSocket Client received closing"); + ch.close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause); + if (!handshakeFuture.isDone()) { + handshakeFuture.setFailure(cause); + } + ctx.close(); + super.exceptionCaught(ctx, cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + // 如果写通道处于空闲状态,就发送心跳命令 + if (IdleState.WRITER_IDLE.equals(event.state())) { + // 发送心跳数据 + ctx.writeAndFlush(new TextWebSocketFrame("{\"type\":2}")); + System.out.println("发送心跳数据"); + } + } else { + super.userEventTriggered(ctx, evt); + } + } +}