mirror of
https://github.com/youthlql/JavaYouth.git
synced 2026-03-13 21:33:42 +08:00
dubbo源码更新两篇文章-dubbo完结,下次预计更新Spring源码系列文章
This commit is contained in:
54
README.md
54
README.md
@@ -18,39 +18,11 @@
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 目录
|
||||
|
||||
- [Java](#java)
|
||||
|
||||
- [基础](#基础)
|
||||
- [容器](#容器)
|
||||
- [并发](#并发)
|
||||
- [JVM](#JVM)
|
||||
- [各版本新特性](#各版本新特性)
|
||||
|
||||
|
||||
|
||||
- [计算机网络](#计算机网络)
|
||||
|
||||
|
||||
|
||||
- [ElasticSearch](#ElasticSearch)
|
||||
|
||||
# 随笔
|
||||
|
||||
[我的校招-不完全知识点整理](docs/suibi/我的校招-不完全知识点整理.md)
|
||||
|
||||
# Java
|
||||
|
||||
## 基础
|
||||
|
||||
1、总结【TODO】
|
||||
|
||||
|
||||
|
||||
**重难点**
|
||||
### 重难点
|
||||
|
||||
1、[泛型详解【万字长文】](docs/Java/Basis/keyAndDifficultPoints/Generic/泛型.md)
|
||||
|
||||
@@ -58,7 +30,7 @@
|
||||
|
||||
## 容器
|
||||
|
||||
**HashMap**
|
||||
### HashMap
|
||||
|
||||
[HashMap-JDK7源码讲解](docs/Java/collection/HashMap-JDK7源码讲解.md)
|
||||
|
||||
@@ -66,10 +38,6 @@
|
||||
|
||||
|
||||
|
||||
**ConcurrentHashMap源码讲解(JDK7和JDK8)【TODO】**
|
||||
|
||||
|
||||
|
||||
## 并发
|
||||
|
||||
> 这个系列基本全是万字长文,希望读者可以耐心看下去,相信会有很大收获。
|
||||
@@ -90,13 +58,11 @@
|
||||
|
||||
|
||||
|
||||
AQS剩余部分,以及阻塞队列源码暂时先搁置一下。
|
||||
|
||||
|
||||
|
||||
## JVM
|
||||
|
||||
**1、内存与垃圾回收篇**
|
||||
### 内存与垃圾回收篇
|
||||
|
||||
1、[JVM系列-第1章-JVM与Java体系结构](docs/Java/JVM/JVM系列-第1章-JVM与Java体系结构.md)
|
||||
|
||||
@@ -126,7 +92,7 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。
|
||||
|
||||
|
||||
|
||||
## 各版本新特性
|
||||
## JDK新特性
|
||||
|
||||
1、[Java8新特性](docs/Java/Basis/Java8_New_Features/Java8新特性.md)
|
||||
|
||||
@@ -150,7 +116,7 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。
|
||||
|
||||
|
||||
|
||||
# 设计模式【8.2更新基本完毕】
|
||||
# 设计模式
|
||||
|
||||
[1.设计模式-设计思想](docs/design_patterns/design_ideas/设计模式-01.设计思想.md)
|
||||
|
||||
@@ -182,6 +148,10 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。
|
||||
|
||||
[Spring常用注解](docs/spring/use/Spring常用注解.md)
|
||||
|
||||
|
||||
|
||||
## 源码【预计11月或者12月更新】
|
||||
|
||||
# Netty
|
||||
|
||||
## 入门
|
||||
@@ -206,6 +176,12 @@ AQS剩余部分,以及阻塞队列源码暂时先搁置一下。
|
||||
|
||||
4. [Dubbo服务导出源码解析](docs/rpc/dubbo/05.Dubbo源码系列V1-Dubbo第五节-服务导出源码解析.md)
|
||||
|
||||
5. [Dubbo服务引入源码解析](docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md)
|
||||
|
||||
6. [Dubbo服务调用源码解析](docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# Apollo
|
||||
|
||||
1420
docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md
Normal file
1420
docs/rpc/dubbo/06.Dubbo源码系列V1-Dubbo第六节-服务引入源码解析.md
Normal file
File diff suppressed because it is too large
Load Diff
572
docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md
Normal file
572
docs/rpc/dubbo/07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析.md
Normal file
@@ -0,0 +1,572 @@
|
||||
---
|
||||
title: 07.Dubbo源码系列V1-Dubbo第七节-服务调用源码解析
|
||||
tags:
|
||||
- Dubbo
|
||||
- rpc
|
||||
categories:
|
||||
- rpc
|
||||
- Dubbo源码系列v1
|
||||
keywords: Dubbo,rpc
|
||||
description: 服务调用源码解析
|
||||
cover: 'https://cdn.jsdelivr.net/gh/youthlql/youthlql/img/dubbo.png'
|
||||
abbrlink: 84653c9d
|
||||
date: 2021-11-09 14:11:58
|
||||
---
|
||||
|
||||
|
||||
|
||||
## 第七节: Dubbo服务调用源码解析
|
||||
|
||||
|
||||
|
||||
### 服务导出的Netty启动源码
|
||||
|
||||
> 最主要的就是构造一个Handler处理链路
|
||||
|
||||
#### DubboProtocol
|
||||
|
||||
```java
|
||||
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
|
||||
URL url = invoker.getUrl();
|
||||
|
||||
// 唯一标识一个服务的key
|
||||
String key = serviceKey(url);
|
||||
// 构造一个Exporter进行服务导出
|
||||
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
|
||||
exporterMap.put(key, exporter);
|
||||
|
||||
// 省略...
|
||||
|
||||
// 开启NettyServer
|
||||
// 请求--->invocation--->服务key--->exporterMap.get(key)--->exporter--->invoker--->invoker.invoke(invocation)-->执行服务
|
||||
openServer(url);
|
||||
|
||||
return exporter;
|
||||
}
|
||||
|
||||
private void openServer(URL url) {
|
||||
// find server.
|
||||
String key = url.getAddress(); // 获得ip地址和port, 192.168.40.17:20880
|
||||
|
||||
// NettyClient, NettyServer
|
||||
//client can export a service which's only for server to invoke
|
||||
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
|
||||
if (isServer) {
|
||||
// 缓存Server对象
|
||||
ExchangeServer server = serverMap.get(key);
|
||||
|
||||
// DCL,Double Check Lock
|
||||
if (server == null) {
|
||||
synchronized (this) {
|
||||
server = serverMap.get(key);
|
||||
if (server == null) {
|
||||
// 创建Server,并进行缓存
|
||||
serverMap.put(key, createServer(url));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// server supports reset, use together with override
|
||||
// 服务重新导出时,就会走这里
|
||||
server.reset(url);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ExchangeServer createServer(URL url) {
|
||||
url = URLBuilder.from(url)
|
||||
// send readonly event when server closes, it's enabled by default
|
||||
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
|
||||
// enable heartbeat by default
|
||||
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
|
||||
.addParameter(CODEC_KEY, DubboCodec.NAME)
|
||||
.build();
|
||||
|
||||
// 协议的服务器端实现类型,比如:dubbo协议的mina,netty等,http协议的jetty,servlet等,默认为netty
|
||||
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
|
||||
|
||||
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
|
||||
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
|
||||
}
|
||||
|
||||
// 通过url绑定端口,和对应的请求处理器
|
||||
ExchangeServer server;
|
||||
try {
|
||||
// requestHandler是请求处理器,类型为ExchangeHandler
|
||||
// 表示从url的端口接收到请求后,requestHandler来进行处理
|
||||
server = Exchangers.bind(url, requestHandler);
|
||||
} catch (RemotingException e) {
|
||||
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
|
||||
}
|
||||
|
||||
// 协议的客户端实现类型,比如:dubbo协议的mina,netty等
|
||||
str = url.getParameter(CLIENT_KEY);
|
||||
if (str != null && str.length() > 0) {
|
||||
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
|
||||
if (!supportedTypes.contains(str)) {
|
||||
throw new RpcException("Unsupported client type: " + str);
|
||||
}
|
||||
}
|
||||
|
||||
return server;
|
||||
}
|
||||
```
|
||||
|
||||
1. NettyClient<------>Socket连接,数据传输层<------>NettyServer。Netty这两端只要建立了连接就可以互相发送数据。
|
||||
2. ExchangeClient------数据交换层------ExchangeServer。这是Dubbo抽象出来的概念,主要就是抽象出了请求和响应这两个概念。
|
||||
3. ExchangeXXX里面包了Netty的东西
|
||||
|
||||
|
||||
|
||||
```java
|
||||
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
|
||||
|
||||
if (!(message instanceof Invocation)) {
|
||||
throw new RemotingException(channel, "Unsupported request: "
|
||||
+ (message == null ? null : (message.getClass().getName() + ": " + message))
|
||||
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
|
||||
}
|
||||
|
||||
// 转成Invocation对象,要开始用反射执行方法了
|
||||
Invocation inv = (Invocation) message;
|
||||
Invoker<?> invoker = getInvoker(channel, inv); // 服务实现者
|
||||
|
||||
// need to consider backward-compatibility if it's a callback
|
||||
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
|
||||
String methodsStr = invoker.getUrl().getParameters().get("methods");
|
||||
boolean hasMethod = false;
|
||||
if (methodsStr == null || !methodsStr.contains(",")) {
|
||||
hasMethod = inv.getMethodName().equals(methodsStr);
|
||||
} else {
|
||||
String[] methods = methodsStr.split(",");
|
||||
for (String method : methods) {
|
||||
if (inv.getMethodName().equals(method)) {
|
||||
hasMethod = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!hasMethod) {
|
||||
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
|
||||
+ " not found in callback service interface ,invoke will be ignored."
|
||||
+ " please update the api interface. url is:"
|
||||
+ invoker.getUrl()) + " ,invocation is :" + inv);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
// 这里设置了,service中才能拿到remoteAddress
|
||||
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
|
||||
// 执行服务,得到结果
|
||||
Result result = invoker.invoke(inv);
|
||||
// 返回一个CompletableFuture
|
||||
return result.completionFuture().thenApply(Function.identity());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void received(Channel channel, Object message) throws RemotingException {
|
||||
if (message instanceof Invocation) {
|
||||
// 这是服务端接收到Invocation时的处理逻辑
|
||||
reply((ExchangeChannel) channel, message);
|
||||
|
||||
} else {
|
||||
super.received(channel, message);
|
||||
}
|
||||
}
|
||||
|
||||
private void invoke(Channel channel, String methodKey) {
|
||||
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
|
||||
if (invocation != null) {
|
||||
try {
|
||||
received(channel, invocation);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
|
||||
boolean isCallBackServiceInvoke = false;
|
||||
boolean isStubServiceInvoke = false;
|
||||
int port = channel.getLocalAddress().getPort();
|
||||
String path = inv.getAttachments().get(PATH_KEY);
|
||||
|
||||
// if it's callback service on client side
|
||||
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(STUB_EVENT_KEY));
|
||||
if (isStubServiceInvoke) {
|
||||
port = channel.getRemoteAddress().getPort();
|
||||
}
|
||||
|
||||
//callback
|
||||
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
|
||||
if (isCallBackServiceInvoke) {
|
||||
path += "." + inv.getAttachments().get(CALLBACK_SERVICE_KEY);
|
||||
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
|
||||
}
|
||||
|
||||
// 从请求中拿到serviceKey,从exporterMap中拿到已经导出了的服务
|
||||
String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
|
||||
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
|
||||
|
||||
if (exporter == null) {
|
||||
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
|
||||
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
|
||||
}
|
||||
|
||||
// 拿到服务对应的Invoker
|
||||
return exporter.getInvoker();
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### Exchangers
|
||||
|
||||
```java
|
||||
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
|
||||
if (url == null) {
|
||||
throw new IllegalArgumentException("url == null");
|
||||
}
|
||||
if (handler == null) {
|
||||
throw new IllegalArgumentException("handler == null");
|
||||
}
|
||||
// codec表示协议编码方式
|
||||
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
|
||||
// 通过url得到HeaderExchanger, 利用HeaderExchanger进行bind,将得到一个HeaderExchangeServer
|
||||
return getExchanger(url).bind(url, handler);
|
||||
}
|
||||
```
|
||||
|
||||
#### HeaderExchange
|
||||
|
||||
```java
|
||||
@Override
|
||||
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
|
||||
|
||||
// 下面会去启动Netty
|
||||
// 对handler包装了两层,表示当处理一个请求时,每层Handler负责不同的处理逻辑
|
||||
// 为什么在connect和bind时都是DecodeHandler,解码,解的是把InputStream解析成RpcInvocation对象
|
||||
// DecodeHandler -> HeaderExchangeHandler -> DubboProtocol(ExchangeHandlerAdapter) 一层一层包
|
||||
// 上面的handler处理完了交给下面的handler
|
||||
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
|
||||
}
|
||||
```
|
||||
|
||||
#### HeaderExchangeServer
|
||||
|
||||
HeaderExchangeServer里有一个server属性,这个server就是NettyServer
|
||||
|
||||
```java
|
||||
private final Server server;
|
||||
//启动netty的时候会调用这个
|
||||
public HeaderExchangeServer(Server server) {
|
||||
Assert.notNull(server, "server == null");
|
||||
this.server = server;
|
||||
// 启动定义关闭Channel(socket)的Task
|
||||
startIdleCheckTask(getUrl());
|
||||
}
|
||||
```
|
||||
|
||||
#### Transporters
|
||||
|
||||
```java
|
||||
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
|
||||
if (url == null) {
|
||||
throw new IllegalArgumentException("url == null");
|
||||
}
|
||||
if (handlers == null || handlers.length == 0) {
|
||||
throw new IllegalArgumentException("handlers == null");
|
||||
}
|
||||
|
||||
// 如果bind了多个handler,那么当有一个连接过来时,会循环每个handler去处理连接
|
||||
ChannelHandler handler;
|
||||
if (handlers.length == 1) {
|
||||
handler = handlers[0];
|
||||
} else {
|
||||
handler = new ChannelHandlerDispatcher(handlers);
|
||||
}
|
||||
|
||||
// 调用NettyTransporter去绑定,Transporter表示网络传输层
|
||||
return getTransporter().bind(url, handler);
|
||||
}
|
||||
|
||||
public static Transporter getTransporter() {
|
||||
//@SPI默认配置的就是netty
|
||||
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
|
||||
}
|
||||
```
|
||||
|
||||
#### NettyTransporter
|
||||
|
||||
```java
|
||||
@Override
|
||||
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
|
||||
return new NettyServer(url, listener);
|
||||
}
|
||||
```
|
||||
|
||||
#### NettyServer
|
||||
|
||||
```java
|
||||
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
|
||||
//多个handler一层一层的包装,有点像责任链模式,这个handler处理完了,交给下一个handler
|
||||
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
|
||||
}
|
||||
```
|
||||
|
||||
#### ChannelHandlers
|
||||
|
||||
```java
|
||||
public class ChannelHandlers {
|
||||
// 单例模式
|
||||
private static ChannelHandlers INSTANCE = new ChannelHandlers();
|
||||
|
||||
protected ChannelHandlers() {
|
||||
}
|
||||
|
||||
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
|
||||
return ChannelHandlers.getInstance().wrapInternal(handler, url);
|
||||
}
|
||||
|
||||
protected static ChannelHandlers getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
static void setTestingChannelHandlers(ChannelHandlers instance) {
|
||||
INSTANCE = instance;
|
||||
}
|
||||
|
||||
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
|
||||
// 先通过ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url)
|
||||
// 得到一个AllChannelHandler(handler, url)
|
||||
// 然后把AllChannelHandler包装成HeartbeatHandler,HeartbeatHandler包装成MultiMessageHandler
|
||||
// 所以当Netty接收到一个数据时,会经历MultiMessageHandler--->HeartbeatHandler---->AllChannelHandler
|
||||
// 而AllChannelHandler会调用handler
|
||||
return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
|
||||
.getAdaptiveExtension().dispatch(handler, url)));
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
然后回到NettyServer调用super(XXX),走到AbstractServer
|
||||
|
||||
#### AbstractServer
|
||||
|
||||
```java
|
||||
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
|
||||
super(url, handler);
|
||||
localAddress = getUrl().toInetSocketAddress();
|
||||
|
||||
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
|
||||
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
|
||||
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
|
||||
bindIp = ANYHOST_VALUE;
|
||||
}
|
||||
bindAddress = new InetSocketAddress(bindIp, bindPort);
|
||||
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
|
||||
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
|
||||
try {
|
||||
doOpen();//走到NettyServer
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
|
||||
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
|
||||
}
|
||||
//fixme replace this with better method
|
||||
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
|
||||
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
|
||||
}
|
||||
```
|
||||
|
||||
#### NettyServer
|
||||
|
||||
```java
|
||||
protected void doOpen() throws Throwable {
|
||||
bootstrap = new ServerBootstrap();
|
||||
|
||||
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
|
||||
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
|
||||
new DefaultThreadFactory("NettyServerWorker", true));
|
||||
|
||||
//最终再包装一个NettyServerHandler,这个就是最外层的Handler,请求来了它是第一个处理的
|
||||
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
|
||||
channels = nettyServerHandler.getChannels();
|
||||
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
|
||||
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
|
||||
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
||||
.childHandler(new ChannelInitializer<NioSocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws Exception {
|
||||
// FIXME: should we use getTimeout()?
|
||||
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
|
||||
// 这里就会拿到DubboCodec,接收到数据之后就会进行解码
|
||||
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
|
||||
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
|
||||
.addLast("decoder", adapter.getDecoder())
|
||||
.addLast("encoder", adapter.getEncoder())
|
||||
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
|
||||
.addLast("handler", nettyServerHandler);
|
||||
}
|
||||
});
|
||||
// bind
|
||||
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
|
||||
channelFuture.syncUninterruptibly();
|
||||
channel = channelFuture.channel();
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
#### 分割
|
||||
|
||||
### 服务提供端执行逻辑
|
||||
|
||||
#### 概述
|
||||
|
||||
1. NettyServerHandler:接收数据
|
||||
2. MultiMessageHandler:判断接收到的数据是否是MultiMessage,如果是则获取MultiMessage中的单个Message,传递给HeartbeatHandler进行处理
|
||||
3. HeartbeatHandler:判断是不是心跳消息,如果是不是则把Message传递给AllChannelHandler
|
||||
4. AllChannelHandler:把接收到的Message封装为一个ChannelEventRunnable对象,扔给线程池进行处理
|
||||
5. ChannelEventRunnable:在ChannelEventRunnable的run方法中会调用DecodeHandler处理Message
|
||||
6. DecodeHandler:按Dubbo协议的数据格式,解析当前请求的path,versio,方法,方法参数等等,然后把解析好了的请求交给HeaderExchangeHandler
|
||||
7. HeaderExchangeHandler:处理Request数据,首先构造一个Response对象,然后调用ExchangeHandlerAdapter得到一个CompletionStage future,然后给future通过whenComplete绑定一个回调函数,当future执行完了之后,就可以从回调函数中得到ExchangeHandlerAdapter的执行结果,并把执行结果设置给Response对象,通过channel发送出去。
|
||||
8. ExchangeHandlerAdapter:从本机已经导出的Exporter中根据当前Request所对应的服务key,去寻找Exporter对象,从Exporter中得到Invoker,然后执行invoke方法,此Invoker为ProtocolFilterWrapper$CallbackRegistrationInvoker
|
||||
9. ProtocolFilterWrapper$CallbackRegistrationInvoker:负责执行过滤器链,并且在执行完了之后回调每个过滤器的onResponse或onError方法
|
||||
10. EchoFilter:判断当前请求是不是一个回升测试,如果是,则不继续执行过滤器链了(服务实现者Invoker也不会调用了)
|
||||
11. ClassLoaderFilter:设置当前线程的classloader为当前要执行的服务接口所对应的classloader
|
||||
12. GenericFilter:把泛化调用发送过来的信息包装为RpcInvocation对象
|
||||
13. ContextFilter:设置RpcContext.getContext()的参数
|
||||
14. TraceFilter:先执行下一个invoker的invoke方法,调用成功后录调用信息
|
||||
15. TimeoutFilter:调用时没有特别处理,只是记录了一下当前时间,当整个filter链都执行完了之后回调TimeoutFilter的onResponse方法时,会判断本次调用是否超过了timeout
|
||||
16. MonitorFilter:记录当前服务的执行次数
|
||||
17. ExceptionFilter:调用时没有特别处理,在回调onResponse方法时,对不同的异常进行处理,详解Dubbo的异常处理
|
||||
18. DelegateProviderMetaDataInvoker:过滤器链结束,调用下一个Invoker
|
||||
19. AbstractProxyInvoker:在服务导出时,根据服务接口,服务实现类对象生成的,它的invoke方法就会执行服务实现类对象的方法,得到结果
|
||||
|
||||
#### JavassistProxyFactory
|
||||
|
||||
```java
|
||||
public class JavassistProxyFactory extends AbstractProxyFactory {
|
||||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
|
||||
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
|
||||
|
||||
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
|
||||
// 如果现在被代理的对象proxy本身就是一个已经被代理过的对象,那么则取代理类的Wrapper,否则取type(接口)的Wrapper
|
||||
// Wrapper是针对某个类或某个接口的包装类,通过wrapper对象可以更方便的去执行某个类或某个接口的方法
|
||||
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
|
||||
|
||||
// proxy是服务实现类
|
||||
// type是服务接口
|
||||
// url是一个注册中心url,但同时也记录了
|
||||
return new AbstractProxyInvoker<T>(proxy, type, url) {
|
||||
@Override
|
||||
protected Object doInvoke(T proxy, String methodName,
|
||||
Class<?>[] parameterTypes,
|
||||
Object[] arguments) throws Throwable {
|
||||
|
||||
// 执行proxy的method方法
|
||||
// 执行的proxy实例的方法
|
||||
// 如果没有wrapper,则要通过原生的反射技术去获取Method对象,然后执行
|
||||
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
### 服务消费端执行逻辑
|
||||
|
||||
|
||||
|
||||
1. MockClusterInvoker.invoke(new RpcInvocation(method, args)):Mock逻辑
|
||||
2. AbstractClusterInvoker.invoke(invocation):把RpcContext中设置的Attachments添加到invocation对象上,调用路由链从服务目录上筛选出适合的服务Invoker,获得服务均衡策略loadbalance
|
||||
3. FailoverClusterInvoker.doInvoke(invocation, invokers, loadbalance):根据负载均衡策略选出一个invoker,然后执行
|
||||
4. InvokerWrapper.invoke(invocation):没做什么事情
|
||||
5. CallbackRegistrationInvoker.invoke(invocation):开始执行Filter链,执行完得到结果后,会获取ListenableFilter中的listener,执行listener的onResponse方法
|
||||
6. ConsumerContextFilter.invoke(invocation):设置RpcContext中LocalAddress、RemoteAddress、RemoteApplicationName参数
|
||||
7. FutureFilter.invoke(invocation):
|
||||
8. MonitorFilter.invoke(invocation):方法的执行次数+1
|
||||
9. ListenerInvokerWrapper.invoke(invocation):没做什么事情
|
||||
10. AsyncToSyncInvoker.invoke(invocation):异步转同步,会先用下层Invoker去异步执行,然后阻塞Integer.MAX_VALUE时间,直到拿到了结果
|
||||
11. AbstractInvoker.invoke(invocation):主要调用DubboInvoker的doInvoke方法,如果doInvoker方法出现了异常,会进行包装,包装成AsyncRpcResult
|
||||
12. DubboInvoker.doInvoke(invocation):从clients轮询出一个client进行数据发送,如果配置了不关心结果,则调用ReferenceCountExchangeClient的send方法,否则调用ReferenceCountExchangeClient的request方法
|
||||
13. ReferenceCountExchangeClient.request(Object request, int timeout):没做什么事情
|
||||
14. HeaderExchangeClient.request(Object request, int timeout):没做什么事情
|
||||
15. HeaderExchangeChannel.request(Object request, int timeout):构造一个Request对象,并且会构造一个DefaultFuture对象来阻塞timeout的时间来等待结果,在构造DefaultFuture对象时,会把DefaultFuture对象和req的id存入FUTURES中,FUTURES是一个Map,当HeaderExchangeHandler接收到结果时,会从这个Map中根据id获取到DefaultFuture对象,然后返回Response。
|
||||
16. AbstractPeer.send(Object message):从url中获取send参数,默认为false
|
||||
17. AbstractClient.send(Object message, boolean sent):没做什么
|
||||
18. NettyChannel.send(Object message, boolean sent):调用NioSocketChannel的writeAndFlush发送数据,然后判断send如果是true,那么则阻塞url中指定的timeout时间,因为如果send是false,在HeaderExchangeChannel中会阻塞timeout时间
|
||||
19. NioSocketChannel.writeAndFlush(Object msg):最底层的Netty非阻塞式的发送数据
|
||||
|
||||
|
||||
|
||||
总结一下上面调用流程:
|
||||
|
||||
1. 最外层是Mock逻辑,调用前,调用后进行Mock
|
||||
2. 从服务目录中,根据当前调用的方法和路由链,筛选出部分服务Invoker(DubboInvoker)
|
||||
3. 对服务Invoker进行负载均衡,选出一个服务Invoker
|
||||
4. 执行Filter链
|
||||
5. AsyncToSyncInvoker完成异步转同步,因为DubboInvoker的执行是异步非阻塞的,所以如果是同步调用,则会在此处阻塞,知道拿到响应结果
|
||||
6. DubboInvoker开始异步非阻塞的调用
|
||||
7. HeaderExchangeChannel中会阻塞timeout的时间来等待结果,该timeout就是用户在消费端所配置的timeout
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
### Dubbo的异常处理
|
||||
|
||||
当服务消费者在调用一个服务时,服务提供者在执行服务逻辑时可能会出现异常,对于Dubbo来说,服务消费者需要在消费端抛出这个异常,那么这个功能是怎么做到的呢?
|
||||
|
||||
|
||||
|
||||
服务提供者在执行服务时,如果出现了异常,那么框架会把异常捕获,捕获异常的逻辑在AbstractProxyInvoker中,捕获到异常后,会把异常信息包装为正常的AppResponse对象,只是AppResponse的value属性没有值,exception属性有值。
|
||||
|
||||
|
||||
|
||||
此后,服务提供者会把这个AppResponse对象发送给服务消费端,服务消费端是在InvokerInvocationHandler中调用AppResponse的recreate方法重新得到一个结果,在recreate方法中会去失败AppResponse对象是否正常,也就是是否存在exception信息,如果存在,则直接throw这个exception,从而做到**服务执行时出现的异常,在服务消费端抛出**。
|
||||
|
||||
|
||||
|
||||
那么这里存在一个问题,如果服务提供者抛出的异常类,在服务消费者这边不存在,那么服务消费者也就抛不出这个异常了,那么dubbo是怎么处理的呢?
|
||||
|
||||
|
||||
|
||||
这里就涉及到了ExceptionFilter,它是服务提供者端的一个过滤器,它主要是在服务提供者执行完服务后会去识别异常:
|
||||
|
||||
1. 如果是需要开发人员捕获的异常,那么忽略,直接把这个异常返回给消费者
|
||||
2. 如果在当前所执行的方法签名上有声明,那么忽略,直接把这个异常返回给消费者
|
||||
3. 如果抛出的异常不需要开发人员捕获,或者方法上没有申明,那么服务端或记录一个error日志
|
||||
4. 异常类和接口类在同一jar包里,那么忽略,直接把这个异常返回给消费者
|
||||
5. 如果异常类是JDK自带的异常,那么忽略,直接把这个异常返回给消费者
|
||||
6. 如果异常类是Dubbo自带的异常,那么忽略,直接把这个异常返回给消费者
|
||||
7. **否则,把异常信息包装成RuntimeException,并覆盖AppResponse对象中的exception属性**
|
||||
Reference in New Issue
Block a user