feat:websocket压测测试类

This commit is contained in:
zhongzb
2023-11-13 22:31:03 +08:00
parent 6876aac29d
commit 7c2c4d99f5
3 changed files with 227 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
package com.abin.mallchat.common.websocket;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
public class Simulator {
public static void start() {
String serverIp = "101.33.251.36";
int serverPort = 8090;
EventLoopGroup group = new NioEventLoopGroup();
for (int i = 0; i < 10000; i++) {
WebSocketConnector client = new WebSocketConnector(serverIp, serverPort, group);
client.doConnect();
}
}
public static void main(String[] args) {
start();
}
}

View File

@@ -0,0 +1,95 @@
package com.abin.mallchat.common.websocket;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
/**
* WebSocket协议类型的模拟客户端连接器类
*
* @author duyanjun
* @since 2022/10/13 杜燕军 新建
*/
@Slf4j
public class WebSocketConnector {
// 服务器ip
protected String serverIp;
// 服务器通信端口
protected int serverSocketPort;
// 事件循环线程池
protected EventLoopGroup group;
// 网络通道
private Channel channel;
/**
* WebSocket协议类型的模拟客户端连接器构造方法
*
* @param serverIp
* @param serverSocketPort
* @param group
*/
public WebSocketConnector(String serverIp, int serverSocketPort, EventLoopGroup group) {
this.serverIp = serverIp;
this.serverSocketPort = serverSocketPort;
this.group = group;
}
public void doConnect() {
try {
String URL = "ws://" + this.serverIp + ":" + this.serverSocketPort + "/";
URI uri = new URI(URL);
final WebSocketIoHandler handler =
new WebSocketIoHandler(
WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders()));
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
//.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//空闲事件
pipeline.addLast(new IdleStateHandler(0, 10, 0));
// 添加一个http的编解码器
pipeline.addLast(new HttpClientCodec());
// 添加一个用于支持大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加一个聚合器这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(1024 * 64));
pipeline.addLast(handler);
}
});
try {
synchronized (bootstrap) {
final ChannelFuture future = bootstrap.connect(this.serverIp, this.serverSocketPort).sync();
this.channel = future.channel();
}
} catch (InterruptedException e) {
log.error("连接服务失败.......................uri:" + uri.toString(), e);
} catch (Exception e) {
log.error("连接服务失败.......................uri:" + uri.toString(), e);
}
} catch (Exception e) {
log.error("连接服务失败.......................", e);
} finally {
}
}
public void disConnect() {
this.channel.close();
}
}

View File

@@ -0,0 +1,112 @@
package com.abin.mallchat.common.websocket;
import io.netty.channel.*;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket协议类型的模拟客户端IO处理器类
*
* @author duyanjun
* @since 2022/10/13 杜燕军 新建
*/
@Slf4j
public class WebSocketIoHandler extends SimpleChannelInboundHandler<Object> {
private final WebSocketClientHandshaker handShaker;
private ChannelPromise handshakeFuture;
public WebSocketIoHandler(WebSocketClientHandshaker handShaker) {
this.handShaker = handShaker;
}
public ChannelFuture handshakeFuture() {
return handshakeFuture;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
handShaker.handshake(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.close();
try {
super.channelInactive(ctx);
} catch (Exception e) {
log.error("channelInactive 异常.", e);
}
log.warn("WebSocket链路与服务器连接已断开.");
}
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
Channel ch = ctx.channel();
if (!handShaker.isHandshakeComplete()) {
try {
handShaker.finishHandshake(ch, (FullHttpResponse) msg);
handshakeFuture.setSuccess();
log.info("WebSocket握手成功可以传输数据了.");
} catch (WebSocketHandshakeException e) {
log.warn("WebSocket Client failed to connect");
handshakeFuture.setFailure(e);
}
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
throw new IllegalStateException(
"Unexpected FullHttpResponse (getStatus=" + response.status() +
", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
}
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String s = textFrame.text();
log.info("WebSocket Client received message: " + s);
} else if (frame instanceof PongWebSocketFrame) {
log.info("WebSocket Client received pong");
} else if (frame instanceof CloseWebSocketFrame) {
log.info("WebSocket Client received closing");
ch.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("WebSocket链路由于发生异常,与服务器连接已断开.", cause);
if (!handshakeFuture.isDone()) {
handshakeFuture.setFailure(cause);
}
ctx.close();
super.exceptionCaught(ctx, cause);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 如果写通道处于空闲状态,就发送心跳命令
if (IdleState.WRITER_IDLE.equals(event.state())) {
// 发送心跳数据
ctx.writeAndFlush(new TextWebSocketFrame("{\"type\":2}"));
System.out.println("发送心跳数据");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}