init v1.0.0

This commit is contained in:
ageer
2024-02-27 20:52:19 +08:00
parent 1f7f97e86a
commit a079ef44e5
602 changed files with 163057 additions and 95 deletions

View File

@@ -0,0 +1,54 @@
<!--
~ MIT License
~
~ Copyright (c) 2023 OrdinaryRoad
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xmzs</groupId>
<artifactId>live-chat-client-servers</artifactId>
<version>1.0.0</version>
</parent>
<packaging>jar</packaging>
<artifactId>live-chat-client-servers-netty-client</artifactId>
<name>live-chat-client-servers-netty</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.xmzs</groupId>
<artifactId>live-chat-client-commons-client</artifactId>
</dependency>
<dependency>
<groupId>com.xmzs</groupId>
<artifactId>live-chat-client-servers-netty</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,349 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.client.base;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import tech.ordinaryroad.live.chat.client.commons.base.exception.BaseException;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseConnectionListener;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseMsgListener;
import tech.ordinaryroad.live.chat.client.commons.base.msg.IMsg;
import tech.ordinaryroad.live.chat.client.commons.client.BaseLiveChatClient;
import tech.ordinaryroad.live.chat.client.commons.client.enums.ClientStatusEnums;
import tech.ordinaryroad.live.chat.client.servers.netty.client.config.BaseNettyClientConfig;
import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseBinaryFrameHandler;
import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseConnectionHandler;
import javax.net.ssl.SSLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* @author mjz
* @date 2023/8/26
*/
@Slf4j
public abstract class BaseNettyClient
<Config extends BaseNettyClientConfig,
CmdEnum extends Enum<CmdEnum>,
Msg extends IMsg,
MsgListener extends IBaseMsgListener<BinaryFrameHandler, CmdEnum>,
ConnectionHandler extends BaseConnectionHandler<ConnectionHandler>,
BinaryFrameHandler extends BaseBinaryFrameHandler<BinaryFrameHandler, CmdEnum, Msg, MsgListener>
>
extends BaseLiveChatClient<Config, MsgListener> {
@Getter
private final EventLoopGroup workerGroup;
@Getter
private final Bootstrap bootstrap = new Bootstrap();
private BinaryFrameHandler binaryFrameHandler;
private ConnectionHandler connectionHandler;
private IBaseConnectionListener<ConnectionHandler> connectionListener;
private Channel channel;
@Getter
private URI websocketUri;
protected IBaseConnectionListener<ConnectionHandler> clientConnectionListener;
/**
* 控制弹幕发送频率
*/
private volatile long lastSendDanmuTimeInMillis;
public abstract ConnectionHandler initConnectionHandler(IBaseConnectionListener<ConnectionHandler> clientConnectionListener);
public abstract BinaryFrameHandler initBinaryFrameHandler();
protected BaseNettyClient(Config config, EventLoopGroup workerGroup, IBaseConnectionListener<ConnectionHandler> connectionListener) {
super(config);
this.workerGroup = workerGroup;
this.connectionListener = connectionListener;
}
public void onConnected(ConnectionHandler connectionHandler) {
this.setStatus(ClientStatusEnums.CONNECTED);
if (this.connectionListener != null) {
this.connectionListener.onConnected(connectionHandler);
}
}
public void onConnectFailed(ConnectionHandler connectionHandler) {
this.setStatus(ClientStatusEnums.CONNECT_FAILED);
tryReconnect();
if (this.connectionListener != null) {
this.connectionListener.onConnectFailed(connectionHandler);
}
}
public void onDisconnected(ConnectionHandler connectionHandler) {
this.setStatus(ClientStatusEnums.DISCONNECTED);
tryReconnect();
if (this.connectionListener != null) {
this.connectionListener.onDisconnected(connectionHandler);
}
}
@Override
public void init() {
if (checkStatus(ClientStatusEnums.INITIALIZED)) {
return;
}
try {
this.websocketUri = new URI(getWebSocketUriString());
SslContext sslCtx = SslContextBuilder.forClient().build();
this.clientConnectionListener = new IBaseConnectionListener<ConnectionHandler>() {
@Override
public void onConnected(ConnectionHandler connectionHandler) {
BaseNettyClient.this.onConnected(connectionHandler);
}
@Override
public void onConnectFailed(ConnectionHandler connectionHandler) {
BaseNettyClient.this.onConnectFailed(connectionHandler);
}
@Override
public void onDisconnected(ConnectionHandler connectionHandler) {
BaseNettyClient.this.onDisconnected(connectionHandler);
}
};
this.binaryFrameHandler = this.initBinaryFrameHandler();
this.connectionHandler = this.initConnectionHandler(this.clientConnectionListener);
this.bootstrap.group(this.workerGroup)
// 创建Channel
.channel(NioSocketChannel.class)
.remoteAddress(this.websocketUri.getHost(), getInetPort())
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
// Channel配置
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 责任链
ChannelPipeline pipeline = ch.pipeline();
// 放到第一位 addFirst 支持wss链接服务端
pipeline.addFirst(sslCtx.newHandler(ch.alloc(), BaseNettyClient.this.websocketUri.getHost(), getInetPort()));
// 添加一个http的编解码器
pipeline.addLast(new HttpClientCodec());
// 添加一个用于支持大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 添加一个聚合器这个聚合器主要是将HttpMessage聚合成FullHttpRequest/Response
pipeline.addLast(new HttpObjectAggregator(BaseNettyClient.this.getConfig().getAggregatorMaxContentLength()));
// 连接处理器
pipeline.addLast(BaseNettyClient.this.connectionHandler);
// 弹幕处理器
pipeline.addLast(BaseNettyClient.this.binaryFrameHandler);
}
});
this.setStatus(ClientStatusEnums.INITIALIZED);
} catch (URISyntaxException e) {
throw new BaseException(e);
} catch (SSLException e) {
throw new BaseException(e);
}
}
private int getInetPort() {
int port = this.websocketUri.getPort();
return port == -1 ? "wss".equalsIgnoreCase(websocketUri.getScheme()) ? 443 : 80 : port;
}
@Override
public void connect(Runnable success, Consumer<Throwable> failed) {
if (this.cancelReconnect) {
this.cancelReconnect = false;
}
if (!checkStatus(ClientStatusEnums.INITIALIZED)) {
return;
}
if (getStatus() == ClientStatusEnums.CONNECTED) {
return;
}
if (getStatus() != ClientStatusEnums.RECONNECTING) {
this.setStatus(ClientStatusEnums.CONNECTING);
}
this.bootstrap.connect().addListener((ChannelFutureListener) connectFuture -> {
if (connectFuture.isSuccess()) {
if (log.isDebugEnabled()) {
log.debug("连接建立成功!");
}
this.channel = connectFuture.channel();
// 监听是否握手成功
this.connectionHandler.getHandshakeFuture().addListener((ChannelFutureListener) handshakeFuture -> {
try {
connectionHandler.sendAuthRequest(channel);
if (success != null) {
success.run();
}
} catch (Exception e) {
log.error("认证包发送失败,断开连接", e);
this.disconnect();
}
});
} else {
log.error("连接建立失败", connectFuture.cause());
this.onConnectFailed(this.connectionHandler);
if (failed != null) {
failed.accept(connectFuture.cause());
}
}
});
}
@Override
public void disconnect() {
if (this.channel == null) {
return;
}
this.channel.close();
}
@Override
protected void tryReconnect() {
if (this.cancelReconnect) {
this.cancelReconnect = false;
return;
}
if (!getConfig().isAutoReconnect()) {
return;
}
if (log.isWarnEnabled()) {
log.warn("{}s后将重新连接 {}", getConfig().getReconnectDelay(), getConfig().getRoomId());
}
workerGroup.schedule(() -> {
this.setStatus(ClientStatusEnums.RECONNECTING);
this.connect();
}, getConfig().getReconnectDelay(), TimeUnit.SECONDS);
}
@Override
public void send(Object msg, Runnable success, Consumer<Throwable> failed) {
ChannelFuture future = this.channel.writeAndFlush(msg);
if (success != null || failed != null) {
future.addListener((ChannelFutureListener) channelFuture -> {
if (channelFuture.isSuccess()) {
if (success != null) {
success.run();
}
} else {
if (failed != null) {
failed.accept(channelFuture.cause());
}
}
});
}
}
@Override
public void destroy() {
super.destroy();
// 销毁时不需要重连
this.cancelReconnect = true;
workerGroup.shutdownGracefully().addListener(future -> {
if (future.isSuccess()) {
this.setStatus(ClientStatusEnums.DESTROYED);
} else {
throw new BaseException("client销毁失败", future.cause());
}
});
}
@Override
protected String getWebSocketUriString() {
return getConfig().getWebsocketUri();
}
@Override
protected void setStatus(ClientStatusEnums status) {
if (log.isDebugEnabled()) {
if (getStatus() != status) {
log.debug("{} 状态变化 {} => {}\n", getClass().getSimpleName(), getStatus(), status);
}
}
super.setStatus(status);
}
@Override
public void sendDanmu(Object danmu, Runnable success, Consumer<Throwable> failed) {
throw new BaseException("暂未支持该功能");
}
@Override
public void clickLike(int count, Runnable success, Consumer<Throwable> failed) {
throw new BaseException("暂未支持该功能");
}
/**
* 发送弹幕前判断是否可以发送
*
* @param checkConnected 是否检查Client连接状态
*/
protected boolean checkCanSendDanmu(boolean checkConnected) {
if (checkConnected && getStatus() != ClientStatusEnums.CONNECTED) {
throw new BaseException("连接未建立,无法发送弹幕");
}
if (System.currentTimeMillis() - this.lastSendDanmuTimeInMillis <= getConfig().getMinSendDanmuPeriod()) {
if (log.isWarnEnabled()) {
log.warn("发送弹幕频率过快,忽略该次发送");
}
return false;
}
return true;
}
protected boolean checkCanSendDanmu() {
return checkCanSendDanmu(true);
}
/**
* 发送弹幕后调用该方法
*/
protected void finishSendDanmu() {
this.lastSendDanmuTimeInMillis = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("弹幕发送完成");
}
}
public void iteratorMsgListeners(Consumer<MsgListener> consumer) {
binaryFrameHandler.iteratorMsgListeners(consumer);
}
}

View File

@@ -0,0 +1,65 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.client.config;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import tech.ordinaryroad.live.chat.client.commons.client.config.BaseLiveChatClientConfig;
import java.net.URI;
/**
* @author mjz
* @date 2023/8/26
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@SuperBuilder(toBuilder = true)
public abstract class BaseNettyClientConfig extends BaseLiveChatClientConfig {
/**
* 聚合器允许的最大消息体长度,默认 64*1024 byte
*
* @see HttpObjectAggregator#HttpObjectAggregator(int)
*/
@Builder.Default
private int aggregatorMaxContentLength = 64 * 1024;
/**
* WebSocketClientHandshaker最大消息体长度默认 64*1024 byte
*
* @see WebSocketClientHandshakerFactory#newHandshaker(URI, WebSocketVersion, String, boolean, HttpHeaders, int)
*/
@Builder.Default
private int maxFramePayloadLength = 64 * 1024;
}

View File

@@ -0,0 +1,66 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.client.handler;
import lombok.Getter;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseMsgListener;
import tech.ordinaryroad.live.chat.client.commons.base.msg.IMsg;
import tech.ordinaryroad.live.chat.client.servers.netty.client.base.BaseNettyClient;
import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseBinaryFrameHandler;
import java.util.List;
/**
* BaseClientBinaryFrameHandler
*
* @author mjz
* @date 2023/8/30
*/
public abstract class BaseNettyClientBinaryFrameHandler<
Client extends BaseNettyClient<?, ?, ?, ?, ?, ?>,
BinaryFrameHandler extends BaseBinaryFrameHandler<BinaryFrameHandler, CmdEnum, Msg, MsgListener>,
CmdEnum extends Enum<CmdEnum>,
Msg extends IMsg,
MsgListener extends IBaseMsgListener<BinaryFrameHandler, CmdEnum>>
extends BaseBinaryFrameHandler<BinaryFrameHandler, CmdEnum, Msg, MsgListener> {
@Getter
protected final Client client;
public BaseNettyClientBinaryFrameHandler(List<MsgListener> msgListeners, Client client, long roomId) {
super(msgListeners, roomId);
this.client = client;
}
public BaseNettyClientBinaryFrameHandler(List<MsgListener> msgListeners, Client client) {
super(msgListeners, client.getConfig().getRoomId());
this.client = client;
}
public BaseNettyClientBinaryFrameHandler(List<MsgListener> msgListeners, long roomId) {
super(msgListeners, roomId);
this.client = null;
}
}

View File

@@ -0,0 +1,65 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.client.handler;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import lombok.Getter;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseConnectionListener;
import tech.ordinaryroad.live.chat.client.servers.netty.client.base.BaseNettyClient;
import tech.ordinaryroad.live.chat.client.servers.netty.handler.base.BaseConnectionHandler;
/**
* BaseClientConnectionHandler
*
* @author mjz
* @date 2023/8/27
*/
public abstract class BaseNettyClientConnectionHandler<
Client extends BaseNettyClient<?, ?, ?, ?, ?, ?>,
ConnectionHandler extends BaseConnectionHandler<ConnectionHandler>>
extends BaseConnectionHandler<ConnectionHandler> {
@Getter
protected final Client client;
public BaseNettyClientConnectionHandler(WebSocketClientHandshaker handshaker, Client client, IBaseConnectionListener<ConnectionHandler> listener) {
super(handshaker, listener);
this.client = client;
}
public BaseNettyClientConnectionHandler(WebSocketClientHandshaker handshaker, Client client) {
this(handshaker, client, null);
}
public BaseNettyClientConnectionHandler(WebSocketClientHandshaker handshaker, IBaseConnectionListener<ConnectionHandler> listener) {
super(handshaker, listener);
this.client = null;
}
public BaseNettyClientConnectionHandler(WebSocketClientHandshaker handshaker, long roomId) {
super(handshaker, null);
this.client = null;
}
}

View File

@@ -0,0 +1,58 @@
<!--
~ MIT License
~
~ Copyright (c) 2023 OrdinaryRoad
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xmzs</groupId>
<artifactId>live-chat-client-servers</artifactId>
<version>1.0.0</version>
</parent>
<packaging>jar</packaging>
<artifactId>live-chat-client-servers-netty</artifactId>
<name>live-chat-client-servers-netty</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>com.xmzs</groupId>
<artifactId>live-chat-client-commons-base</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.frame.base;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
/**
* @author mjz
* @date 2023/1/5
*/
public abstract class BaseBinaryWebSocketFrame extends BinaryWebSocketFrame {
public BaseBinaryWebSocketFrame(ByteBuf byteBuf) {
super(byteBuf);
}
}

View File

@@ -0,0 +1,184 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.handler.base;
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseMsgListener;
import tech.ordinaryroad.live.chat.client.commons.base.msg.BaseCmdMsg;
import tech.ordinaryroad.live.chat.client.commons.base.msg.BaseMsg;
import tech.ordinaryroad.live.chat.client.commons.base.msg.ICmdMsg;
import tech.ordinaryroad.live.chat.client.commons.base.msg.IMsg;
import java.util.List;
import java.util.function.Consumer;
/**
* 消息处理器
*
* @author mjz
* @date 2023/1/4
*/
@Slf4j
public abstract class BaseBinaryFrameHandler<
T extends BaseBinaryFrameHandler<?, ?, ?, ?>,
CmdEnum extends Enum<CmdEnum>,
Msg extends IMsg,
MsgListener extends IBaseMsgListener<T, CmdEnum>
> extends SimpleChannelInboundHandler<BinaryWebSocketFrame>
implements IBaseMsgListener<T, CmdEnum> {
@Getter
private final Object roomId;
protected final List<MsgListener> msgListeners;
public BaseBinaryFrameHandler(List<MsgListener> msgListeners, Object roomId) {
this.msgListeners = msgListeners;
this.roomId = roomId;
if (this.msgListeners == null || this.msgListeners.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("listener not set");
}
}
}
/**
* 解码收到的二进制流
*
* @param byteBuf ByteBuf
* @return List<Msg>
*/
protected abstract List<Msg> decode(ByteBuf byteBuf);
@SuppressWarnings("unchecked")
protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame message) {
ByteBuf byteBuf = message.content();
List<Msg> msgList = this.decode(byteBuf);
if (msgList == null || msgList.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("msgList is empty");
}
return;
}
for (Msg msg : msgList) {
this.onMsg((T) BaseBinaryFrameHandler.this, msg);
if (msg instanceof ICmdMsg<?>) {
ICmdMsg<?> cmdMsg = (ICmdMsg<?>) msg;
Enum<?> cmdEnum = cmdMsg.getCmdEnum();
if (cmdEnum == null) {
this.onUnknownCmd((T) BaseBinaryFrameHandler.this, cmdMsg.getCmd(), cmdMsg);
} else {
this.onCmdMsg((T) BaseBinaryFrameHandler.this, (CmdEnum) cmdEnum, (ICmdMsg<CmdEnum>) cmdMsg);
}
}
if (msg instanceof BaseCmdMsg<?>) {
BaseCmdMsg<?> cmdMsg = (BaseCmdMsg<?>) msg;
Enum<?> cmdEnum = cmdMsg.getCmdEnum();
if (cmdEnum == null) {
this.onUnknownCmd((T) BaseBinaryFrameHandler.this, cmdMsg.getCmd(), cmdMsg);
} else {
this.onCmdMsg((T) BaseBinaryFrameHandler.this, (CmdEnum) cmdEnum, (BaseCmdMsg<CmdEnum>) cmdMsg);
}
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause.getCause() instanceof UnrecognizedPropertyException) {
log.error("缺少字段:{}", cause.getMessage());
} else {
super.exceptionCaught(ctx, cause);
}
}
@Override
public void onMsg(T t, IMsg msg) {
IBaseMsgListener.super.onMsg(t, msg);
iteratorMsgListeners(msgListener -> msgListener.onMsg(t, msg));
}
/**
* 重写该方法判断CMD或者调用{@link IBaseMsgListener#onOtherCmdMsg(Object, Enum, ICmdMsg)}
*
* @param t BaseBinaryFrameHandler
* @param cmd CmdEnum
* @param cmdMsg BaseMsg
*/
@Override
public void onCmdMsg(T t, CmdEnum cmd, ICmdMsg<CmdEnum> cmdMsg) {
IBaseMsgListener.super.onCmdMsg(t, cmd, cmdMsg);
iteratorMsgListeners(msgListener -> msgListener.onCmdMsg(t, cmd, cmdMsg));
}
@Override
public void onUnknownCmd(T t, String cmdString, IMsg msg) {
IBaseMsgListener.super.onUnknownCmd(t, cmdString, msg);
iteratorMsgListeners(msgListener -> msgListener.onUnknownCmd(t, cmdString, msg));
}
@SuppressWarnings("ForLoopReplaceableByForEach")
public void iteratorMsgListeners(Consumer<MsgListener> consumer) {
if (msgListeners.isEmpty()) {
return;
}
for (int i = 0; i < msgListeners.size(); i++) {
consumer.accept(msgListeners.get(i));
}
}
@Override
public void onCmdMsg(T t, CmdEnum cmd, BaseCmdMsg<CmdEnum> cmdMsg) {
IBaseMsgListener.super.onCmdMsg(t, cmd, cmdMsg);
iteratorMsgListeners(msgListener -> msgListener.onCmdMsg(t, cmd, cmdMsg));
}
@Override
public void onUnknownCmd(T t, String cmdString, BaseMsg msg) {
IBaseMsgListener.super.onUnknownCmd(t, cmdString, msg);
iteratorMsgListeners(msgListener -> msgListener.onUnknownCmd(t, cmdString, msg));
}
public String getRoomIdAsString() {
if (this.roomId == null) {
return "";
}
return this.roomId.toString();
}
public long getRoomIdAsLong() {
String roomIdAsString = this.getRoomIdAsString();
if (roomIdAsString.trim().isEmpty()) {
return 0L;
}
return Long.parseLong(roomIdAsString);
}
}

View File

@@ -0,0 +1,168 @@
/*
* MIT License
*
* Copyright (c) 2023 OrdinaryRoad
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package tech.ordinaryroad.live.chat.client.servers.netty.handler.base;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import tech.ordinaryroad.live.chat.client.commons.base.listener.IBaseConnectionListener;
import java.util.concurrent.TimeUnit;
/**
* 连接处理器
*
* @author mjz
* @date 2023/8/21
*/
@Slf4j
public abstract class BaseConnectionHandler<ConnectionHandler extends BaseConnectionHandler<?>> extends SimpleChannelInboundHandler<FullHttpResponse> {
private final WebSocketClientHandshaker handshaker;
@Getter
private ChannelPromise handshakeFuture;
private final IBaseConnectionListener<ConnectionHandler> listener;
/**
* 客户端发送心跳包
*/
private ScheduledFuture<?> scheduledFuture = null;
public BaseConnectionHandler(WebSocketClientHandshaker handshaker, IBaseConnectionListener<ConnectionHandler> listener) {
this.handshaker = handshaker;
this.listener = listener;
}
public BaseConnectionHandler(WebSocketClientHandshaker handshaker) {
this(handshaker, null);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
this.handshaker.handshake(ctx.channel());
}
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) throws Exception {
// 判断是否正确握手
if (this.handshaker.isHandshakeComplete()) {
handshakeSuccessfully(ctx, msg);
} else {
try {
handshakeSuccessfully(ctx, msg);
} catch (WebSocketHandshakeException e) {
handshakeFailed(msg, e);
}
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (log.isDebugEnabled()) {
log.debug("userEventTriggered {}", evt.getClass());
}
if (evt instanceof SslHandshakeCompletionEvent) {
heartbeatCancel();
heartbeatStart(ctx);
if (this.listener != null) {
listener.onConnected((ConnectionHandler) BaseConnectionHandler.this);
}
} else if (evt instanceof SslCloseCompletionEvent) {
heartbeatCancel();
if (this.listener != null) {
listener.onDisconnected((ConnectionHandler) BaseConnectionHandler.this);
}
} else {
log.error("待处理 {}", evt.getClass());
}
super.userEventTriggered(ctx, evt);
}
/**
* 开始发送心跳包
*/
private void heartbeatStart(ChannelHandlerContext ctx) {
scheduledFuture = ctx.executor().scheduleAtFixedRate(() -> {
sendHeartbeat(ctx);
}, getHeartbeatInitialDelay(), getHeartbeatPeriod(), TimeUnit.SECONDS);
}
/**
* 取消发送心跳包
*/
private void heartbeatCancel() {
if (null != scheduledFuture && !scheduledFuture.isCancelled()) {
scheduledFuture.cancel(true);
scheduledFuture = null;
}
}
protected abstract void sendHeartbeat(ChannelHandlerContext ctx);
public abstract void sendAuthRequest(Channel channel);
protected abstract long getHeartbeatPeriod();
protected abstract long getHeartbeatInitialDelay();
private void handshakeSuccessfully(ChannelHandlerContext ctx, FullHttpResponse msg) {
if (log.isDebugEnabled()) {
log.debug("握手完成!");
}
this.handshaker.finishHandshake(ctx.channel(), msg);
this.handshakeFuture.setSuccess();
}
private void handshakeFailed(FullHttpResponse msg, WebSocketHandshakeException e) {
log.error("握手失败status:" + msg.status(), e);
this.handshakeFuture.setFailure(e);
if (listener != null) {
this.listener.onConnectFailed((ConnectionHandler) this);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("exceptionCaught", cause);
if (!this.handshakeFuture.isDone()) {
this.handshakeFuture.setFailure(cause);
}
ctx.close();
}
}

View File

@@ -0,0 +1,46 @@
<!--
~ MIT License
~
~ Copyright (c) 2023 OrdinaryRoad
~
~ Permission is hereby granted, free of charge, to any person obtaining a copy
~ of this software and associated documentation files (the "Software"), to deal
~ in the Software without restriction, including without limitation the rights
~ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
~ copies of the Software, and to permit persons to whom the Software is
~ furnished to do so, subject to the following conditions:
~
~ The above copyright notice and this permission notice shall be included in all
~ copies or substantial portions of the Software.
~
~ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
~ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
~ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
~ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
~ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
~ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
~ SOFTWARE.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.xmzs</groupId>
<artifactId>ruoyi-live</artifactId>
<version>1.0.0</version>
</parent>
<packaging>pom</packaging>
<artifactId>live-chat-client-servers</artifactId>
<name>live-chat-client-servers</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<modules>
<module>live-chat-client-servers-netty</module>
<module>live-chat-client-servers-netty-client</module>
</modules>
</project>